文章目录
一. MemorySegment架构概览
在flink1.16.1中MemorySegment已作为单独的一个类用于处理:堆内内存、堆外直接内存或堆外不安全内存。
MemorySegment架构描述
JIT编译
之前的版本将HybridMemorySegment和HeapMemorySegment两个子类加载到JVM,此时JIT编译器只有在真正执行方法的时候才会确认是哪一个子类的方法,这样就无法提前判断使用哪一个实现的虚方法,也就无法直接调用,就会影响JVM的性能。
二. MemorySegment详解
1.基于MemorySegment管理堆内存
我们已经知道,MemorySegment只能通过MemorySegmentFactory创建,并且在MemorySegmentFactory中直接提供了基于堆内存创建MemorySegment的方法。
如下代码所示,在MemorySegmentFactory.wrap()方法中可以直接将byte[] buffer数组封装成MemorySegment,其中byte[]数组中的内存空间实际上就是从堆内存中申请的。
/**
*创建一个以给定堆内存区域为新的memory segments。
此方法应用于将短期字节数组转换为memory segments.
*/
public static MemorySegment wrap(byte[] buffer) {
return new MemorySegment(buffer, null);
}
除了将已有的byte[]数组空间转换成MemorySegment之外,在MemorySegmentFactory中同时提供了通过分配堆内存空间创建MemorySegment的方法。如代码所示,在MemorySegmentFactory.allocateUnpooledSegment()方法中通过指定参数size申请固定数量的byte[]数组,这里new byte[size]的操作实际上就是从堆内存申请内存空间。
//分配一些非池化内存并创建一个代表该内存的新内存段。
//此方法类似于allocateUnpooledSegment(int, Object) ,但内存段的所有者为 null。
public static MemorySegment allocateUnpooledSegment(int size) {
return allocateUnpooledSegment(size, null);
}
在MemorySegment构造器中,提供了对byte[] buffer堆内存进行初始化的逻辑,在方法中首先将buffer赋值给heapMemory,然后将address设定为BYTE_ARRAY_BASE_OFFSET,表示byte[]数组内容的起始部分,然后根据数组对象和偏移量获取元素值(getObject)。
设定offHeapBuffer和cleaner为空。offHeapBuffer和cleaner主要在OffHeap中使用,owner参数表示当前的所有者,通常情况下设定为空。
//创建一个新的memory segment,表示字节数组的内存。
//由于字节数组由堆内存支持,因此该内存段将其数据保存在堆上。缓冲区的大小必须至少为 8 字节。
//memory segment引用给定的owner
MemorySegment(@Nonnull byte[] buffer, @Nullable Object owner) {
this.heapMemory = buffer;
this.offHeapBuffer = null;
this.size = buffer.length;
this.address = BYTE_ARRAY_BASE_OFFSET;
this.addressLimit = this.address + this.size;
this.owner = owner;
this.allowWrap = true;
this.cleaner = null;
this.isFreedAtomic = new AtomicBoolean(false);
}
2.基于MemorySegment管理堆外内存
在MemorySegment中通过ByteBuffer.allocateDirect(numBytes)方法申请堆外内存,然后用sun.misc.Unsafe对象操作堆外内存。
如下代码,在MemorySegmentFactory.allocateOffHeapUnsafeMemory()方法中,
/**
分配堆外不安全内存并创建一个新的内存段来表示该内存。
该段的创建会在其 java 包装对象即将被垃圾回收时调度其内存释放操作,类似于java.nio.DirectByteBuffer.DirectByteBuffer(int) 。不同之处在于,此内存分配不受选项 -XX:MaxDirectMemorySize 限制。
**/
public static MemorySegment allocateOffHeapUnsafeMemory(
int size, Object owner, Runnable customCleanupAction) {
long address = MemoryUtils.allocateUnsafe(size);
ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
Runnable cleaner = MemoryUtils.createMemoryCleaner(address, customCleanupAction);
return new MemorySegment(offHeapBuffer, owner, false, cleaner);
}
如代码清单8-6所示,在MemoryUtils.wrapUnsafeMemoryWithByteBuffer()方法中,
//用ByteBuffer包装不安全的native memory
static ByteBuffer wrapUnsafeMemoryWithByteBuffer(long address, int size) {
//noinspection OverlyBroadCatchBlock
try {
ByteBuffer buffer = (ByteBuffer) UNSAFE.allocateInstance(DIRECT_BYTE_BUFFER_CLASS);
UNSAFE.putLong(buffer, BUFFER_ADDRESS_FIELD_OFFSET, address);
UNSAFE.putInt(buffer, BUFFER_CAPACITY_FIELD_OFFSET, size);
buffer.clear();
return buffer;
} catch (Throwable t) {
throw new Error("Failed to wrap unsafe off-heap memory with ByteBuffer", t);
}
}
在MemoryUtils中,DIRECT_BUFFER_CONSTRUCTOR通过反射获取。通过java.nio.DirectByteBuffer构造器创建ByteBuffer内存对象,并将其封装在MemorySegment中。
private static Class<?> getClassByName(
@SuppressWarnings("SameParameterValue") String className) {
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
throw new Error("Could not find class '" + className + "' for unsafe operations.", e);
}
}
接下来就可以通过MemorySegment使用申请到的堆外内存存储数据了,数据最终会以二进制的形式存储在指定地址的堆外内存空间中,再看下构造方法。
public static MemorySegment allocateOffHeapUnsafeMemory(
int size, Object owner, Runnable customCleanupAction) {
long address = MemoryUtils.allocateUnsafe(size);
ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
Runnable cleaner = MemoryUtils.createMemoryCleaner(address, customCleanupAction);
return new MemorySegment(offHeapBuffer, owner, false, cleaner);
}
3.基于Unsafe管理MemorySegment
使用unsafe操作内存
MemorySegment能够同时操作堆内存和堆外内存,得益于sun.misc.Unsafe类的实现,Unsafe类提供了一系列可以直接操作内存的方法。sun.misc.Unsafe目前也已经下沉到MemorySegment中实现。
借助unsafe可以操作堆内、堆外内存使用
如代码所示,MemorySegment中使用sun.misc.Unsafe实现了对内存空间的管理,MemorySegment将创建出来的Unsage对象存储至静态变量,供所有MemorySegment持有者操作堆内存和堆外内存使用。
/** The unsafe handle for transparent memory copied (heap / off-heap). */
@SuppressWarnings("restriction")
private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
MemorySegment中的Unsafe对象主要通过MemoryUtils创建,和其他框架使用Unsafe库一样,都是通过反射的方式创建的,MemorySegment中的Unsafe创建完毕后,可以通过Unsafe库操作和管理堆内存和堆外内存空间。
4.写入和读取内存数据
在MemorySegment中将int类型的数据写入内存中
/**将给定的 int 值(32 位,4 个字节)写入系统本机字节顺序中的给定位置。
此方法为整数写入提供了最佳速度,除非需要特定的字节顺序,否则应使用此方法。
在大多数情况下,只要知道写入值的字节顺序与读取值的字节顺序相同就足够了
(例如内存中的瞬时存储,或者 I/O 和网络的序列化),使得此方法更好的选择。
**/
public void putInt(int index, int value) {
final long pos = address + index;
if (index >= 0 && pos <= addressLimit - 4) {
UNSAFE.putInt(heapMemory, pos, value);
} else if (address > addressLimit) {
throw new IllegalStateException("segment has been freed");
} else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}
读取MemorySegment中int类型的数据
/**
按照系统的本机字节顺序从给定位置读取 int 值(32 位,4 字节)。
此方法提供了整数读取的最佳速度,除非需要特定的字节顺序,否则应使用此方法。
在大多数情况下,只要知道写入值的字节顺序与读取值的字节顺序相同就足够了
(例如内存中的瞬时存储,或者 I/O 和网络的序列化),使得此方法更好的选择
**/
public int getInt(int index) {
final long pos = address + index;
if (index >= 0 && pos <= addressLimit - 4) {
return UNSAFE.getInt(heapMemory, pos);
} else if (address > addressLimit) {
throw new IllegalStateException("segment has been freed");
} else {
// index is in fact invalid
throw new IndexOutOfBoundsException();
}
}
5.创建MemoryCleaner垃圾清理器
内存泄漏的情况
flink提供了MemoryCleaner
createMemoryCleaner封装了两个逻辑:
//MemorySegmentFactory. 创建MemoryCleaner
public static MemorySegment allocateOffHeapUnsafeMemory(
int size, Object owner, Runnable customCleanupAction) {
long address = MemoryUtils.allocateUnsafe(size);
ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
Runnable cleaner = MemoryUtils.createMemoryCleaner(address, customCleanupAction);
return new MemorySegment(offHeapBuffer, owner, false, cleaner);
}
//在MemoryUtils中,创建MemoryCleaner
/**
创建一个清理器来释放不安全的内存。
参数:
address – 要释放的不安全内存的地址 customCleanup – 清理 GC 的自定义操作
返回:
手动运行以释放不安全内存的操作
**/
static Runnable createMemoryCleaner(long address, Runnable customCleanup) {
return () -> {
releaseUnsafe(address);
customCleanup.run();
};
}
private static void releaseUnsafe(long address) {
UNSAFE.freeMemory(address);
}
customCleanup在flink内部定义了两种情况的清理逻辑:
- 内存大小来释放内存
package org.apache.flink.runtime.memory.UnsafeMemoryBudget.
//根据内存大小来释放内存
void releaseMemory(@Nonnegative long size) {
if (size == 0) {
return;
}
boolean released = false;
long currentAvailableMemorySize = 0L;
while (!released
&& totalMemorySize
>= (currentAvailableMemorySize = availableMemorySize.get()) + size) {
released =
availableMemorySize.compareAndSet(
currentAvailableMemorySize, currentAvailableMemorySize + size);
}
if (!released) {
throw new IllegalStateException(
String.format(
"Trying to release more managed memory (%d bytes) than has been allocated (%d bytes), the total size is %d bytes",
size, currentAvailableMemorySize, totalMemorySize));
}
}
- 没有清理策略,直接清理
private static final Runnable NO_OP = () -> {};