网络库提供了数据包编码器架构,用于实现各种数据封包,灵活解决粘包问题。常见SRMP标准封包、固定长度封包、字符分隔封包等,具体应用有MQTT和RocketMQ协议实现。
Nuget包:NewLife.Core
源码地址:https://github.com/NewLifeX/X/blob/master/NewLife.Core/Model/IHandler.cs
Get Started
新建NET7控制台项目,并从Nuget引入 NewLife.Core,写入以下代码:
using NewLife;
using NewLife.Data;
using NewLife.Log;
using NewLife.Net;
using NewLife.Net.Handlers;
using NewLife.Serialization;
XTrace.UseConsole();
var server = new NetServer
{
Port = 12345,
Log = XTrace.Log,
SessionLog = XTrace.Log,
//SocketLog = XTrace.Log,
//LogSend = true,
//LogReceive = true
};
server.Add<LengthFieldCodec>();
server.Received += (s, e) =>
{
XTrace.WriteLine("原始:{0}", e.Packet.ToHex(32, "-"));
if (e.Message is IPacket pk)
XTrace.WriteLine("收到:{0}", pk.ToStr());
};
server.Start();
var uri = new NetUri("tcp://127.0.0.1:12345");
var client = uri.CreateRemote();
client.Log = XTrace.Log;
client.Add<LengthFieldCodec>();
client.Open();
var str = "Stone";
var pk = new Packet(str.GetBytes());
client.SendMessage(pk);
var info = new LoginInfo { UserName = "Stone", Password = "NewLife" };
pk = new Packet(info.ToJson().GetBytes());
client.SendMessage(pk);
Console.ReadLine();
class LoginInfo
{
public String UserName { get; set; }
public String Password { get; set; }
}
执行结果
这是一个添加了长度封包编码器 LengthFieldCodec 的网络通信例程,客户端发送了两次消息,SendMessge内部经过编码器对消息进行编码,服务端的编码器对消息进行解码。
- 发送字符串Stone,本应只有5个字节,服务端收到7个字节,开头多了 05-00,即后续数据长度5字节。
- 发送一段Json字符串,服务端收到数据头部多了 29-00(0x0029的小端字节序),即后续数据长度41字节
由此可见,数据封包编码器,本质上就是在消息体外部再包装一层,增加一些标识以便于接收方判断消息体如何拆分,处理粘包问题。
长度封包编码器 LengthFieldCodec
常见协议使用指定字段表示负载数据长度,非常推荐使用。如上述例程。
可用属性项如下:
- Offset。长度的偏移量,截取数据包时加上,否则将会漏掉长度之间的数据包,如MQTT
- Size。长度占据字节数,1/2/4个字节,0表示压缩编码整数,默认2
- Expire。过期时间,超过该时间后按废弃数据处理,默认5000ms
长度是2字节时,可传输最大字节数限制在65535。有些协议使用4字节,硬件通信时需要在头部增加4个字节。
字节分割编码器SplitDataCodec
按指定分割字节来处理粘包的处理器,常见于上位机到下位机通信,或者单片机串口通信场景。
可用属性项:
- SplitData。粘包分割字节数据(默认0x0D,0x0A)
- MaxCacheDataLength。最大缓存待处理数据,默认1024字节
标准封包编码器StandardCodec
标准网络封包。头部4字节定长。协议 《简易远程消息协议SRMP》
示例代码
using NewLife;
using NewLife.Data;
using NewLife.Log;
using NewLife.Net;
using NewLife.Net.Handlers;
using NewLife.Serialization;
XTrace.UseConsole();
var server = new NetServer
{
Port = 12345,
Log = XTrace.Log,
SessionLog = XTrace.Log,
};
server.Add<StandardCodec>();
server.Received += (s, e) =>
{
XTrace.WriteLine("原始:{0}", e.Packet.ToHex(32, "-"));
if (e.Message is IPacket pk)
XTrace.WriteLine("收到:{0}", pk.ToStr());
};
server.Start();
var uri = new NetUri("tcp://127.0.0.1:12345");
var client = uri.CreateRemote();
client.Log = XTrace.Log;
client.Add<StandardCodec>();
client.Open();
var str = "Stone";
var pk = new Packet(str.GetBytes());
client.SendMessage(pk);
var info = new LoginInfo { UserName = "Stone", Password = "NewLife" };
pk = new Packet(info.ToJson().GetBytes());
client.SendMessage(pk);
Console.ReadLine();
class LoginInfo
{
public String UserName { get; set; }
public String Password { get; set; }
}
执行结果
自定义编码器IHandler
可以建立自己的编码器,实现IHandler接口即可,一般通过继承Handler类来简化实现过程。
多个IHandler按照先后顺序形成管道IPipeline,接收网络数据就是顺序通过管道内的各个处理器Read方法,发送网络数据就是逆序通过管道内的各个处理器Write方法。
IHandler接口的完整定义
/// <summary>处理器</summary>
public interface IHandler
{
/// <summary>上一个处理器</summary>
IHandler? Prev { get; set; }
/// <summary>下一个处理器</summary>
IHandler? Next { get; set; }
/// <summary>读取数据,返回结果作为下一个处理器消息</summary>
/// <remarks>
/// 最终处理器决定如何使用消息。
/// 处理得到单个消息时,调用一次下一级处理器,返回下级结果给上一级;
/// 处理得到多个消息时,调用多次下一级处理器,返回null给上一级;
/// </remarks>
/// <param name="context">上下文</param>
/// <param name="message">消息</param>
Object? Read(IHandlerContext context, Object message);
/// <summary>写入数据,返回结果作为下一个处理器消息</summary>
/// <param name="context">上下文</param>
/// <param name="message">消息</param>
Object? Write(IHandlerContext context, Object message);
/// <summary>打开连接</summary>
/// <param name="context">上下文</param>
Boolean Open(IHandlerContext context);
/// <summary>关闭连接</summary>
/// <param name="context">上下文</param>
/// <param name="reason">原因</param>
Boolean Close(IHandlerContext context, String reason);
/// <summary>发生错误</summary>
/// <param name="context">上下文</param>
/// <param name="exception">异常</param>
Boolean Error(IHandlerContext context, Exception exception);
}
其中核心读写操作:
1. Read 读取数据
接收网络数据,返回结果作为下一个处理器消息。
- 第一个处理器拿到message就是原始网络数据包,IPacket类型
- 最终处理器拿到最终解码处理好的消息,一般是应用协议报文(如MqttMessage),并决定如何使用消息。
- 处理得到单个消息时,调用一次下一级处理器,返回下级结果给上一级;
- 处理得到多个消息时,调用多次下一级处理器,返回null给上一级;
- 接收数据进入Read管道时,每个处理器可能调用一次或多次下一级处理器,也可能不调用;
- 可以在最终处理器里面写业务逻辑,也可以继续用Received事件里访问Message字段得到消息,Packet字段是经过编码器之前的原始数据包;
2. Write 写入数据
发送网络数据,逆序使用IPipeline管道,返回结果作为下一个处理器消息。
- 第一个处理器拿到message就是用户层发送的应用协议报文(如MqttMessage),处理器需要把该报文转为数据包,例如MqttCodec负责把MqttMessage转为IPacket;
- 最终处理器拿到的message一定是IPacket,然后调用底层网络接口把数据包发送出去;
- 在MQTT协议中,一般就只有MqttCodec这一个编码器,一步到位把MqttMessage编码为IPacket然后通过网络发送出去;
- 在西门子S7协议中,应用层协议需要经过多次封包,就可以设计多层编码器,A->B,B->C,C->IPacket;
数据包编码器PacketCodec
编码器的设计目标是作为网络粘包处理的基础实现。核心功能:在接收数据包时,调用数据包编码器,根据协议规范尝试解析,如果能够解析得到一个完整报文直接返回,如果不够一个完整报文,则加入缓存,待后续数据包到达再重新尝试解析。
完美的协议设计是每次解析都直接得到报文,不在编码器缓存中驻留任何数据!
编码器缓存驻留的数据越多,说明协议设计越糟糕,不仅浪费内存,还影响吞吐。
可用属性项:
- Expire。缓存有效期。超过该时间后仍未匹配数据包的缓存数据将被抛弃,默认5000ms;
- MaxCache。最大缓存待处理数据。默认1M;
- GetLength。获取长度的委托。本包所应该拥有的总长度,满足该长度后解除一个封包;
- GetLength2。获取长度的委托。本包所应该拥有的总长度,满足该长度后解除一个封包;
两个GetLength,提供其一即可,GetLength2基于Span<Byte>实现,性能更好,优先推荐使用。
总结
编码器是设计网络协议的核心基石,默认实现的几个编码器通过参数设计能够满足95%以上自定义需求。对于MQTT/RocketMQ等协议,编码器架构也能很好实现。