Spark内存管理模型

1. 简介

作为基于内存的分布式计算引擎,Spark的内存管理模块在整个系统中扮演着非常重要的角色。了解Spark内存管理的基础知识可帮助您开发Spark应用程序并执行性能调整。

通常,Spark应用程序包括两个JVM进程DriverExecutor
- Driver 驱动程序是主要的控制过程,负责创建上下文context,提交作业 sumbit jobs,将作业转换为任务以及协调执行程序之间的任务执行。Driver的内存管理相对简单,并且一般的JVM程序之间的差异不大。

--Spark
    |__Driver
    |__Executor

2. 堆内存和堆外内存

执行程序充当JVM进程,其内存管理基于JVM。因此,JVM内存管理包括两种方法:

  1. 堆上内存管理
  2. 对象在JVM堆上分配,并由GC绑定。
  3. 堆外内存管理
  4. 对象通过序列化在JVM之外的内存中分配,由应用程序管理,并且不受GC绑定。这种内存管理方法可以避免频繁的GC,但是缺点是您必须编写内存分配和内存释放的逻辑。

通常,对象的读写速度为:
$$ 堆上>堆外>磁盘 $$

3. 序列化

序列化在任何分布式应用程序的性能中都起着重要作用[^2]。将对象序列化为慢速格式或占用大量字节的格式将大大减慢计算速度。通常,这是您应该优化Spark应用程序的第一件事。Spark旨在在便利性(允许您在操作中使用任何Java类型)和性能之间取得平衡。它提供了两个序列化库:

序列化会生效的几个地方:
1. 算子函数中使用到的外部变量(影响网络传输的性能,集群中内存的占用和消耗)
2. 持久化RDD时
3. shuffle(网络传输的性能)

3.1. Java 序列化机制

默认情况下,Spark内部是使用Java的序列化机制,ObjectOutputStream/ObjectInputStream对象输入输出流机制,来进行序列化。
- 优点:处理起来比较方便,只是在算子里面使用的变量,必须是实现Serializable接口的。
- 缺点:默认的序列化机制的效率不高,序列化的速度比较慢;序列化以后的数据,占用的内存空间相对还是比较大。

3.2. 使用Kryo序列化

Kryo序列化机制,
- 优点:
- 1. 比默认的Java序列化机制,速度要快,
- 2. 序列化后的数据要更小,大概是Java序列化机制的1/10。 Kryo序列化机制,一旦启用以后,会生效的几个地方:

使用Kryo序列化步骤:

SparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册你使用到的,需要通过Kryo序列化的自定义类。
SparkConf.registerKryoClasses(new Class[]{CategorySoryKey.class})

4. 内存分配

在Spark中,支持两种内存管理模式:
1. 静态内存管理器
2. 统一内存管理器

Spark提供了一个统一的接口MemoryManager来管理存储内存和执行内存。同一执行器中的任务调用该接口以申请或释放内存。在实现MemoryManager时,默认情况下会在Spark 1.6之前使用StaticMemory管理,而默认方法已在Spark 1.6之后更改为UnifiedMemoryManager。在Spark 1.6+中,可以通过spark.memory.useLegacyMode参数启用静态内存管理。

4.1. 静态内存管理器

在静态内存管理器机制下,存储内存,执行内存和其他内存的大小在Spark应用程序运行期间是固定的,但是用户可以在应用程序启动之前对其进行配置。尽管已逐渐取消了这种分配方法,但出于兼容性原因,Spark仍然保留。

这里主要讨论静态内存管理器的弊端:静态内存管理器机制实现起来比较简单,但是如果用户不熟悉Spark的存储机制,或者不根据特定的数据大小进行相应的配置在执行计算任务时,很容易造成其中一个存储内存和执行内存剩余大量空间,而另一个则首先被填充,因此必须删除或删除旧内容以用于新内容。

在Spark 1.6+中,可以通过spark.memory.useLegacyMode参数启用静态内存管理。

4.2. 统一内存管理器

Spark 1.6之后引入了统一内存管理器机制。统一内存管理器和静态内存管理器之间的区别在于,在统一内存管理器机制下,存储内存和执行内存共享一个内存区域,并且两者都可以占据彼此的空闲区域。

4.2.1. 堆上模型

默认情况下,Spark仅使用堆上内存。 当Spark应用程序启动时,堆内存的大小由–executor-memory 或 spark.executor.memory参数配置 。在Executor中运行的并发任务共享JVM的堆上内存。

执行程序中的堆上内存区域可以大致分为以下四个块:

  1. 存储内存 :主要用于存储Spark缓存数据,如RDD缓存,Broadcast变量,Unroll数据等。
  2. 执行内存 :主要用于在Shuffle,Join,Sort,Aggregation等计算过程中存储临时数据。

  3. 用户内存 :它主要用于存储RDD转换操作所需的数据,例如RDD依赖项的信息。

  4. 保留的内存:内存是为系统保留的,用于存储Spark的内部对象。
    内存分配如下所示:

4.2.2. 堆外模型

Spark 1.6开始引入堆外内存(Off-heap)。默认情况下,堆外内存是禁用的,但是我们可以通过spark.memory.offHeap.enabled 参数启用它,并通过spark.memory.offHeap.size 参数设置内存大小 。与堆上内存相比,堆外内存的模型相对简单,仅包括存储内存和执行内存,其分布如下图所示:

如果启用了堆外内存,则执行器中将同时有堆上内存堆外内存。此时,执行器中的执行内存是堆内部的执行内存与堆外部的执行内存之和。存储内存也是如此。下图显示了Spark堆内部和外部的堆上和堆外内存。

4.2.3. 动态占用机制

提交程序后,将根据该spark.memory.storageFraction 参数设置存储存储区和执行存储区。
当程序运行时,如果双方的空间不足(存储空间不足以放置一个完整的块),它将根据LRU存储到磁盘;如果其中一个空间不足,而另一个空间可用,则它将借用另一个空间。

存储占用对方的内存,然后将占用的部分转移到硬盘上,然后“返还”借来的空间。
执行占用了对方的内存,在当前实现中无法“返回”借用的空间。因为由Shuffle进程生成的文件将在以后使用,并且以后不必再使用Cache中的数据,所以返回内存可能会导致严重的性能下降。

5. 优化内存使用

有三个方面的考虑在调整内存使用:
1. 变量对象的存储消耗(你可能希望你的整个数据集,以适应在内存中),
2. 访问这些对象的成本开销
3. 垃圾回收的开销

默认情况下,Java对象的访问速度很快,但是与其字段中的"原始"数据相比,它们很容易消耗2-5倍的空间。这是由于以下几个原因:

  1. 对象头。每个不同的Java对象都有一个"对象头",它大约16个字节,并包含诸如指向其类的指针之类的信息。对于其中数据很少的对象(例如一个Int字段),该对象可能大于数据。
  2. 字符串。Java String相对于原始字符串数据有大约40个字节的开销(因为它们将其存储在Chars 数组中并保留诸如长度之类的额外数据),并且由于UTF-16的内部用法,因此将每个字符存储为两个字节String编码。因此,一个10个字符的字符串可以轻松消耗60个字节。
  3. 链式结构数据结构。 常见的收集类(例如HashMapLinkedList)使用链接的数据结构,其中每个条目(例如Map.Entry)都有一个"包装"对象。该对象不仅具有标题,而且还具有指向列表中下一个对象的指针(通常每个指针8个字节)。
  4. 基本类型的集合通常将它们存储为"盒装"对象,例如java.lang.Integer

6. 内存管理概述

Spark中的内存使用情况大体上属于以下两类之一:执行和存储。执行内存是指用于洗牌,联接,排序和聚合中的计算的内存,而存储内存是指用于在集群中缓存和传播内部数据的内存。

在Spark中,执行和存储共享一个统一的区域(M)。当不使用执行内存时,存储可以获取所有可用内存,反之亦然。如果有必要,执行可能会驱逐存储,但只有在总存储内存使用率下降到某个阈值(R)以下时,才可以执行该操作。换句话说,R描述了一个子区域,在该子区域M中永远不会清除缓存的块。由于实现的复杂性,存储可能无法退出执行。

这种设计确保了几种理想的性能。首先,不使用缓存的应用程序可以将整个空间用于执行,从而避免了不必要的磁盘溢出。其次,确实使用缓存的应用程序可以保留最小的存储空间(R),以免其数据块被逐出。最后,这种方法可为各种工作负载提供合理的即用即用性能,而无需用户了解如何在内部划分内存。

尽管有两种相关的配置,但典型用户无需调整它们,因为默认值适用于大多数工作负载:

spark.memory.fraction应该设置的值,以便在JVM的旧版本或"长期使用的"一代中舒适地适应此堆空间量。有关详细信息,请参见下面有关高级GC调整的讨论。

6.1. 确定内存消耗

确定数据集所需的内存消耗量的最佳方法是创建一个RDD,将其放入缓存中,然后查看Web UI中的" Storage"页面。该页面将告诉您RDD占用了多少内存。

要估算特定对象的内存消耗,请使用SizeEstimatorestimate方法。这对于尝试使用不同的数据布局以减少内存使用量以及确定广播变量将在每个执行程序堆上占用的空间量很有用。

6.2. 调整数据结构

减少内存消耗的第一种方法是避免使用Java功能,这些功能会增加开销,例如基于指针的数据结构和包装对象。做这件事有很多种方法:

  1. 将数据结构设计为更喜欢对象数组和原始类型,而不是标准Java或Scala集合类(例如HashMap)。该fastutil 库提供方便的集合类基本类型是与Java标准库兼容。
  2. 尽可能避免使用带有许多小对象和指针的嵌套结构。
  3. 考虑使用数字ID或枚举对象代替键的字符串。
  4. 如果您的RAM少于32 GB,则设置JVM标志-XX:+UseCompressedOops以使指针为四个字节而不是八个字节。您可以在中添加这些选项 spark-env.sh

6.3. 序列化RDD存储[]

当您的对象仍然太大而无法进行优化存储时,减少内存使用的一种更简单的方法是使用RDD持久性API中的序列化StorageLevels (例如)以序列化形式存储它们。然后,Spark将每个RDD分区存储为一个大字节数组。由于必须动态地反序列化每个对象,因此以串行形式存储数据的唯一缺点是访问时间较慢。如果您想以序列化形式缓存数据,我们强烈建议使用Kryo,因为它导致的大小比Java序列化(当然也比原始Java对象)小。

6.4. 垃圾收集优化

(https://spark.apache.org/docs/latest/tuning.html#garbage-collection-tuning)

如果您在程序存储的RDD方面有较大的"搅动",则JVM垃圾回收可能会成为问题。(在只读取RDD一次然后对其执行许多操作的程序中,这通常不是问题。)当Java需要逐出旧对象为新对象腾出空间时,它将需要遍历所有Java对象并查找未使用的。这里要记住的要点是,垃圾回收的成本与Java对象的数量成正比,因此使用具有较少对象的数据结构(例如Ints而不是a 的数组LinkedList)可以大大降低此成本。一种更好的方法是如上所述以序列化形式持久化对象:现在将只有一个每个RDD分区的对象(字节数组)。在尝试其他技术之前,首先尝试使用GC解决问题的方法是使用序列化缓存

由于任务的工作内存(运行任务所需的空间量)与节点上缓存的RDD之间的干扰,GC也会成为问题。我们将讨论如何控制分配给RDD缓存的空间以减轻这种情况。

衡量GC的影响

GC调整的第一步是收集有关垃圾收集发生频率和花费GC时间的统计信息。这可以通过添加-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStampsJava选项来完成。(有关将Java选项传递给Spark作业的信息,请参阅配置指南。)下次运行Spark作业时,每次发生垃圾收集时,您都会在工作日志中看到打印的消息。请注意,这些日志将位于群集的工作节点上(stdout位于其工作目录中的文件中),而不是驱动程序上。

高级GC调整

为了进一步调整垃圾回收,我们首先需要了解有关JVM中内存管理的一些基本信息:

在Spark中进行GC调整的目标是确保在旧一代中仅存储长寿命的RDD,并确保新世代具有足够的大小以存储短寿命的对象。这将有助于避免完整的GC收集任务执行期间创建的临时对象。可能有用的一些步骤是:

我们的经验表明,GC调整的效果取决于您的应用程序和可用内存量。有更多的微调选项描述联机,但在较高的水平,管理GC如何充分频繁发生可以帮助减少开销。

可以通过spark.executor.extraJavaOptions在作业的配置中设置来指定执行程序的GC调整标志。

其他注意事项

并行度

除非您为每个操作设置足够高的并行度,否则群集将无法充分利用。Spark根据文件的大小自动设置要在每个文件上运行的"映射"任务的数量(尽管您可以通过可选的参数来控制它SparkContext.textFile,等等),并且对于分布式"减少"操作(例如groupByKey和)reduceByKey,它使用最大的父文件RDD的分区数。您可以将并行性级别作为第二个参数传递(请参阅spark.PairRDDFunctions文档),或将config属性设置spark.default.parallelism为更改默认值。通常,我们建议集群中每个CPU内核执行2-3个任务。

减少任务的内存使用

有时,您会收到OutOfMemoryError的原因不是因为您的RDD不能容纳在内存中,而是因为您的一项任务(例如中的reduce任务之一)的工作集groupByKey太大。斯巴克的整理操作(sortByKeygroupByKeyreduceByKeyjoin,等)建立每个任务中的哈希表来进行分组,而这往往是大的。此处最简单的解决方法是 提高并行度,以使每个任务的输入集更小。Spark可以高效地支持短至200 ms的任务,因为它可以在多个任务中重用一个执行器JVM,并且任务启动成本低,因此您可以安全地将并行级别提高到集群中核心的数量以上。

广播大变量

使用中 可用的广播功能SparkContext可以极大地减少每个序列化任务的大小,以及在群集上启动作业的成本。如果您的任务使用驱动程序中的任何大对象(例如,静态查找表),请考虑将其转换为广播变量。Spark在主服务器上打印每个任务的序列化大小,因此您可以查看它来确定任务是否太大;通常,大约20 KB以上的任务可能值得优化。

数据局部性

数据局部性可能会对Spark作业的性能产生重大影响。如果数据和对其进行操作的代码在一起,则计算速度往往会很快。但是,如果代码和数据是分开的,那么一个必须移到另一个。通常,从一个地方到另一个地方传送序列化代码要比块数据更快,因为代码大小比数据小得多。Spark围绕此数据本地性一般原则构建调度。

数据局部性是数据与处理它的代码之间的接近程度。根据数据的当前位置,可分为多个级别。从最远到最远的顺序:

Spark倾向于在最佳位置级别安排所有任务,但这并不总是可能的。在任何空闲执行器上没有未处理数据的情况下,Spark会切换到较低的本地级别。有两种选择:a)等待忙碌的CPU释放以在同一服务器上的数据上启动任务,或b)立即在需要将数据移动到更远的地方启动新任务。

Spark通常要做的是稍等一下,以期释放繁忙的CPU。一旦超时到期,它将开始将数据从很远的地方移到空闲的CPU中。每个级别之间的回退等待超时可以单独配置,也可以一起配置在一个参数中。有关详细信息,请参见配置页面spark.locality上的 参数。如果您的任务很长并且位置不佳,则应该增加这些设置,但是默认设置通常效果很好。

6.5. 摘要

这是一个简短的指南,指出了在调整Spark应用程序时应了解的主要问题-最重要的是数据序列化和内存调整。对于大多数程序,切换到Kryo序列化并以序列化形式保留数据将解决大多数常见的性能问题。随时在 Spark邮件列表中询问其他调优最佳实践。

7. 参考资料


如果你觉得这篇文章对你有帮助,不妨请我喝杯咖啡,鼓励我创造更多!