一、RecyclableMemoryStreamManager
源码地址:https://github.com/Microsoft/Microsoft.IO.RecyclableMemoryStream
小对象池和大对象池管理、RecyclableMemoryStream创建、各场景的ETW消息\事件钩子;线程安全
备注:官方这张图,只是池块增长策略阐述,不能很好理解其内部池具体实现。小对象还好理解,大对象组织分配并不像画的这样。
1.1、构造函数参数
blockSize:小对象池,块大小;默认128KB。
largeBufferMultiple:大对象池,策略被乘数大小;默认1M。
maximumBufferSize:大对象池,块最大大小;默认128M。
useExponentialLargeBuffer:大对象池策略,true:指数、false:线性;一个类实体只能对应一种策略,如果想使用2种策略,就要定义2个类实体;默认线性。
maximumSmallPoolFreeBytes:byte[] 归还小对象池,小对象池最大大小;超过GC回收,否则归还小对象池;默认不限制。
maximumLargePoolFreeBytes:byte[] 归还大对象池,大对象池最大大小;超过GC回收,否则归还大对象池;默认不限制。
1.2、小对象池
private readonly ConcurrentStack<byte[]> smallPool; //小对象池
private long smallPoolFreeSize; //小对象池归还(空闲)byte大小
private long smallPoolInUseSize; //小对象借出(使用)byte大小
smallPool 使用线程安全堆栈,每个元素bye[] 大小一样,对应blockSize
如:blockSize = 128K,则bye[] 大小都是 128K
maximumSmallPoolFreeBytes = 128M,则代表smallPool 所有块大小总和最大值为128M,超出则归还时不保存到最小池中
1.3、大对象池
private readonly ConcurrentStack<byte[]>[] largePools; //大对象池
private readonly long[] largeBufferFreeSize; //大对象每个层级,归还(空闲)byte大小
private readonly long[] largeBufferInUseSize; //大对象每个层级,借出(使用)byte大小
largePools 使用线程安全堆栈的数组,数组每个索引对应指数/线性一个层级大小,每个层级bye[] 大小一样。
如:largeBufferMultiple = 1M、maximumBufferSize = 128M
则,指数 1M、2M、4M、8M、16M、32M、64M、128M,largePools.Length = 8;largePools[2] 下面bye[] 大小都是4M
线性 1M、2M、3M、4M、5M、6M、7M、......、128M,largePools.Length = 128;largePools[2] 下面bye[] 大小都是3M
largeBufferFreeSize.Length == largePools.Length
largeBufferInUseSize = largePools.Length + 1 ,多出一个元素保存,借出时requiredSize > maximumBufferSize 所有byte[]大小,此byte[] 无法归还到大对象池,会被GC直接回收。
maximumLargePoolFreeBytes = 256M,则代表大池的各个维度块大小总和最大值,超出则归还时不保存到池中,各个维度如:线性有128个维度,指数8个维度,各个维度都是堆栈
线性最大空间:256M * 128 = 32G
指数最大空间:256M * 8 = 2G
1.4、byte[] 借出/归还
借出:
internal byte[] GetBlock() //从小对象池获取byte[],若无则直接创建 new byte[this.BlockSize]
internal byte[] GetLargeBuffer(long requiredSize, Guid id, string tag) //从大对象池获取byte[],首先根据requiredSize计算对应大对象索引位置,若无则直接创建 new byte[requiredSize]
归还:
internal void ReturnBlocks(List<byte[]> blocks, Guid id, string tag) //多个块归还小对象池,判断是否maximumSmallPoolFreeBytes超出,不超出则归还
internal void ReturnBlock(byte[] block, Guid id, string tag) //单个块归还小对象池,判断是否maximumSmallPoolFreeBytes超出,不超出则归还
//归还大对象池,首先根据buffer.Length计算对应大对象索引位置,判断索引对应层级大小是否maximumLargePoolFreeBytes超出,不超出则归还
internal void ReturnLargeBuffer(byte[] buffer, Guid id, string tag)
1.5、RecyclableMemoryStream创建
public MemoryStream GetStream(Guid id, string tag, long requiredSize, bool asContiguousBuffer)
asContiguousBuffer == true && requiredSize > this.BlockSize
请求连续byte[] 且 请求字节大于1个小对象块大小时,则使用大象池创建RecyclableMemoryStream,否则使用小对象池创建RecyclableMemoryStream。
其他重载方法,无asContiguousBuffer参数,默认使用小对象池创建RecyclableMemoryStream。
方法中,byte[] buffer、Memory<byte> buffer、ReadOnlySpan<byte> buffer 参数,会把其中的byte 数据写入新申请的小对象blocks里面,不会复用这些对象。
二、RecyclableMemoryStream
非线程安全
2.1、构造函数
internal RecyclableMemoryStream(RecyclableMemoryStreamManager memoryManager, Guid id, string tag, long requestedSize, byte[] initialLargeBuffer)
memoryManager:RecyclableMemoryStreamManager引用,调用其提供借出\归还\通知状态等方法或者属性。
initialLargeBuffer:不为null代表使用大对象池,否则小对象池。GetBuffer()方法也会影响小对象池转为大对象池。
requestedSize:根据请求大小,分配1个大对象池bye[] 或者多个小对象池Block byte[]
2.2、属性
private readonly List<byte[]> blocks = new List<byte[]>(); //如果使用小对象池,则保存借出的多个小对象
private byte[] largeBuffer; //如果使用大对象池,则保存借出的大对象
/*
* 如果使用大对象池,Capacity调整需要更换更大的大对象;
* 老的大对象归还,大对象池超出暂时无法回收大对象,则保存到此,在对象Dispose时再次尝试归还。
* 因为可能有多次此情况发生,所有为List<>
*/
private List<byte[]> dirtyBuffers;
2.3、释放/关闭/析构方法
/// <summary>
/// The finalizer will be called when a stream is not disposed properly.
/// </summary>
/// <remarks>Failing to dispose indicates a bug in the code using streams. Care should be taken to properly account for stream lifetime.</remarks>
~RecyclableMemoryStream()
{
// 析构方法,兜底释放
this.Dispose(false);
}
//非公开方法
/// <summary>
/// Returns the memory used by this stream back to the pool.
/// </summary>
/// <param name="disposing">Whether we're disposing (true), or being called by the finalizer (false).</param>
//disposing 区分是否析构函数调用
protected override void Dispose(bool disposing)
{
if (this.disposed)
{
// 已释放不在释放,记录通知事件
string doubleDisposeStack = null;
if (this.memoryManager.GenerateCallStacks)
{
doubleDisposeStack = Environment.StackTrace;
}
this.memoryManager.ReportStreamDoubleDisposed(this.id, this.tag, this.AllocationStack, this.DisposeStack, doubleDisposeStack);
return;
}
//标记已释放
this.disposed = true;
if (this.memoryManager.GenerateCallStacks)
{
this.DisposeStack = Environment.StackTrace;
}
this.memoryManager.ReportStreamDisposed(this.id, this.tag, this.AllocationStack, this.DisposeStack);
if (disposing)
{
//已释放,不用进入析构队列,不会触发析构函数。
GC.SuppressFinalize(this);
}
else
{
// We're being finalized.
this.memoryManager.ReportStreamFinalized(this.id, this.tag, this.AllocationStack);
//如果此应用程序域正在卸载,并且公共语言运行时已开始调用终止程序,则不执行归还池逻辑。
if (AppDomain.CurrentDomain.IsFinalizingForUnload())
{
// If we're being finalized because of a shutdown, don't go any further.
// We have no idea what's already been cleaned up. Triggering events may cause
// a crash.
base.Dispose(disposing);
return;
}
}
this.memoryManager.ReportStreamLength(this.length);
if (this.largeBuffer != null)
{
//归还大对象
this.memoryManager.ReturnLargeBuffer(this.largeBuffer, this.id, this.tag);
}
if (this.dirtyBuffers != null)
{
//再次尝试归还老的大对象列表
foreach (var buffer in this.dirtyBuffers)
{
this.memoryManager.ReturnLargeBuffer(buffer, this.id, this.tag);
}
}
//归还小对象块列表
this.memoryManager.ReturnBlocks(this.blocks, this.id, this.tag);
this.blocks.Clear();
base.Dispose(disposing);
}
//公共方法
/// <summary>
/// Equivalent to <c>Dispose</c>.
/// </summary>
public override void Close()
{
this.Dispose(true);
}
2.4、小对象使用内部类
标识位置信息,方便参数传递,操作blocks属性。
private struct BlockAndOffset
{
public int Block; //小对象块所在整体位置索引
public int Offset; //小对象块中未使用字节开始位置\已使用字节结束位置
public BlockAndOffset(int block, int offset)
{
this.Block = block;
this.Offset = offset;
}
}
2.5、不连续字节流
public ReadOnlySequence<byte> GetReadOnlySequence()
{
this.CheckDisposed();
if (this.largeBuffer != null)
{
//大对象,只有1个字节数组,连续的
AssertLengthIsSmall();
return new ReadOnlySequence<byte>(this.largeBuffer, 0, (int)this.length);
}
if (this.blocks.Count == 1)
{
//小对象1个块,只有1个字节数组,连续的
AssertLengthIsSmall();
return new ReadOnlySequence<byte>(this.blocks[0], 0, (int)this.length);
}
//小对象多个块,多个字节数据,不连续的
var first = new BlockSegment(this.blocks[0]);
var last = first;
//创建关联下一个块对象
for (int blockIdx = 1; last.RunningIndex + last.Memory.Length < this.length; blockIdx++)
{
last = last.Append(this.blocks[blockIdx]);
}
//首尾对象
return new ReadOnlySequence<byte>(first, 0, last, (int)(this.length - last.RunningIndex));
}
private sealed class BlockSegment : ReadOnlySequenceSegment<byte>
{
public BlockSegment(Memory<byte> memory) => Memory = memory;
public BlockSegment Append(Memory<byte> memory)
{
var nextSegment = new BlockSegment(memory) { RunningIndex = RunningIndex + Memory.Length };
Next = nextSegment;
return nextSegment;
}
}
2.6、IBufferWriter<T>接口实现
private byte[] bufferWriterTempBuffer;
private ArraySegment<byte> GetWritableBuffer(int sizeHint)
{
this.CheckDisposed();
if (sizeHint < 0)
{
throw new ArgumentOutOfRangeException(nameof(sizeHint), $"{nameof(sizeHint)} must be non-negative.");
}
var minimumBufferSize = Math.Max(sizeHint, 1);
this.EnsureCapacity(this.position + minimumBufferSize);
if (this.bufferWriterTempBuffer != null)
{
this.ReturnTempBuffer(this.bufferWriterTempBuffer);
this.bufferWriterTempBuffer = null;
}
if (this.largeBuffer != null)
{
return new ArraySegment<byte>(this.largeBuffer, (int)this.position, this.largeBuffer.Length - (int)this.position);
}
BlockAndOffset blockAndOffset = this.GetBlockAndRelativeOffset(this.position);
int remainingBytesInBlock = this.MemoryManager.BlockSize - blockAndOffset.Offset;
if (remainingBytesInBlock >= minimumBufferSize)
{
//分配小对象,范围属于一个block,则返回block连续段
return new ArraySegment<byte>(this.blocks[blockAndOffset.Block], blockAndOffset.Offset, this.MemoryManager.BlockSize - blockAndOffset.Offset);
}
//分配小对象,单位大于一个block块,则通过大对象/小对象分配byte[];记录赋值给属性bufferWriterTempBuffer;规避不好返回多个blocks中byte
this.bufferWriterTempBuffer = minimumBufferSize > this.memoryManager.BlockSize ?
this.memoryManager.GetLargeBuffer(minimumBufferSize, this.id, this.tag) :
this.memoryManager.GetBlock();
return new ArraySegment<byte>(this.bufferWriterTempBuffer);
}
public void Advance(int count)
{
this.CheckDisposed();
if (count < 0)
{
throw new ArgumentOutOfRangeException(nameof(count), $"{nameof(count)} must be non-negative.");
}
byte[] buffer = this.bufferWriterTempBuffer;
if (buffer != null)
{
if (count > buffer.Length)
{
throw new InvalidOperationException($"Cannot advance past the end of the buffer, which has a size of {buffer.Length}.");
}
//把bufferWriterTempBuffer属性中数据,写回小对象blocks
this.Write(buffer, 0, count);
this.ReturnTempBuffer(buffer);
this.bufferWriterTempBuffer = null;
}
else
{
long bufferSize = this.largeBuffer == null
? this.memoryManager.BlockSize - this.GetBlockAndRelativeOffset(this.position).Offset
: this.largeBuffer.Length - this.position;
if (count > bufferSize)
{
throw new InvalidOperationException($"Cannot advance past the end of the buffer, which has a size of {bufferSize}.");
}
this.position += count;
this.length = Math.Max(this.position, this.length);
}
}
2.7、GetBuffer()
如果使用大对象,则返回大对象数据
如果使用小对象+块1个,则直接返回这个块。如果大于1个,则申请新大对象返回,之前小对象归还。升级为使用大对象。
2.8、ToArray()
通过new byte[this.Length] 创建新byte数组,拷贝大对象/小对象数据过来。