Shuffle

按照一定的规则对数据重新分区的过程就是 Shuffle。需要 Shuffle 是因为某种具有共同特征的数据汇聚到一个计算节点上进行计算。

Shuffle 过程中包含了许多低效的操作,包括磁盘 IO、序列化、网络数据传输等。

Shuffle 分类

  • Hash Shuffle V1
    • 两个严重问题:生成大量文件,占用文件描述符,同时引入 DiskObjectWriter 带来的 Writer Handler 的缓存也非常消耗内存。如果在 Reduce Task 时需要合并操作的话,会把数据放在一个 HashMap 中进行合并,如果数据量较大,很容易引发 OOM
  • Hash Shuffle V2
    • 在 v1 基础上引入 File Consolidation
    • 一个 Executor 上所有的 Map Task 生成的分区文件只有一份,即将所有的 Map Task 相同的分区文件合并,这样每个 Executor 上最多只生成 N 个分区文件。
  • Sort Shuffle V1
    • 每个 Task 不会为后续的每个 Task 创建单独的文件,而是将所有对结果写入同一个文件。该文件中的记录首先是按照 Partition Id 排序,每个 Partition 内部再按照 Key 进行排序,Map Task 运行期间会顺序写每个 Partition 的数据,同时生成一个索引文件记录每个 Partition 的大小和偏移量。
    • 在 Reduce 阶段,Reduce Task 拉取数据做 Combine 时不再采用 HashMap,而是采用 ExternalAppendOnlyMap,该数据结构在做 Combine 时,如果内存不足,会刷写磁盘,避免(AppendOnlyMap)大数据情况下的 OOM。
    • Sort Shuffle 解决了 Hash Shuffle 的所有弊端,但是因为需要其 Shuffle 过程需要对记录进行排序,所以在性能上有所损失。
  • Tungsten-Sort Based Shuffle(Unsafe Shuffle)
    • 1.5+ 后开始钨丝计划,优化内存和 CPU 的使用,使用了堆外内存进一步提升 Spark 的性能。
    • 将数据记录用二进制的方式存储,直接在序列化的二进制数据上 Sort 而不是在 Java 对象上。减少内存的使用和 GC 的开销,避免 Shuffle 过程中频繁的序列化以及反序列化。
    • 排序过程中,提供 cache-efficient sorter,使用一个 8 bytes 的指针,把排序转化成了一个指针数组的排序,极大的优化了排序性能。
    • 使用要求:
      • Shuffle 阶段不能有 aggregate 操作,对于 aggretateByKey 之类的算子无法使用。
      • 分区数不能超过 2^24
      • 序列化支持 relocation,如 kyro 序列化
  • Sort Shuffle V2
    • Spark1.6+,把 Sort Shuffle 和 Tungsten-Sort Based Shuffle 全部统一到 Sort Shuffle 中,如果检测到满足 Tungsten-Sort Based Shuffle 条件会自动采用 Tungsten-Sort Based Shuffle,否则采用 Sort Shuffle。
    • sort-based shuffle 的缺陷
      • mapper 的 Task 数量过大,依旧会产生大量小文件,此时在 shuffle 传递数据的过程中 reducer 端,reduce 会需要同时大量的记录进行反序列化,导致大量的内存消耗和 GC 的巨大负担,造成系统缓慢甚至崩溃。
      • 如果需要在分片内也进行排序,此时需要进行 mapper 端和 reducer 端的两次排序。

ShuffleWriter 执行流程

  1. 数据先写入一个内存数据结构中。不同的 shuffle 算子,可能选用不同的数据结构
    • 如果是 reduceByKey 聚合类的算子,选用 Map(ExternalAppendOnlyMap) 数据结构,一边通过 Map 进行聚合,一边写入内存
    • 如果是 join 类的 shuffle 算子,那么选用 Array(CompactBuffer) 数据结构,直接写入内存
  2. 检查是否达到内存阈值。每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会将内存数据结构中的数据溢写到磁盘,并清空内存数据结构
  3. 数据排序。在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件
  4. 数据写入缓冲区。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能
  5. 重复写多个临时文件。一个 Task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,会产生多个临时文件
  6. 临时文件合并。最后将所有的临时磁盘文件进行合并,这就是 merge 过程。此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中写索引文件。由于一个 Task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 Task 的数据在文件中的 start offset 与 end offset。

SortShuffleManager

当前 ShuffleManager 唯一实现类。在基于排序的 shuffle 中,传入记录根据其目标分区 id 进行排序,然后写入单个映射输出文件。Reducers 获取这个文件的连续区域,以便读取映射输出中属于它们的部分。在 map 输出数据太大而无法装入内存的情况下,可以将输出的排序子集溢出到磁盘,并合并那些磁盘上的文件以生成最终的输出文件。

  • getReader(): 返回读取 shuffle 过程的数据的 reader,当前使用的位置
    • 当前获取的位置: CoGroupedRDD、ShuffledRDD、SubtractedRDD、ShuffledRowRDD
  • getWriter(): 根据条件判断选择哪种 ShuffleWriter,可选择 BypassMergeSortShuffleWriter, UnsafeShuffleWriter, SortShuffleWriter
  • registerShuffle(): 注册 shuffle
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {

/**
* A mapping from shuffle ids to the number of mappers producing output for those shuffles.
*/
private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()

override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)

/**
* Obtains a [[ShuffleHandle]] to pass to tasks.
*/
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
// 小于 bypassMerge(spark.shuffle.sort.bypassMergeThreshold) 的阈值分区数,不需要 map 端的聚合,
// 然后直接写入MnumPartitions 文件,并在最后将它们连接起来。这避免了两次序列化和反序列化来合并溢出的文件,缺点是一次打开多个文件,从而分配给缓冲区更多的内存
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// 尝试以序列化的形式缓冲映射输出,因为这样更有效率
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// 缓冲区映射以反序列化形式输出
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}

/**
* 获取一系列 reduce 分区的读取器(startPartition 到 endPartition-1,包括在内)。通过 reduce 任务调用 executor。
* 当前使用到的位置:
* - CoGroupedRDD
* - ShuffledRDD
* - SubtractedRDD
* - ShuffledRowRDD
*/
override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C] = {
// shuffle 过程的 reader, 当前 ShuffleReader 的唯一实现类
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}

// 获取给定分区的写入器。通过 map 任务调用 executor
override def getWriter[K, V](
handle: ShuffleHandle, // 根据传入的 ShuffleHandle 获取到
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
numMapsForShuffle.putIfAbsent(
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
val env = SparkEnv.get
handle match {
// 序列化的 Shuffle 处理,使用 UnsafeShuffleWriter
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf)
// 跳过 mergeSort 的 shuffle 处理
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
bypassMergeSortHandle,
mapId,
context,
env.conf)
// 通用的 ShuffleWriter 处理
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}

...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private[spark] object SortShuffleManager extends Logging {

/**
* 在以序列化形式缓冲映射输出时 SortShuffleManager 支持的最大 shuffle 输出分区数。
* 这是一种极端的防御性编程措施,因为一次 shuffle 极不可能产生超过 1600 万个输出分区。
* */
val MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE =
PackedRecordPointer.MAXIMUM_PARTITION_ID + 1

/**
* 用于确定 shuffle 是否可使用优化的序列化 shuffle path 是否应该退回到对反序列化对象进行操作的 original path
*/
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
val shufId = dependency.shuffleId
val numPartitions = dependency.partitioner.numPartitions
// 需要依赖对应的序列化器支持 relocation
if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
s"${dependency.serializer.getClass.getName}, does not support object relocation")
false
} else if (dependency.mapSideCombine) {
// 不可为 MapSideCombine
log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
s"map-side aggregation")
false
} else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) { // 2^24
// 分区数小于 2^24
log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
false
} else {
log.debug(s"Can use serialized shuffle for shuffle $shufId")
true
}
}
}

与 Hadoop Shuffle 的比较

二者从功能上看是相似的;从 High Level来看,没有本质区别。

  • 何时进行 fetch Map 端的数据: Hadoop 中有一个 Map 完成,Reduce 便可以去 fetch 数据了,MR Shuffle 不必等到所有 Map 任务完成;而 Spark shuffle 必须等到父 stage 完成,也就是父 stage 的 map 操作全部完成才能去 fetch 数据。这是因为 spark 必须等到父 stage 执行完,才能执行子 stage,主要是为了迎合 stage 规则
  • 何时执行 Reduce 端的聚合: Hadoop 的 Reduce 要等到 fetch 完全部数据,才将数据传入 reduce 函数进行聚合,Spark 是一边 fetch 一边聚合。
  • 分区有序的要求: Hadoop 的 Shuffle 是 sort-base 的,那么不管是 Map 的输出,还是 Reduce 的输出,都是 partition 内有序的,而 spark 不要求 partition 内有限

Shuffle Writer

BypassMergeSortShuffleWriter

与 Hash Shuffle 的实现基本相同,区别在于 map task 输出汇总一个文件,同时还会产生一个 index file

特点:

  • 类似于Hash Shuffle,多了文件的合并
  • 对于大量 reduce 分区的 shuffle,是低效的,因为同时为所有分区打开单独的序列化器和文件流

Writer 流程:

  • 每个 Map Task 为每个下游 reduce task 创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 hash 值写入内存缓冲,缓冲写满之后再溢写到磁盘文件;
  • 最后将所有临时磁盘文件都合并成一个磁盘文件,并创建索引文件;
  • 在 shuffle 过程中会创建很多磁盘文件,最后多了一个磁盘文件合并的过程。Shuffle Read 的性能会更好;

Bypass 方式与普通的 Sort Shuffle 方式的不同点:

  • 磁盘写机制不同
  • 根据 key 求 hash,减少了数据排序操作,提高了性能

选择条件:

  • 分区数 <=200(spark.shuffle.sort.bypassMergeThreshold)
  • 非聚合操作
  • 没有指定排序
  • 不是 mapSideCombine

UnsafeShuffleWriter

序列化的排序方式: 在序列化排序模式中,传入的记录一旦传递给 shuffle 写入器就会被序列化,并在排序期间以序列化的形式进行缓冲。

序列化排序方式的优化:

  • 它的排序操作是序列化的二进制数据,而不是 Java 对象,这减少了内存消耗和 GC 开销。此优化要求记录序列化器具有某些属性,以允许在不反序列化的情况下重新排序序列化的记录。
  • 它使用专用的高效缓存排序器([[ShuffleExternalSorter]])对压缩记录指针和分区id数组进行排序。通过在排序数组中每条记录只使用 8 个字节的空间,这可以将更多的数组放入缓存中
  • spill 合并过程对属于同一分区的序列化记录块进行操作,合并过程中不需要反序列化记录。
  • 当 spill 压缩编解码器支持连接压缩数据时,spill merge 只是将序列化的 spill 分区和压缩的 spill 分区连接起来,以产生最终的输出分区。这允许使用高效的数据复制方法,如 NIO 的 transferTo,并避免了在合并期间分配解压缩缓冲区或复制缓冲区的需要。

选择条件:

  • shuffle 序列化器支持 object relocation, 目前 KryoSerializer 或 SparkSQL 自定义的一些序列化方式支持
  • 分区数 < 2^24(16777216)
  • shuffle 依赖项不指定聚合或输出顺序,即 mapSideCombine 为 false。
  • 设置堆外内存大小

SortShuffleWriter

对于没有选择 BypassMergeSortShuffleWriter、UnsafeShuffleWriter 的,默认选择 SortShuffleWriter。

执行流程:

  • 数据先写入内存数据结构。聚合类操作写入 Map,非聚合类算子写入 Array
  • 检查是否达到内存阈值。非实时检查,不定时采样,不准确
  • 数据排序
  • 数据写入缓冲区(32k)
  • 重复写多个临时文件
  • 最后临时文件合并为数据文件
  • 写索引文件
  • 将文件位置、计算状态等封装到 MapStatus 中,汇报给 Driver

MapOutputTracker

Shuffle 过程中的中间数据的元信息,由 MapOutputTracker 负责管理。

Shuffle Writer 会将中间数据保存到 Block 里面,然后将数据的位置发送给 MapOutputTracker。Shuffle Reader 通过向 MapOutputTracker 获取中间数据的位置之后,才能读取到数据。

  • MapOutputTracker: 跟踪 map 阶段输出的位置,在 executor 和 driver 端都存在。
    • trackerEndpoint: 一个 RpcEndpointRef
  • MapOutputTrackerMaster: 存在于 driver 端,DAGScheduler 使用该类注册 map 输出状态和查找统计信息执行位置感知减少任务调度。
    • 负责管理所有 shuffleMapTask 的输出数据,每个 shuffleMapTask 执行完后会把执行结果(MapStatus)注册到 MapOutputTrackerMaster
    • MapOutputTrackerMaster 会处理 executor 发送的 GetMapOutputStatuses 请求,并返回 serializedMapStatus 给 executor 端
  • MapOutputTrackerMasterEndpoint:存在于 driver 端
  • MapOutputTrackerWorker: 存在于 executor 端
    • 负责为 reduce 任务提供 shuffleMapTask 的输出数据信息(MapStatus)
    • 如果 MapOutputTrackerWorker 在本地没有找到请求的 shuffle 的 mapStatus,则会向 MapOutputTrackerMasterEndpoint 发送 GetMapOutputStatuses 请求获取对应的 mapStatus
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private[spark] class MapOutputTrackerMasterEndpoint(
override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf)
extends RpcEndpoint with Logging {
// 处理 GetMapOutputStatuses、StopMapOutputTracker 的偏函数
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GetMapOutputStatuses(shuffleId: Int) =>
val hostPort = context.senderAddress.hostPort
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
// MapOutputTrackerMaster 发送 GetMapOutputMessage
val mapOutputStatuses = tracker.post(new GetMapOutputMessage(shuffleId, context))

case StopMapOutputTracker =>
logInfo("MapOutputTrackerMasterEndpoint stopped!")
context.reply(true)
stop()
}
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private[spark] class MapOutputTrackerMaster(
conf: SparkConf,
broadcastManager: BroadcastManager,
isLocal: Boolean)
extends MapOutputTracker(conf) {
private val minSizeForBroadcast =
conf.getSizeAsBytes("spark.shuffle.mapOutput.minSizeForBroadcast", "512k").toInt
// spark.rpc.message.maxSize 默认为 128M
private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
// requests for map output statuses
private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]
// 用于处理映射输出状态请求的线程池。
// 这是一个单独的线程池,以确保我们不会阻塞普通的调度程序线程。
private val threadpool: ThreadPoolExecutor = {
val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8)
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher")
for (i <- 0 until numThreads) {
pool.execute(new MessageLoop)
}
pool
}
// 发送消息
def post(message: GetMapOutputMessage): Unit = {
// 放到 LinkedBlockingQueue 阻塞队列中
mapOutputRequests.offer(message)
}
// 用于发送消息的 message loop, threadpool 中执行的任务
private class MessageLoop extends Runnable {
override def run(): Unit = {
try {
while (true) {
try {
// 消费阻塞队列中的 GetMapOutputMessage 消息
val data = mapOutputRequests.take()
if (data == PoisonPill) {
// Put PoisonPill back so that other MessageLoops can see it.
mapOutputRequests.offer(PoisonPill)
return
}
val context = data.context
val shuffleId = data.shuffleId
val hostPort = context.senderAddress.hostPort
logDebug("Handling request to send map output locations for shuffle " + shuffleId +
" to " + hostPort)
val shuffleStatus = shuffleStatuses.get(shuffleId).head
context.reply(
shuffleStatus.serializedMapStatus(broadcastManager, isLocal, minSizeForBroadcast))
} catch {
case NonFatal(e) => logError(e.getMessage, e)
}
}
} catch {
case ie: InterruptedException => // exit
}
}
}
...
}

Shuffle Reader

  • Map Task 执行完毕后会将文件位置、计算状态等信息封装到 MapStatus 中,通过 MapOutPutTrackerWorker 对象将其发送给 Driver 进程的 MapOutPutTrackerMaster
  • Reduce Task 开始执行之前会先让本进程中的 MapOutputTrackerWorker 向 Driver 进程中的 MapOutputTrackerMaster 发动请求,获取磁盘文件位置等信息
  • 当所有的 Map Task 执行完毕后,Driver 进程中的 MapOutputTrackerMaster 获得了所有的 Shuffle 文件的信息。此时 MapOutPutTrackerMaster 会告诉 MapOutPutTrackerWorker 磁盘小文件的位置信息
  • 完成之前的操作之后,由 BlockTransforService 去 Executor 所在的节点拉数据,默认会启动五个子线程。每次拉取的数据量不能超过 48M
  • spark.reducer.maxSizeInFlight: shuffle reader 时候,一次 fetch 不能过多,不能超过的数据量,默认为 48M, 空间是由这 5 个 fetch 线程共享的
  • BlockStoreShuffleReader: 当前 ShuffleReader 的唯一实现类
    • read(): 读取 reduce 任务的组合键值
  • ShuffleBlockFetcherIterator: 获取多个块的迭代器。对于本地块,它从本地块管理器获取。对于远程块,它使用提供的 BlockTransferService 获取。
    • 创建 (BlockID, InputStream) 元组,对远程获取进行了限制,不会超过 maxBytesInFlight,避免使用太多的内存

Shuffle 的优化

开发过程中的优化:

  • 减少 Shuffle 过程中的数据量: 使用高性能算子,如使用 filter + coalesce 过滤不需要的值并减少分区数,使用 reduceByKey 替代 groupByKey 有 mapSideCombine 减少聚合的数据量,
  • 避免 Shuffle: 更改代码避免 shuffle,使用 map 端的 join 避免, 如使用 colasce 替代 repartititon, repartitionAndSortxxx 将会产生两个 shuffle 的合并为一个

参数优化:

  • 调节 map 端缓冲区大小
    • spark.shuffle.file.buffer 默认值为32K,shuffle write 阶段 buffer 缓冲大小。将数据写到磁盘文件之前,会先写入 buffer 缓冲区,缓冲写满后才溢写到磁盘
  • 调节 reduce 端拉取数据缓冲区大小
    • spark.reducer.maxSizeInFlight 默认值为48M。设置shuffle read阶段buffer缓冲区大小,这个buffer缓冲决定了每次能够拉取多少数据
    • 在内存资源充足的情况下,可适当增加参数的大小(如96m),减少拉取数据的次数及网络传输次数,进而提升性能
    • 合理设置参数,性能会有 1%~5% 的提升
  • 调节 reduce 端拉取数据重试次数及等待间隔,Shuffle read 阶段拉取数据时,如果因为网络异常导致拉取失败,会自动进行重试。
    • Shuffle read阶段拉取数据时,如果因为网络异常导致拉取失败,会自动进行重试
    • spark.shuffle.io.maxRetries,默认值3。最大重试次数
    • spark.shuffle.io.retryWait,默认值5s。每次重试拉取数据的等待间隔
    • 一般调高最大重试次数,不调整时间间隔
  • 调节 Sort Shuffle 排序操作阈值
    • 如果shuffle reduce task的数量小于阈值,则shuffle write过程中不会进行排序操作,而是直接按未经优化的Hash Shuffle方式写数据,最后将每个task产生的
    • 所有临时磁盘文件都合并成一个文件,并创建单独的索引文
    • spark.shuffle.sort.bypassMergeThreshold,默认值为200 当使用SortShuffleManager时,如果的确不需要排序操作,建议将这个参数调大
  • 调节 Shuffle 内存大小,分配多一些的比例给执行内存用于 Shuffle

配置参数

Shuffle 中的重要参数:

  • spark.local.dir: Shuffle 缓存目录

  • spark.shuffle.file.buffer: shuffle write 阶段 buffer 缓冲大小,将数据写到磁盘文件之前,会先写入 buffer 缓冲中,待缓冲写满之后,才会溢写到磁盘。默认值为 32K。

  • spark.reducer.maxSizeInFlight: shuffle read 阶段 buffer 缓冲区大小。默认值为 48M。

  • spark.shuffle.io.maxRetries: Shuffle read 阶段拉取数据失败时的最大重试次数。默认值 3。

  • spark.shuffle.io.retryWait: Shuffle read 阶段拉取数据失败重试时的等待时间。默认值 5s。

  • spark.shuffle.sort.bypassMergeThreshold: 使用 bypassMergeSortShuffleWriter 机制,RDD 分区数的限制阈值。默认值为 200。

  • spark.memory.fraction & spark.memory.storageFraction: 调整 Shuffle 相关内存所占的比例

  • spark.memory.fraction: 缺省值 0.6。存储内存和执行内存占(heap 内存 - 300M)的百分比

  • spark.memory.storageFraction: 缺省值 0.5 存储内存与 (存储内存与执行内存之和)的百分比

  • spark.shuffle.manager: 通过反射方式生成的 SortShuffleManager 的实例。默认为 SortShuffleManager。

  • Spark 1.5 以后,有三个可选项:hash、sort 和 tungsten-sort。

  • spark.shuffle.consolidateFiles

  • spark.shuffle.mapOutput.minSizeForBroadcast:默认值 512K

  • spark.shuffle.mapOutput.dispatcher.numThreads: 默认值为 8,map 端输出派发线程池中的线程数