分布式系统的数据倾斜问题

1. 什么是数据倾斜

对Spark/Hadoop这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。

何谓数据倾斜?数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据量显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。

对于分布式系统而言,理想情况下,随着系统规模(节点数量)的增加,应用整体耗时线性下降。如果一台机器处理一批大量数据需要120分钟,当机器数量增加到三时,理想的耗时为120 / 3 = 40分钟

但是,上述情况只是理想情况,实际上将单机任务转换成分布式任务后,会有overhead,使得总的任务量较之单机时有所增加,所以每台机器的执行时间加起来比单台机器时更大。这里暂不考虑这些overhead,假设单机任务转换成分布式任务后,总任务量不变。

但即使如此,想做到分布式情况下每台机器执行时间是单机时的1 / N,就必须保证每台机器的任务量相等。不幸的是,很多时候,任务的分配是不均匀的,甚至不均匀到大部分任务被分配到个别机器上,其它大部分机器所分配的任务量只占总得的小部分。比如一台机器负责处理80%的任务,另外两台机器各处理10%的任务,如下图所示。

在上图中,机器数据增加为三倍,但执行时间只降为原来的80%,远低于理想值。   

2. 数据倾斜的危害

  1. 资源消耗分布不均。木桶理论、出现性能瓶颈,能充分发挥分布式系统的并行计算、多内存的优势

3. 数据倾斜是如何造成的

Stage的数据来源主要分为如下两类

  1. 从数据源直接读取。如读取HDFS,Kafka
  2. 读取上一个Stage的Shuffle数据

3.1. 数据源读取

3.2. shuffle

必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。

4. 如何缓解/消除数据倾斜

4.1. 避免数据源的数据倾斜

4.1.1. 来自Kafka的数据

以Spark Stream通过DirectStream方式读取Kafka数据为例。由于Kafka的每一个Partition对应Spark的一个Task(Partition),所以Kafka内相关Topic的各Partition之间数据是否平衡,直接决定Spark处理该数据时是否会产生数据倾斜。

如《Kafka设计解析(一)- Kafka背景及架构介绍》一文所述,Kafka某一Topic内消息在不同Partition之间的分布,主要由Producer端所使用的Partition实现类决定。如果使用随机Partitioner,则每条消息会随机发送到一个Partition中,从而从概率上来讲,各Partition间的数据会达到平衡。此时源Stage(直接读取Kafka数据的Stage)不会产生数据倾斜。

但很多时候,业务场景可能会要求将具备同一特征的数据顺序消费,此时就需要将具有相同特征的数据放于同一个Partition中。一个典型的场景是,需要将同一个用户相关的PV信息置于同一个Partition中。此时,如果产生了数据倾斜,则需要通过其它方式处理。

4.1.2. 来自文件系统的数据

Spark以通过textFile(path, minPartitions)方法读取文件时,使用TextFileFormat。

关键:Partition文件大小一致性

4.1.2.1. 对于不可分文件

尽量使用可切分的格式代替不可切分的格式,或者保证各文件实际包含数据量大致相同。

每个文件对应一个Split从而对应一个Partition。此时各文件大小是否一致,很大程度上决定了是否存在数据源侧的数据倾斜。

另外,对于不可切分的压缩文件,即使压缩后的文件大小一致,它所包含的实际数据量也可能差别很多,因为源文件数据重复度越高,压缩比越高。反过来,即使压缩文件大小接近,但由于压缩比可能差距很大,所需处理的数据量差距也可能很大。

此时可通过在数据生成端将不可切分文件存储为可切分文件,或者保证各文件包含数据量相同的方式避免数据倾斜。

4.1.2.2. 对于可分文件

$$每个文件大小=\frac{所有文件大小}{minPartitions}$$

protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
}

默认情况下各Split的大小不会太大,一般相当于一个Block大小(在Hadoop 2中,默认值为128MB),所以数据倾斜问题不明显。如果出现了严重的数据倾斜,可通过上述参数调整。

4.2. Stage 操作中的避免--合理的数据分割器Partitioner

4.2.1. HashPartitioner

Spark在做Shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的Key对应的数据被分配到了同一个Task上,造成该Task所处理的数据远大于其它Task,从而造成数据倾斜。

如果调整Shuffle时的并行度,使得原本被分配到同一Task的不同Key发配到不同Task上处理,则可降低原Task所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。

大量不同的Key被分配到了相同的Task造成该Task数据量过大。

解决方案
调整并行度。一般是增大并行度,但有时如本例减小并行度也可达到效果。

优势
实现简单,可在需要Shuffle的操作算子上直接设置并行度或者使用spark.default.parallelism设置。如果是Spark SQL,还可通过SET spark.sql.shuffle.partitions=[num_tasks]设置并行度。可用最小的代价解决问题。一般如果出现数据倾斜,都可以通过这种方法先试验几次,如果问题未解决,再尝试其它方法。

劣势
适用场景少,只能将分配到同一Task的不同Key分散开,但对于同一Key倾斜严重的情况该方法并不适用。并且该方法一般只能缓解数据倾斜,没有彻底消除问题。从实践经验来看,其效果一般。

4.2.2. 自定义Partitioner

使用自定义的Partitioner(默认为HashPartitioner),将原本被分配到同一个Task的不同Key分配到不同Task。

适用场景
大量不同的Key被分配到了相同的Task造成该Task数据量过大。

解决方案
使用自定义的Partitioner实现类代替默认的HashPartitioner,尽量将所有不同的Key均匀分配到不同的Task中。

优势
不影响原有的并行度设计。如果改变并行度,后续Stage的并行度也会默认改变,可能会影响后续Stage。

劣势
适用场景有限,只能将不同Key分散开,对于同一Key对应数据集非常大的场景不适用。效果与调整并行度类似,只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的Partitioner,不够灵活。

4.3. Stage 操作中的避免--Broadcast机制

共享变量

当一组任务在不同的节点上并行运行一个函数时,Spark会为函数中的每个变量发送一个副本到各个任务中去(低效)。有时,变量需要在任务与任务、任务与驱动程序间共享。Spark有两种共享变量。

累加器:将工作节点中的值聚合到驱动程序中
广播变量:在各个节点中cache一个只读变量

4.3.1. 将Reduce side Join转变为Map side Join

3.5.1 原理
通过Spark的Broadcast机制,将Reduce侧Join转化为Map侧Join,避免Shuffle从而完全消除Shuffle带来的数据倾斜。

3.5.3 总结
适用场景
参与Join的一边数据集足够小,可被加载进Driver并通过Broadcast方法广播到各个Executor中。

解决方案
在Java/Scala代码中将小数据集数据拉取到Driver,然后通过Broadcast方案将小数据集的数据广播到各Executor。或者在使用SQL前,将Broadcast的阈值调整得足够大,从而使用Broadcast生效。进而将Reduce侧Join替换为Map侧Join。

优势
避免了Shuffle,彻底消除了数据倾斜产生的条件,可极大提升性能。

劣势
要求参与Join的一侧数据集足够小,并且主要适用于Join的场景,不适合聚合的场景,适用条件有限。

4.4. 两阶段聚合(局部聚合+全局聚合)

4.5. 修改key

4.5.1. 为skew的key增加随机前/后缀

3.6.1 原理
为数据量特别大的Key增加随机前/后缀,使得原来Key相同的数据变为Key不相同的数据,从而使倾斜的数据集分散到不同的Task中,彻底解决数据倾斜问题。Join另一则的数据中,与倾斜Key对应的部分数据,与随机前缀集作笛卡尔乘积,从而保证无论数据倾斜侧倾斜Key如何加前缀,都能与之正常Join。

通过分析Join Stage的所有Task可知

由于Join分倾斜数据集Join和非倾斜数据集Join,而各Join的并行度均为48,故总的并行度为96
由于提交任务时,设置的Executor个数为4,每个Executor的core数为12,故可用Core数为48,所以前48个Task同时启动(其Launch时间相同),后48个Task的启动时间各不相同(等待前面的Task结束才开始)
由于倾斜Key被加上随机前缀,原本相同的Key变为不同的Key,被分散到不同的Task处理,故在所有Task中,未发现所处理数据集明显高于其它Task的情况

实际上,由于倾斜Key与非倾斜Key的操作完全独立,可并行进行。而本实验受限于可用总核数为48,可同时运行的总Task数为48,故而该方案只是将总耗时减少一半(效率提升一倍)。如果资源充足,可并发执行Task数增多,该方案的优势将更为明显。在实际项目中,该方案往往可提升数倍至10倍的效率。

3.6.3 总结
适用场景
两张表都比较大,无法使用Map则Join。其中一个RDD有少数几个Key的数据量过大,另外一个RDD的Key分布较为均匀。

解决方案
将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据分别与随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者Join并去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并,即可得到全部Join结果。

优势
相对于Map则Join,更能适应大数据集的Join。如果资源充足,倾斜部分数据集与非倾斜部分数据集可并行进行,效率提升明显。且只针对倾斜部分的数据做数据扩展,增加的资源消耗有限。

劣势
如果倾斜Key非常多,则另一侧数据膨胀非常大,此方案不适用。而且此时对倾斜Key与非倾斜Key分开处理,需要扫描数据集两遍,增加了开销。

4.5.2. 大表随机添加N种随机前缀,小表扩大N倍

如果出现数据倾斜的Key比较多,上一种方法将这些大量的倾斜Key分拆出来,意义不大。此时更适合直接对存在数据倾斜的数据集全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大N倍)。

一个数据集存在的倾斜Key比较多,另外一个数据集数据分布比较均匀。

对大部分场景都适用,效果不错。

需要将一个数据集整体扩大N倍,会增加资源消耗。

5. 总结

对于数据倾斜,并无一个统一的一劳永逸的方法。更多的时候,是结合数据特点(数据集大小,倾斜Key的多少等)综合使用上文所述的多种方法。

6. 参考资料


  1. The Internals of Apache Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-shuffle.html 


如果你觉得这篇文章对你有帮助,不妨请我喝杯咖啡,鼓励我创造更多!