数据倾斜

概述

数据倾斜: Task 之间数据分配的非常不均匀

DataSkew 的原因:

数据倾斜一般发生在 shuffle 过程中。

  • 数据异常: key 有大量的空值/默认值
  • Map Task 数据倾斜
    • 数据文件大小不一致,数据压缩格式不可切分
    • 接收的数据源,如 Kafka 对应的数据分区不均匀
  • Reduce task 数据倾斜:
    • 数据存在很多空值或缺省值
    • Shuffle: 外因,Shuffle 操作涉及到大量的磁盘、网络 IO 、对作业型任务影响极大
    • Key 分布不均匀: 内部原因
    • Shuffle + Key 分布不均匀(主因):

主要原因为 shuffle + key 分布不均

DataSkew 问题定位:

  • 了解数据 key 的分布情况
  • 一般通过 Spark Web UI 或其他监控方式出现的异常来进行综合判断
  • 查看代码中可能导致 shuffle 的算子出现

DataSkew 的影响:

  • OutOfMemory,某个 Task 莫名出现内存移除情况, 一般情况下,OOM 的原因都是数据倾斜
  • 某个 Task 执行特别慢, 大量数据交由一个 Task 处理,拖累整个程序
  • Shuffle 过程报错

数据倾斜处理

  • 数据的预处理:

    • 过滤空值/默认值,filter + coalesce
    • 小区数据源带来的数据倾斜
  • 避免 Shuffle: 通过程序避免使用 shuffle 的算子完成相应的任务,通过改写程序实现。

  • 减少 Shuffle 过程中的数量

    • 使用 groupBy –> reduceByKey / aggregateByKey
    • 通过 Map 端的 Join 来减少
  • 选择新的可用于聚合/join 的 key

    • 根据业务,选择新的 key 去做聚合或 join。可尝试使用更细的维度。
  • 改变 reduce 的并行度: 一般情况下不管用,数据倾斜可能是由很多 key 造成的

  • 加盐强行打散 Key:

    处理 ****shuffle + key 不能分散。

    • 为数据量特别大的 Key 增加随机前/后缀,使得原来 Key 相同的数据变为 Key 不相同的数据,从而使倾斜的数据集分散到不同的 Task 中,彻底解决数据倾斜问题。
  • 自定义 Partitioner: 使用自定义的 Partitioner,将原本被分配到同一个 Task 的不同 Key 分配到不同 Task。

加盐打散 Key

两阶段聚合

不能改变原来的结果

  • 对于聚合类的 shuffle 操作导致的数据倾斜,效果较好。一般可以解决掉 DataSkew,将 Spark 作业的性能提升数倍以上。
  • 仅适用于聚合类的 shuffle 操作,适用范围相对较窄。如果是 join 类的 shuffle 操作,需要选择其他的解决方案。

执行步骤:

  • 加盐打散 key。给每个 key 都加一个随机数,如 10 以内的随机数。此时 key 就被打散了
  • 局部聚合。对打上随机数的数据,执行一次聚合操作,得到结果
  • 全局聚合。将各个 key 的前缀去掉,再进行一次聚合操作,得到最终结果

采样倾斜 key 并拆分 join 操作

使用场景:计算两个 RDD /两张表中的 key 分布情况。如果出现数据倾斜,是其中一个 RDD/Hive 表中的少数几个 key 的数据量过大,而另一个 RDD/Hive 表中的所有 key 都分布比较均匀,那么采用这个解决方案比较合适。

处理步骤:

1、对包含少数几个数据量过大的 key 的那个 RDD,通过 sample 算子采样出一份样本来,然后统计一下每个 key 的数量,计算出数据量最大的是哪几个 key;

2、将这几个 key 对应的数据从原来的 RDD 中拆分出来,形成一个单独的 RDD,并给每个 key 都打上 n 以内的随机数作为前缀,而不会导致倾斜的大部分 key 形成另外一个 RDD;

3、将需要 join 的另一个 RDD,也过滤出来那几个倾斜 key 对应的数据并形成一个单独的 RDD,将每条数据膨胀成 n 条数据,这 n 条数据都按顺序附加一个 0~n 的前缀,不会导致倾斜的大部分 key 也形成另外一个 RDD;

4、再将附加了随机前缀的独立 RDD 与另一个膨胀 n 倍的独立 RDD 进行 join,此时就可以将原先相同的 key 打散成 n 份,分散到多个 task 中去进行 join 了;

5、另外两个普通的 RDD 就照常 join 即可;

6、最后将两次 join 的结果使用 union 算子合并起来即可,就是最终的 join 结果。

使用随机前缀和扩容再进行 join

业务场景:如果在进行 join 操作时,RDD 中有大量的 key 导致数据倾斜,进行分拆 key 没什么意义,此时就只能使用最后一种方案来解决问题了。 处理步骤:

1、选一个 RDD,将每条数据都打上一个 n 以内的随机前缀(打散)

2、对另外一个 RDD 进行扩容,将每条数据都扩容成 n 条数据,扩容出来的每条数据都依次打上一个 0~n 的前缀

3、将两个处理后的 RDD 进行 join 即可

扩容保证能正常 join 的执行

使用注意事项:

  • 如果两个 RDD 都很大,那么将 RDD 进行 N 倍的扩容显然行不通
  • 使用扩容的方式通常能缓解数据倾斜,不能彻底解决数据倾斜问题