Flink 幕后之内存管理
引言
目前很多大数据处理框架,例如Hadoop、Spark、Storm、Flink等。它们都基于JVM语言开发(java or scala),运行在JVM上。为了加速合并或者排序(基于磁盘的方式通常要慢一些),需要将数据加载到内存中,由于数据量巨大,对内存是不小的压力。
数据存储最简单的做法是将封装成对象直接存储到如List或者Map这样的数据结构中。这种做法会引发两个问题。一个问题是数据规模大,需要创建的对象非常多(数据加上存储的数据结构,它们将耗费大量的内存),可能引发OOM。另一个问题是GC,源源不断的数据需要被处理,对象持续产生并需要被销毁,对GC来说是不小的压力。
鉴于JVM自带的GC无法满足高效稳定的流处理,Flink建立了一套自己的内存管理体系。
类图
MemorySegment
MemorySegment是Flink管理内存的最小单位,是内存的抽象。MemorySegment有两种实现,堆(HeapMemorySegment、基于JVM内存),非堆(HybridMemorySegment、混合,可使用JVM内存或者直接内存)。
用户可以通过配置决定使用哪一种,参数“taskmanager.memory.off-heap”默认false,使用堆;设置为true,使用非堆。
MemorySegment的实现做了一些优化。
- MemorySegment使用了较为底层的“sun.misc.Unsafe“。这里有一个典型的操作非堆的方式:将内存地址存储到address字段中,后续的通过index、offset的定位来读写数据。这种类C的写法,可以减少不必要的拷贝操作。
- 在Flink运行过程中,仅有一种MemorySegment实现类被加载(根据上述的配置),这样有利于JIT编译器的预热,通过使方法内连(method-inline)的方式降低调用的开销。
MemoryPool创建流程
MemoryPool
MemoryPool负责管理MemorySegment,例如创建、销毁、获取、重用等。
MemoryPool在初始化的时候,创建了空的ArrayDeque,之后申请内存加入到ArrayDeque中。内存申请有两种:new byte[segmentSize] -- 堆内存、ByteBuffer.allocateDirect(segmentSize) -- 直接内存。(SegmentSize由配置"taskmanager.memory.segment-size"指定,默认32kb) 。这两种不同类型的内存,在使用之前会由MemorySegmentFactory.wrapXXXHeapMemory包装一次,统一抽象成MemorySegment。
MemoryPool的生命周期很长,从TaskManager创建直至销毁。所以在任务执行期间它占用的内存(Segment)不会释放,而是通过回收来重复使用。MemoryPool通过减少对象的创建和回收,大大降低了GC压力。
需要注意的是:Flink不是将所有的对象都写入到MemoryPool管理的内存中。默认的,Flink分配70%的内存给MemoryManager。执行过程中还需要一些内存,例如用户实现的自定义函数,在函数中创建的对象存储在堆中,由JVM的GC机制管理。
序列化
Flink在内存管理之外,还有一套自己的序列化体系。在执行的过程中,数据对象通过序列化转换成字节,或者字节反序列化成对象。
以简单的ETL任务距离,抽取 --> 过滤 --> 存储,对象类型是提前预知的,调用对象的序列化即可。若用户需要加上自己的处理,抽取 --> 过滤 --> 转换 --> 存储。在转换的过程中,会引入Flink"未知"的对象类型。为了解决这种场景,Flink提供了一种基于反射的类型提取。用户需要提供TypeInformation来告知Flink类型信息,Flink根据类型自动选择合适的序列化方式。
创建TypeInformation:
TypeInformation<List<CommonLogBean>> typeInformation =
TypeInformation.of(new TypeHint<List<CommonLogBean>>() {});
总结
Flink构建了一套特有的内存管理体系,降低了OOM的风险以及GC的负载,另外提供了智能高效的序列化方式。它们功能构成了高效的流处理基础。
重要参考
[flink-memory-manager]https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html