Spark 物理计划阶段,join Selection 类会根据 join hints 策略,Join 表的大小、join 是否是等值 join、参与 join 的 key 是否可排序等条件选择最终的 join 策略。

Spark 支持的 join 类型:

  • inner join: 内连接

  • left outer join: 左外连接

  • right outer join: 右外连接

  • full outer join: 全连接

    full outer join 仅采用 sort merge join 实现,左边和右表既要作为 streamIter,又要作为 buildIter

  • left semi join: 以左表为准,在右表中查找匹配的记录,如果查找成功,则仅返回左边的记录,否则返回 null

  • left anti join: 以左表为准,在右表中查找匹配的记录,如果查找成功,则返回 null,否则仅返回左边的记录

  • cross join:

五种 Join 策略

  • Broadcast hash join (BHJ): 将小表的数据广播到 Spark 所有的 Executor,作业中较常见
  • Shuffle hash join(SHJ): 是大表和小表进行 join 时选择的一种策略
  • Shuffle sort merge join (SMJ): join 表都很大的时候选择,作业中较常见
  • Shuffle-and-replicate nested loop join: 又称笛卡尔积 join,非等值连接的时候可能使用
  • Broadcast nested loop join (BNLJ):

JoinSelection

选择策略:

  • Join 的 Key 为等值 Join 来选择 BHJ、SHJ、SMJ 中的一个;
  • Join 的 Key 为不等值 Join 或者没有指定 Join 条件,则会选择 BNLJ 或 Shuffe-and-replicate nested loop join

SparkStrategies.JoinSelection 的 apply 对各种 join 策略的选择:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
object JoinSelection extends Strategy with PredicateHelper {
/**
* 匹配输出应该足够小以用于广播连接的计划,小表的大小小于配置 spark.sql.autoBroadcastJoinThreshold(10M)
*/
private def canBroadcast(plan: LogicalPlan): Boolean = {
plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}

/**
* 该 plan 的单个分区应该足够小,可以构建哈希表。
*/
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}

/**
* 返回计划a是否比计划b小很多(3X)。
* 构建哈希映射的成本比排序高,我们应该只在一个比其他表小得多的表上构建哈希映射。因为我们没有行数的统计信息,所以这里使用字节大小作为估计。
*/
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
}

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

// --- BroadcastHashJoin --------------------------------------------------------------------

// broadcast hints were specified
// 广播提示被指定
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastByHints(joinType, left, right) =>
val buildSide = broadcastSideByHints(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

// broadcast hints were not specified, so need to infer it from size and configuration.
// 没有指定广播提示,所以需要从大小和配置推断它。
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
// 选择 BroadcastHashJoin 策略,需要为等值连接,广播的小小符合
if canBroadcastBySizes(joinType, left, right) =>
val buildSide = broadcastSideBySizes(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

// --- ShuffledHashJoin ---------------------------------------------------------------------

// 选择 ShuffledHashJoin
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
// 配置中不是优先 SortMergeJoin
// 且 Join 类型符合要求
// 且 可以构建本地的 HashMap
// 且足够小(返回计划a是否比计划b小很多(3X)) 或 leftKeys 不可排序
if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
&& muchSmaller(right, left) ||
!RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))

// 和上面相同逻辑,转换下 left,right 进行比较
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
&& muchSmaller(left, right) ||
!RowOrdering.isOrderable(leftKeys) =>
// 选择 ShuffledHashJoin 策略
Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))

// --- SortMergeJoin ------------------------------------------------------------

// 等值连接,leftKeys 是可排序的情况下选择
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
// 选择 SortMergeJoin 策略
joins.SortMergeJoinExec(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

// --- Without joining keys ------------------------------------------------------------

// Pick BroadcastNestedLoopJoin if one side could be broadcast
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastByHints(joinType, left, right) =>
val buildSide = broadcastSideByHints(joinType, left, right)
// 选择 BroadcastNestedLoopJoin(BNLJ)
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

case j @ logical.Join(left, right, joinType, condition)
if canBroadcastBySizes(joinType, left, right) =>
val buildSide = broadcastSideBySizes(joinType, left, right)
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

// Pick CartesianProduct for InnerJoin
case logical.Join(left, right, _: InnerLike, condition) =>
joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

case logical.Join(left, right, joinType, condition) =>
val buildSide = broadcastSide(
left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
// This join could be very slow or OOM
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

// --- Cases where this strategy does not apply ---------------------------------------------

case _ => Nil
}
}
}

Join 策略

BroadcastHashJoin

将小表的数据广播到 Spark 所有的 Executor 端,只能用于等值连接。避免了 Shuffle 操作。一般而言,Broadcast Hash Join 会比其他 Join 策略执行的要快。

Join 步骤:

  • 利用 collect 算子将小表的数据从 Executor 端拉到 Driver 端
  • 在 Driver 端调用 sparkContext.broadcast 广播到所有 Executor 端
  • 在 Executor 端使用广播的数据与大表进行 Join 操作

使用条件:

  • 必须为等值连接,不要求 Join 的 keys 可排序
  • 小表大小小于 spark.sql.autoBroadcastJoinThreshold(default 10M)设定的值

ShuffleHashJoin

与 BHJ 都是在大表和小表进行 Join 的时候选择的一种策略。

Join 步骤:把大表和小表按照相同的分区算法和分区数进行分区(Join 的 keys 进行分区),保证了 hash 值一样的数据都分发到同一个分区中,然后在同一个 Executor 中两张表 hash 值一样的分区就可以在本地进行 hash Join 。在进行 Join 之前,还会对小表的分区构建 Hash Map。

使用条件:

  • 必须为等值连接
  • 不是优先 SortMergeJoin,即配置 spark.sql.join.preferSortMergeJoin 为 false
  • 小表的大小(plan.stats.sizeInBytes)必须小于 spark.sql.autoBroadcastJoinThreshold(10M) * spark.sql.shuffle.partitions(200)
  • 小表大小(stats.sizeInBytes)的三倍必须小于等于大表的大小(stats.sizeInBytes)

SortMergeJoin

对表的大小没有条件,不管分区多大,SortMergeJoin 都不用把一侧的数据全部加载到内存中,而是即用即丢;两个序列都有序。从头遍历,碰到 key 相同的就输出,如果不同,左边小就继续取左边,反之取右边,提高了大数据量下sql join 的稳定性。

Join 步骤:

  • 将两张表按照 join key 进行shuffle,保证join key值相同的记录会被分在相应的分区
  • 对每个分区内的数据进行排序
  • 排序后再对相应的分区内的记录进行连接

使用条件:

  • 等值连接
  • 参与 join 的 key 可排序

BroadcastNestedLoopJoin

1
2
3
for record_1 in relation_1:
for record_2 in relation_2:
# join condition is executed

在某些情况会对某张表重复扫描多次,效率非常低下,会根据相关条件对小表进行广播,以减少表的扫描次数。

支持等值和不等值 Join,支持所有的 Join 类型。

CartesianProductJoin

join 表未指定连接条件时使用

相关配置

影响 join 策略选择的配置

  • spark.sql.autoBroadcastJoinThreshold: 使用 broadcast hash join 的阈值,默认为 10M,为 -1 表示关闭这种连接方式
  • spark.sql.join.preferSortMergeJoin: 是否尝试使用 Shuffle Hash Join,默认为 true
  • spark.sql.shuffle.partitions: 默认 200