io.netty.buffer.PooledByteBuf<T>使用内存池中的一块内存作为自己的数据内存,这个块内存是PoolChunk<T>的一部分。PooledByteBuf<T>是一个抽象类型,它有4个派生类:
- PooledHeapByteBuf, PooledUnsafeHeapByteBuf 使用堆内存的PooledByteBuffer<byte[]>。
- PooledDirectByteBuf, PooledUnsafeDirectByteBuf 使用直接内存的PooledByteBuf<ByteBuffer>。
初始化
PooledByteBuf的初始化过程分为两个步骤:创建实例;初始化内存。这两个步骤的代码如下:
protected PooledByteBuf(Recycler.Handle recyclerHandle, int maxCapacity) { super(maxCapacity); this.recyclerHandle = recyclerHandle; } void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { init0(chunk, handle, offset, length, maxLength, cache); } private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { assert handle >= 0; assert chunk != null; this.chunk = chunk; memory = chunk.memory; allocator = chunk.arena.parent; this.cache = cache; this.handle = handle; this.offset = offset; this.length = length; this.maxLength = maxLength; tmpNioBuf = null; }
创建实例时调用的构造方法只是为maxCapacity和recyclerHandler属性赋值,构造方法是protected,不打算暴露到外面。派生类都提供了newInstance方法创建实例,以PooledHeapByteBuf为例,它的newInstance方法实现如下:
1 private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() { 2 @Override 3 protected PooledHeapByteBuf newObject(Handle handle) { 4 return new PooledHeapByteBuf(handle, 0); 5 } 6 }; 7 8 static PooledHeapByteBuf newInstance(int maxCapacity) { 9 PooledHeapByteBuf buf = RECYCLER.get(); 10 buf.reuse(maxCapacity); 11 return buf; 12 }
这里的newInstance使用RECYCLER创建实例对象。Recycler<T>是一个轻量级的,支持循环使用的对象池。当对象池中没有可用对象时,会在第4行使用构造方法创建一个新的对象。
init调用init0初始化数据内存,init0方法为几个内存相关的关键属性赋值:
- chunk: PoolChunk<T>对象,这个PooledByteBuf使用的内存就是它的一部分。
- memory: 内存对象。更准确地说,PooledByteBuf使用的内存是它的一部分。
- allocator: 创建这个PooledByteBuf的PooledByteBufAllocator对象。
- cache: 线程专用的内存缓存。分配内存时会优先从这个缓存中寻找合适的内存块。
- handle: 内存在chunk中node的句柄。chunk使用handle可以计算出它对应内存的起始位置offset。
- offset: 分配内存的起始位置。
- length: 分配内存的长度,也是这个PooledByteBuf的capacity。
- maxLength: 这块内存node的最大长度。当调用capacity(int newCapacity)方法增加capacity时,只要newCapacity不大于这个值,就不用从新分配内存。
内存初始化完成之后,这个PooledByteBuf可使用的内存范围是memory内存中[offset, offset+length)。idx方法可以把ByteBuf的索引转换成memory的索引:
1 protected final int idx(int index) { 2 return offset + index; 3 }
重新分配内存
和前面讲过的ByteBuf实现一样,PooledByteBuf也需要使用capacity(int newCapacity)改变内存大小,也会涉及到把数据从旧内存中复制到新内存的问题。也就是说,要解决的问题是一样的,只是具体实现的差异。
1 @Override 2 public final ByteBuf capacity(int newCapacity) { 3 checkNewCapacity(newCapacity); 4 5 // If the request capacity does not require reallocation, just update the length of the memory. 6 if (chunk.unpooled) { 7 if (newCapacity == length) { 8 return this; 9 } 10 } else { 11 if (newCapacity > length) { 12 if (newCapacity <= maxLength) { 13 length = newCapacity; 14 return this; 15 } 16 } else if (newCapacity < length) { 17 if (newCapacity > maxLength >>> 1) { 18 if (maxLength <= 512) { 19 if (newCapacity > maxLength - 16) { 20 length = newCapacity; 21 setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity)); 22 return this; 23 } 24 } else { // > 512 (i.e. >= 1024) 25 length = newCapacity; 26 setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity)); 27 return this; 28 } 29 } 30 } else { 31 return this; 32 } 33 } 34 35 // Reallocation required. 36 chunk.arena.reallocate(this, newCapacity, true); 37 return this; 38 }
这个方法处理两大类情况: 不重新分配内存;重新分配内存并复制ByteBuf中的数据和状态。
不重新分配内存:
8行: chunk不需要回收到内存池中,且newCapacity没有变化。
11-32行: chunk需要回收到内存池中。
13-14行:内存增大,且newcapacity不大于maxLength。把容量修改成newCapacity即可。
20-22行: 内存减小, newCapacity 大于maxLength的一半,maxLength<=512, newCapacity >maxLength - 16。 把容量修改成newCapacity, 调整readerIndex, writerIndex。
25-27行: 内存减小,newCapacity大于maxLength的一半, maxLength > 512。把容量修改成newCapacity, 调整readerIndex, writerIndex。
31行: 内存不变,不做任何操作。
需要重新分配内存:
36行: 任何不满足以上情况的都要重新分配内存。这里使用Arena的reallocate方法重新分配内存,并把旧内存释放调,代码如下:
1 //io.netty.buffer.PoolArena#reallocate, 2 void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) { 3 if (newCapacity < 0 || newCapacity > buf.maxCapacity()) { 4 throw new IllegalArgumentException("newCapacity: " + newCapacity); 5 } 6 7 int oldCapacity = buf.length; 8 if (oldCapacity == newCapacity) { 9 return; 10 } 11 12 PoolChunk<T> oldChunk = buf.chunk; 13 long oldHandle = buf.handle; 14 T oldMemory = buf.memory; 15 int oldOffset = buf.offset; 16 int oldMaxLength = buf.maxLength; 17 int readerIndex = buf.readerIndex(); 18 int writerIndex = buf.writerIndex(); 19 20 allocate(parent.threadCache(), buf, newCapacity); 21 if (newCapacity > oldCapacity) { 22 memoryCopy( 23 oldMemory, oldOffset, 24 buf.memory, buf.offset, oldCapacity); 25 } else if (newCapacity < oldCapacity) { 26 if (readerIndex < newCapacity) { 27 if (writerIndex > newCapacity) { 28 writerIndex = newCapacity; 29 } 30 memoryCopy( 31 oldMemory, oldOffset + readerIndex, 32 buf.memory, buf.offset + readerIndex, writerIndex - readerIndex); 33 } else { 34 readerIndex = writerIndex = newCapacity; 35 } 36 } 37 38 buf.setIndex(readerIndex, writerIndex); 39 40 if (freeOldMemory) { 41 free(oldChunk, oldHandle, oldMaxLength, buf.cache); 42 } 43 }
7-9行: 内存大小没变化,返回。
12-18行: 记录下旧内存的信息,readerIndex, writerIndex。
20行: 为PooledByteBuf分配新内存。
21-38行: 把旧内存中数据复制到新内存,并把readerIndex,writerIndex设置在正确。
41行: 释放就内存。
释放内存
内存释放代码在deallocate中:
1 @Override 2 protected final void deallocate() { 3 if (handle >= 0) { 4 final long handle = this.handle; 5 this.handle = -1; 6 memory = null; 7 tmpNioBuf = null; 8 chunk.arena.free(chunk, handle, maxLength, cache); 9 chunk = null; 10 recycle(); 11 } 12 }
关键是第8行代码,使用PoolArena的free方法释放内存。然后是recycle把当前PooledByteBuf对象放到RECYCLER中循环使用。
PooledByteBufAllocator创建内存管理模块
在前面分析PooledByteBuf内存初始化,重新分配及释放时,看到了内存管理的三个核心模块: PoolArena(chunk.arena), PoolChunk(chunk), PoolThreadCache(cache)。PooledByteBuf的内存管理能力都是使用这三模块实现的,它本身没有实现内存管理算法。当需要为PooledByteBuf分配一块内存时,先从一个线程专用的PoolThreadCache中得到一个PoolArena, 使用PoolArena的allocate找到一个满足要求内存块PoolChunk, 从这个内存块中分配一块连续的内存handle,计算出这块内存起始位置的偏移量offset, 最后调用PooledByteBuf的init方法初始化内存完成内存分配。 释放内存调用PoolArena的free方法。在内存分配时,会优先从PoolThreadCache中寻找合适的内存块。在内存释放时会把内存块暂时放在PoolThreadCache中,等使用频率过低时才会还给PoolChunk。这三个模块中PoolArena, PoolThreadCache由PooledByteBufAllocator创建,PoolChunk由PoolArean维护。
PooledByteBufAllocator维护了相关的几个属性:
PoolArena<byte[]>[] heapArenas
PoolArena<ByteBuffer>[] directArenas
PoolThreadLocalCache threadCache
headArenas和directArenas分别维护了多个PoolArena, 他们分别用来分配堆内存和直接内存。 如果使用得当,可以让每个线程持有一个专用的PoolArena, 避免线程间数据同步的开销。PoolThreadLocalCache会为每个线程创建一个专用的PoolThreadCache实例,这个实例分别持有一个heapArena和directArena。
接下来的的几个章节会详细分析PoolArena和PoolThreadCache的实现代码。