概述
PooledByteBufferWriter
是一个基于 ArrayPool<byte>
的高性能缓冲区写入器,实现了 IBufferWriter<byte>
接口。它专为需要动态扩容、避免频繁内存分配的大块连续写入场景而设计,如网络协议构建、序列化操作等。
核心特性
- 池化内存管理:基于
ArrayPool<byte>.Shared
进行数组租借与归还,减少 GC 压力
- 动态扩容:支持自动扩容,近似2倍增长策略
- IBufferWriter接口:与 .NET 标准序列化库完全兼容
- 零拷贝操作:提供直接的内存视图,避免不必要的数据复制
- 流操作支持:直接写入到
Stream
,支持同步和异步
- 安全边界检查:防止缓冲区溢出和内存泄露
构造和生命周期
基础构造
// 指定初始容量(从数组池租用)
using var writer = new PooledByteBufferWriter(1024);
// 也可以不使用 using,但必须手动调用 Dispose
var writer2 = new PooledByteBufferWriter(4096);
// ... 使用 writer2 ...
writer2.Dispose(); // 必须调用以归还内存到池中
重用实例
var writer = new PooledByteBufferWriter(1024);
// 第一次使用
WriteData1(writer);
var result1 = writer.WrittenMemory.ToArray();
writer.Clear(); // 清空内容,但保持内存不归还
// 第二次使用(复用相同的内存)
WriteData2(writer);
var result2 = writer.WrittenMemory.ToArray();
// 使用完毕,归还内存
writer.Dispose();
主要属性
属性 | 类型 | 说明 |
|
| 已写入的内存(只读) |
|
| 已写入的数据段 |
|
| 已写入字节数 |
|
| 当前缓冲区总容量 |
|
| 剩余可写容量 |
IBufferWriter 接口实现
获取写入缓冲区
var writer = new PooledByteBufferWriter(512);
// 获取至少256字节的可写内存
Memory<byte> buffer = writer.GetMemory(256);
// 获取至少256字节的可写Span
Span<byte> span = writer.GetSpan(256);
// 写入一些数据到span
span[0] = 0x01;
span[1] = 0x02;
span[2] = 0x03;
// 通知已写入3字节
writer.Advance(3);
与序列化库配合使用
// 与 System.Text.Json 配合
var writer = new PooledByteBufferWriter(1024);
using var jsonWriter = new Utf8JsonWriter(writer);
jsonWriter.WriteStartObject();
jsonWriter.WriteString("name", "张三");
jsonWriter.WriteNumber("age", 25);
jsonWriter.WriteEndObject();
jsonWriter.Flush();
// 获取JSON字节
ReadOnlyMemory<byte> jsonBytes = writer.WrittenMemory;
string json = Encoding.UTF8.GetString(jsonBytes.Span);
writer.Dispose();
与 MessagePack 配合
var writer = new PooledByteBufferWriter(1024);
// 使用 MessagePack 序列化
MessagePackSerializer.Serialize(writer, new { Name = "张三", Age = 25 });
// 获取序列化结果
ReadOnlyMemory<byte> msgPackBytes = writer.WrittenMemory;
writer.Dispose();
直接操作方法
清空操作
var writer = new PooledByteBufferWriter(1024);
// 写入一些数据
var span = writer.GetSpan(100);
span.Fill(0x42);
writer.Advance(100);
Console.WriteLine(writer.WrittenCount); // 输出: 100
// 仅清空内容(不归还内存,可继续使用)
writer.Clear();
Console.WriteLine(writer.WrittenCount); // 输出: 0
// 清空并归还内存到池中(之后不能再使用)
writer.ClearAndReturnBuffers();
重新初始化
var writer = new PooledByteBufferWriter(1024);
// ... 使用writer ...
// 清空并归还原有内存
writer.ClearAndReturnBuffers();
// 重新初始化为新的实例
writer.InitializeEmptyInstance(2048);
// 现在可以继续使用writer
流操作
写入到流(同步)
var writer = new PooledByteBufferWriter(1024);
// 写入一些数据
var span = writer.GetSpan(20);
Encoding.UTF8.GetBytes("Hello World".AsSpan(), span);
writer.Advance(11);
// 直接写入到流
using var fileStream = File.Create("output.txt");
writer.WriteToStream(fileStream);
writer.Dispose();
写入到流(异步)
var writer = new PooledByteBufferWriter(1024);
// 写入数据...
WriteData(writer);
// 异步写入到流
using var networkStream = GetNetworkStream();
await writer.WriteToStreamAsync(networkStream);
writer.Dispose();
完整的异步示例
public async Task SerializeAndSendAsync<T>(T data, Stream stream)
{
using var writer = new PooledByteBufferWriter(4096);
// 使用JSON序列化
using var jsonWriter = new Utf8JsonWriter(writer);
JsonSerializer.Serialize(jsonWriter, data);
jsonWriter.Flush();
// 异步发送到网络
await writer.WriteToStreamAsync(stream);
}
扩容机制
扩容策略
// 内部扩容逻辑(简化版):
// 1. 当剩余空间不足时,计算新大小 = 当前长度 + max(请求大小, 当前长度)
// 2. 从数组池租用新数组
// 3. 复制已有数据到新数组
// 4. 清理并归还旧数组
var writer = new PooledByteBufferWriter(100);
Console.WriteLine(writer.Capacity); // 可能输出: 128 (数组池分配的实际大小)
// 写入数据触发扩容
var largeSpan = writer.GetSpan(500); // 需要500字节,但当前容量不足
// 内部会自动扩容到合适大小(通常是当前大小的2倍左右)
Console.WriteLine(writer.Capacity); // 可能输出: 512 或更大
容量限制
// 最大容量接近 Array.MaxLength (约2GB)
const int MaxArrayLength = 0x7FFFFFC7; // 2,147,483,591 字节
// 当接近上限时,扩容策略会更保守
var writer = new PooledByteBufferWriter(1024);
try
{
// 对于超大数据,可能会抛出 OutOfMemoryException
var hugeSpan = writer.GetSpan(int.MaxValue / 2);
}
catch (OutOfMemoryException ex)
{
Console.WriteLine($"内存不足: {ex.Message}");
}
性能优化
预分配合适的初始容量
// 不好的做法:频繁扩容
var writer1 = new PooledByteBufferWriter(16); // 初始容量太小
for (int i = 0; i < 1000; i++)
{
var span = writer1.GetSpan(100);
// 每次都可能触发扩容...
writer1.Advance(100);
}
// 好的做法:预估合适的初始容量
var writer2 = new PooledByteBufferWriter(100 * 1000); // 预分配足够空间
for (int i = 0; i < 1000; i++)
{
var span = writer2.GetSpan(100);
// 不会触发扩容
writer2.Advance(100);
}
批量写入
public void EfficientWrite(PooledByteBufferWriter writer, byte[][] dataChunks)
{
// 计算总大小
int totalSize = dataChunks.Sum(chunk => chunk.Length);
// 一次性获取足够空间
var span = writer.GetSpan(totalSize);
// 批量复制
int offset = 0;
foreach (var chunk in dataChunks)
{
chunk.CopyTo(span.Slice(offset));
offset += chunk.Length;
}
writer.Advance(totalSize);
}
错误处理
常见异常
var writer = new PooledByteBufferWriter(100);
try
{
// ArgumentOutOfRangeException: 负数advance
writer.Advance(-1);
}
catch (ArgumentOutOfRangeException ex)
{
Console.WriteLine($"无效的advance值: {ex.Message}");
}
try
{
// ArgumentOutOfRangeException: advance超出容量
writer.Advance(writer.Capacity + 1);
}
catch (ArgumentOutOfRangeException ex)
{
Console.WriteLine($"advance超出容量: {ex.Message}");
}
try
{
// OutOfMemoryException: 请求超大空间
writer.GetSpan(int.MaxValue);
}
catch (OutOfMemoryException ex)
{
Console.WriteLine($"内存不足: {ex.Message}");
}
资源泄露防护
// 使用using确保资源释放
public void SafeUsage()
{
using var writer = new PooledByteBufferWriter(1024);
// 即使发生异常,Dispose也会被调用
DoSomeWork(writer);
} // 自动调用 Dispose,归还内存到池中
// 手动管理(需要确保异常安全)
public void ManualUsage()
{
var writer = new PooledByteBufferWriter(1024);
try
{
DoSomeWork(writer);
}
finally
{
writer.Dispose(); // 必须在finally中调用
}
}
使用场景
HTTP响应构建
public async Task<byte[]> BuildHttpResponseAsync(object data)
{
using var writer = new PooledByteBufferWriter(4096);
// 写入HTTP头部
var headerSpan = writer.GetSpan(256);
var headerBytes = Encoding.ASCII.GetBytes("HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n\r\n");
headerBytes.CopyTo(headerSpan);
writer.Advance(headerBytes.Length);
// 写入JSON正文
using var jsonWriter = new Utf8JsonWriter(writer);
JsonSerializer.Serialize(jsonWriter, data);
jsonWriter.Flush();
return writer.WrittenMemory.ToArray();
}
网络数据包构建
public ReadOnlyMemory<byte> BuildNetworkPacket(int messageId, byte[] payload)
{
using var writer = new PooledByteBufferWriter(payload.Length + 64);
// 写入包头
var headerSpan = writer.GetSpan(16);
BinaryPrimitives.WriteInt32LittleEndian(headerSpan, 0x12345678); // Magic
BinaryPrimitives.WriteInt32LittleEndian(headerSpan[4..], messageId);
BinaryPrimitives.WriteInt32LittleEndian(headerSpan[8..], payload.Length);
writer.Advance(12);
// 写入负载
var payloadSpan = writer.GetSpan(payload.Length);
payload.CopyTo(payloadSpan);
writer.Advance(payload.Length);
return writer.WrittenMemory;
}
文件格式生成
public void GenerateBinaryFile(string filePath, IEnumerable<Record> records)
{
using var writer = new PooledByteBufferWriter(8192);
using var fileStream = File.Create(filePath);
// 写入文件头
var magic = "MYFORMAT"u8;
var headerSpan = writer.GetSpan(magic.Length + 4);
magic.CopyTo(headerSpan);
BinaryPrimitives.WriteInt32LittleEndian(headerSpan[magic.Length..], records.Count());
writer.Advance(magic.Length + 4);
// 写入记录
foreach (var record in records)
{
WriteRecord(writer, record);
// 当缓冲区积累足够数据时,刷新到文件
if (writer.WrittenCount > 4096)
{
writer.WriteToStream(fileStream);
writer.Clear(); // 重置缓冲区继续使用
}
}
// 写入剩余数据
if (writer.WrittenCount > 0)
{
writer.WriteToStream(fileStream);
}
}
与其他缓冲区类型对比
vs. MemoryStream
// MemoryStream: 更高级但有GC压力
using var ms = new MemoryStream();
// 写入操作...
byte[] result1 = ms.ToArray(); // 需要复制数据
// PooledByteBufferWriter: 更底层但零拷贝
using var writer = new PooledByteBufferWriter(1024);
// 写入操作...
ReadOnlyMemory<byte> result2 = writer.WrittenMemory; // 无复制
vs. ArrayBufferWriter
// ArrayBufferWriter: 使用托管数组
var arrayWriter = new ArrayBufferWriter<byte>();
// 写入操作...
// 内存不会自动归还,依赖GC回收
// PooledByteBufferWriter: 使用数组池
using var pooledWriter = new PooledByteBufferWriter(1024);
// 写入操作...
// 使用完毕自动归还到池中,减少GC压力
最佳实践
- 合理预估初始容量:避免频繁扩容影响性能
- 及时释放资源:使用
using
语句或确保调用Dispose
- 批量操作:尽量减少
GetSpan
/Advance
调用次数
- 异常安全:在
finally
块中确保资源释放
- 复用实例:对于频繁操作,可以使用
Clear()
复用同一实例
- 监控内存使用:在高并发场景下监控数组池的使用情况
限制与注意事项
- 非线程安全:单个实例不能跨线程并发使用
- 生命周期管理:必须正确调用
Dispose
以归还内存
- 最大容量限制:接近 2GB 的理论上限
- 数组池依赖:依赖
ArrayPool<byte>.Shared
的实现质量
- 内存清理:归还前会清理敏感数据,可能有轻微性能开销
相关类型
SpanReader
- 高性能字节流读取器
SpanWriter
- 高性能字节流写入器(固定缓冲区)
SpanHelper
- Span相关的辅助方法