云计算技术

阿里云离线 离线计算平台系列之一

在蚂蚁金服风控体系里面,有一个重要的环节就是离线仿真,在规则,模型上线之前,在离线的环境里面进行仿真验证,来对规则和模型进行效能的评估,避免人为因素造成不准确性从而造成的资损。起初为了达到这个目的,离线计算平台就这样孕育而生了,慢慢地整个离线平台覆盖了更多风控的业务,也慢慢变成目前Odps-Spark最大的用户,拥有的集群数目也是最大的。离线计算平台主要以Spark为基础,在其上建立起来的一套平台. 后面我们团队会给大家带来一系列,关于离线平台的架构以及我们做过相应业务以及经验,希望和大家一起来探讨。
下面由我来给大家分享下,我们整个团队建立起离线计算平台里面的SparkContext管理以及几个Spark优化手段。

在我们的离线业务场景里面,我们需要持续地接受用户提交实验任务进行分析以及运算,所以对于Spark的要求第一点就是需要Long-Live的SparkContext, 可以持续接受任务提交,而不是类似d2那样提交一次性的任务。所以我们以目前Odps-Spark Client模式为基础,建立了一套自己的SparkContext 管理系统,提供了动态新增,删除SparkContext功能,可以持续地提交任务,调度相应任务的功能。

由于SparkContext需要一个独立的JVM进程服务,我们目前是利用RMI来完成启动SparkContext的工作。 在应用服务器上面,当它被通知自己需要启动SparkContext的时候,就会在进行相应的Spark Jar准备,然后进行打包,并且启动相应的RMI进程来当做SparkContext. 在任务执行的流程中,前端系统会向离线系统提交任务,离线系统就有一套自己的任务调度系统,根据目前管理的SparkContext里面选择一个相对比较空闲的,进行任务提交,否则就进行等待。当任务提交到RMI Server之后,就会对任务进行相应的组装以及转化成Spark可以执行的任务,进行执行并且同Spark Cluster进行交互。最终Spark任务结果会同步地返回给RMI Server, 然后再通过RMI Server异步的通知到应用服务器,并且进行任务的进一步处理。

经验分享:控制好每个SparkContext的并发度,充分利用SparkContext的能力(目前我们一个SparkContext上面并发3个任务,后续会引入动态任务大小的评估,从而实现真正的动态任务提交以及资源分配),统一管理每个SparkContext下面的并发线程,而且不是每个任务都去创建线程池,特别注意从RMI Server返回给离线系统数据,不能带有Scala相关的东西,离线系统无法解析。优雅地停掉SparkContext, 防止资源的泄露。

对于风控来说,查看用户历史数据来进行判断风险的行为是一种常见的手段,所以动不动就30天的交易记录,90的某相累计数据。。。 做离线的小伙伴已经哭晕在厕所,在做Spark离线的期间,我们也产出了自己的东西,这里我就介绍三种比较有意义的内容,后续欢迎大家来交流。

动态加载Jar
由于安全性的问题,目前Odps-Spark并没有实现动态加载Jar的功能,那如果真有这部分需求的时候,我们应该怎么做呢?例如有些规则脚本在我们实验的时候进行了更新,离线也需要同步更新,可是这里SparkContext已经创建出来,没有办法再新增Jar了。目前我们的解决方案是,在执行任务的时候,把这些Jar打好包,并且创建出一个ClassLoader来加载这些可变的Jar, 然后在实验的时候,把Classloader 通过broadcast机制分发到各个worker上面,然后再需要利用这些Jar的闭包里面,通过这个classloader进行加载并且执行。这里需要注意的问题是: 多线程的问题, 因为broadcast value每台worker只有一份,如果一个台worker上面有多个cpu去访问broadcast value的时候,如果没有控制好多线程的问题,就会出现一些奇葩问题,而且这种问题不好排查,因为已经在worker上面,不好去打日志进行验证。所以在处理Broadcast value的时候,尽量做到线程安全。

减少shuffle key的数目
在Spark开发中,基本上都会用到join, cogroup等操作,这类操作就会产生shuffle操作, Spark程序的性能很大程度就是取决于shuffle的性能上面,除了调优修改shuffle的参数(spark.shuffle.memoryFraction, spark.shuffle.manager, spark.shuffle.file.buffer等), 也可以利用其它手段来完成

map side join, 特别是在两个RDD, 一个数据量小, 一个数据量大的情况,我们可以把数据量小的那个做成broadcast, 从而把这次shuffle操作转变成一次map操作,大大地减少shuffle中的性能消耗。

利用Bloom Filter过滤掉多余的key, 这也是我们在实践发现的,当两个RDD进行cogroup的时候,其实会有很多无用的key, 例如用userId进行关联的时候,会很多无用的userId进行干扰,但是他们也参与整个shuffle的流程中,这时候我们可以把key数目相对比较少的那个RDD的key收集来(可能会有多次collect操作,因为每次collect操作有大小限制),然后把这些做成Bloom Filter去过滤另外一个RDD里面的key, 从而达到减少shuffle中间的数据量的效果。

数据倾斜的解决
当你在logview上面发现你的2400分区的数据,2399都跑完,另外一个分区怎么跑都跑不完,并且执行时间已经远远超越同伴啦。那么恭喜你,很有可能是数据倾斜发生了。目前介绍下我们的做法

首先是去采集哪些key会出现数据倾斜,这里可以使用groupbykey,然后进行count, 如果这样都会挂掉,那么进行sample抽样来解决,随机抽样10%的数据来进行判断。

找到这样的key之后, 在RDD A里面过滤掉热点对应的key, 形成nonHot的RDD A, 然后针对有HotKey的RDD, 里面的每一个key打上 n 以内的随机数作为后缀。

hasHotEvent.map(x =>
if (hotKeySet.value.contains(x.1)) {
(x.1 + "@" + Random.nextInt(hashPartitionSize), x.2)
else
x
})

在RDD B里面首先同样先过滤掉热点的key, 形成nonHot的RDD B, 然后针对HotKey的RDD, 里面每一个key按顺序附加 0 - n 的后缀,每条hotKey的数据就会膨胀成n条数据

"java">val hotValues = res.filter(x => hotKeySet.value.contains(x.1)).flatMap(x =>
for (i <- 0.until(hashPartitionSize)) yield {
(x.1 + "@" + i, x.2)
}
})

最后notHotKey的RDD 进行join, hotkey的RDD进行join, 最后再进行union操作就可以等到最后的结果。