内存管理

静态内存管理

Spark 2.0 以前版本采用静态内存管理机制。存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的。 容易造成存储内存和执行内存中的一方剩余大量的空间,而另一方却早早被占满,不得不淘汰或移出旧的内容以存储新的内容。目前已经被淘汰。

统一内存管理

Spark 2.0 之后引入统一内存管理机制,存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,统一内存管理的堆内内存结构。

可以借用内存

最重要的优化在于动态占用机制

在执行过程中:执行内存的优先级 > 存储内存的优先级

凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率, 降低了开发者维护 Spark 内存的难度

存储内存管理

RDD 持久化机制

Task 在启动之初读取一个分区时:

  • 先判断这个分区是否已经被持久化
  • 如果没有则需要检查 Checkpoint 或按照血统重新计算。

RDD 的持久化由 Spark 的 Storage(BlockManager) 模块负责,实现了 RDD 与物理存储的解耦合。

Storage 模块在逻辑上以 Block 为基本存储单位,RDD 的每个 Partition 经过处理后 唯一对应一个Block。

RDD 缓存过程 RDD 缓存的源头:Other (Iterator / 内存空间不连续)

Record 的对象实例在逻辑上占用了 JVM 堆内内存的 other 部分的空间,同一 Partition 的不同 Record 的存储空间并不连续。

RDD 在缓存到存储内存之后,Partition 被转换成 Block,Record 在堆内或堆外存储内存中占用一块连续的空间 将 Partition 由不连续的存储空间转换为连续存储空间的过程,Spark 称之为展开(Unroll)。

淘汰与落盘 存储内存的淘汰规则为:

  • 被淘汰的旧 Block 要与新 Block 的 MemoryMode 相同,即同属于堆外或堆内内存
  • 新旧 Block 不能属于同一个 RDD,避免循环淘汰
  • 旧 Block 所属 RDD 不能处于被读状态,避免引发一致性问题
  • 遍历 LinkedHashMap 中 Block,按照最近最少使用(LRU)的顺序淘汰,直到满足新 Block 所需的空间。
1
2
3
4
5
6
7
8
9
10
private[spark] class MemoryStore(
conf: SparkConf,
blockInfoManager: BlockInfoManager,
serializerManager: SerializerManager,
memoryManager: MemoryManager,
blockEvictionHandler: BlockEvictionHandler)
extends Logging {
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
...
}

执行内存管理

主要用来存储任务在执行 Shuffle 时占用的内存

Spark 会使用一种叫 AppendOnlyMap 的哈希表在堆内执行内存中存储数据

Spark 的存储内存和执行内存使用不同的管理方式:

  • 对存储内存来说,Spark 用一个 LinkedHashMap 来集中管理所有的 Block, Block 由需要缓存的 RDD 的 Partition 转化而成;
  • 对执行内存来说,Spark 用 AppendOnlyMap 来存储 Shuffle 过程中的数据, 在 Tungsten 排序中甚至抽象成为页式内存管理,开辟了全新的 JVM 内存管理机制。

数据持久化

数据持久化/缓存的目的:

Spark 默认数据存放在内存中,适合高速迭代,多个步骤只有第一个输入数据,中间不产生临时数据,风险很高,容易出错,需要进行容错。 RDD 出错/分片可以根据血统重新计算出来,如果没有父 RDD 进行 persist 或者 cache,就需要重新做,耗时较大。将数据缓存起来,方便进行迭代计算。

RDD 持久化

RDD 中的数据持久化:

对于 Spark 中 RDD 不支持的持久化方式,可使用 foreachPartition 进行自定义。

1
2
3
4
5
saveAsObjectFile()
saveAsTextFile()
saveAsHadoopFile()
saveAsHadoopDataset()
saveAsSequenceFile()

cache 和 persist 比较

都是用于将一个 RDD 缓存,之后使用的时候就不用重新计算了,节省程序运行时间

  • cache 只有一个默认的缓存级别 MEMORY_ONLY, cache 调用了persist,而 persist 可以根据情况设置其它的缓存级别;
  • executor 执行的时候,默认 60% 做 cache,40% 做 task 操作,persist 是最根本的函数,最底层的函数。

persist 适用场景

  • 特定步骤耗时高
  • 计算的链条长,重新计算步骤多
  • checkpoint 所在的 rdd 要持久化 persist
  • shuffle 之后进行 percist, shuffle 需要进行网络传输,风险大
  • shuffle 之前进行 precast,框架默认将数据持久化到磁盘,自动执行的

cache 和 checkpoint 比较

cache 和 checkpoint 是有显著区别的,缓存把 RDD 计算出来然后放在内存中,但是 RDD 的依赖链还在, 当某个点某个 executor 挂掉了,上面 cache 的 RDD 就会丢掉,需要通过依赖链重放计算。checkpoint 是把 RDD 保存在 HDFS 中,是多副本可靠存储,依赖链可以丢掉,斩断了依赖链,在 executor 发生故障的时候,从 HDFS 中取出,无需重放计算。

Partition 与 Block 的关联关系

  • HDFS 中的 Block 是分布式存储的最小单位,等分,可设置冗余,可能有一部分磁盘空间的浪费
  • Spark 中的 Partition 是 RDD 的最小单元,RDD 是由分布在各个节点上的 Partition 组成。Partition 指的是 Spark 计算过程中,生成的数据在计算空间内的最小单元,同一份数据(RDD) 的 partition 大小不一、数量不定,根据 Application 里的算子和最初读入的数据分块决定
  • Block 位于存储空间、Partition 位于计算空间,Block 的大小是固定的,Partition 的大小是不固定的。

BlockManager

管理数据块。是一个嵌入在 Spark 中的 key-value 型分布式存储系统,也是 Master-Slave 结构的,RDD-cache、 shuffle-output、broadcast 等的实现都是基于 BlockManager 来实现的。

http://img.janhen.com/20210414144422ZnWph0.jpg

BlockManager 中的组件:

  • DiskStore:负责对磁盘数据进行读写
  • MemoryStore:负责对内存数据进行读写
  • BlockTransferService:负责建立到远程其他节点 BlockManager 的连接,对远程其他节点的 BlockManager 的数据进行读写

存储和管理机制:

每个节点上存储的 block 信息都会汇报给 Driver 端的 BlockManager Master 作统一管理,BlockManager 对外提供 get 和 set 数据接口,可将数据存储在 Memory、Disk、Off-heap。

BlockManger 在 Spark 中的使用

  • shuffle 的过程中使用 BlockManager 作为数据的中转站
  • 将广播变量发送到 Executor 时, broadcast 底层使用的数据存储层
  • spark streaming 一个 ReceiverInputDStream 接收到的数据,先放在 BlockManager 中, 然后封装为一个 BlockRdd 进行下一步运算
  • 如果对一个 RDD 进行了 cache,CacheManager 也是把数据放在了 BlockManager 中, 后续 Task 运行的时候可以直接从 CacheManager 中获取到缓存的数据,不用再从头计算

相关配置

  • spark.storage.memoryFraction
    • 用于设置 RDD 持久化数据在 Executor 内存中能占的比例,默认是 0.6, 默认 Executor 60% 的内存,可以用来保存持久化的 RDD 数据。根据的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘;
    • 如果持久化操作比较多,可以提高 spark.storage.memoryFraction 参数,使得更多的持久化数据保存在内存中,提高数据的读取性能,如果 shuffle 的操作比较多,有很多的数据读写操作到 JVM 中,那么应该调小一点,节约出更多的内存给 JVM,避免过多的 JVM gc 发生。
  • spark.shuffle.memoryFraction
    • 为 Spark 调优中的重要参数,shuffle 从上一个 task 拉去数据过来,要在Executor进行聚合操作, 聚合操作时使用Executor内存的比例由该参数决定,默认是20%如果聚合时数据超过了该大小,那么就会spill到磁盘,极大降低性能;
    • 如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例, 避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用, 那么同样建议调低这个参数的值。

Ref

BlockManager - The Internals of Apache Spark