网络库提供了数据包编码器架构,用于实现各种数据封包,灵活解决粘包问题。常见SRMP标准封包、固定长度封包、字符分隔封包等,具体应用有MQTT和RocketMQ协议实现。

Nuget包:NewLife.Core

源码地址:https://github.com/NewLifeX/X/blob/master/NewLife.Core/Model/IHandler.cs

Get Started

新建NET7控制台项目,并从Nuget引入 NewLife.Core,写入以下代码:

using NewLife;
using NewLife.Data;
using NewLife.Log;
using NewLife.Net;
using NewLife.Net.Handlers;
using NewLife.Serialization;

XTrace.UseConsole();

var server = new NetServer
{
    Port = 12345,
    Log = XTrace.Log,
    SessionLog = XTrace.Log,
    //SocketLog = XTrace.Log,
    //LogSend = true,
    //LogReceive = true
};
server.Add<LengthFieldCodec>();
server.Received += (s, e) =>
{
    XTrace.WriteLine("原始:{0}", e.Packet.ToHex(32, "-"));

    if (e.Message is IPacket pk)
        XTrace.WriteLine("收到:{0}", pk.ToStr());
};
server.Start();

var uri = new NetUri("tcp://127.0.0.1:12345");
var client = uri.CreateRemote();
client.Log = XTrace.Log;
client.Add<LengthFieldCodec>();
client.Open();

var str = "Stone";
var pk = new Packet(str.GetBytes());

client.SendMessage(pk);

var info = new LoginInfo { UserName = "Stone", Password = "NewLife" };
pk = new Packet(info.ToJson().GetBytes());
client.SendMessage(pk);

Console.ReadLine();

class LoginInfo
{
    public String UserName { get; set; }
    public String Password { get; set; }
}

执行结果

这是一个添加了长度封包编码器 LengthFieldCodec 的网络通信例程,客户端发送了两次消息,SendMessge内部经过编码器对消息进行编码,服务端的编码器对消息进行解码。

    • 发送字符串Stone,本应只有5个字节,服务端收到7个字节,开头多了 05-00,即后续数据长度5字节。
    • 发送一段Json字符串,服务端收到数据头部多了 29-00(0x0029的小端字节序),即后续数据长度41字节

由此可见,数据封包编码器,本质上就是在消息体外部再包装一层,增加一些标识以便于接收方判断消息体如何拆分,处理粘包问题。

长度封包编码器 LengthFieldCodec

常见协议使用指定字段表示负载数据长度,非常推荐使用。如上述例程。

可用属性项如下:

    • Offset。长度的偏移量,截取数据包时加上,否则将会漏掉长度之间的数据包,如MQTT
    • Size。长度占据字节数,1/2/4个字节,0表示压缩编码整数,默认2
    • Expire。过期时间,超过该时间后按废弃数据处理,默认5000ms

长度是2字节时,可传输最大字节数限制在65535。有些协议使用4字节,硬件通信时需要在头部增加4个字节。

字节分割编码器SplitDataCodec

按指定分割字节来处理粘包的处理器,常见于上位机到下位机通信,或者单片机串口通信场景。

可用属性项:

    • SplitData。粘包分割字节数据(默认0x0D,0x0A)
    • MaxCacheDataLength。最大缓存待处理数据,默认1024字节

标准封包编码器StandardCodec

标准网络封包。头部4字节定长。协议 《简易远程消息协议SRMP》

示例代码

using NewLife;
using NewLife.Data;
using NewLife.Log;
using NewLife.Net;
using NewLife.Net.Handlers;
using NewLife.Serialization;

XTrace.UseConsole();

var server = new NetServer
{
    Port = 12345,
    Log = XTrace.Log,
    SessionLog = XTrace.Log,
};
server.Add<StandardCodec>();
server.Received += (s, e) =>
{
    XTrace.WriteLine("原始:{0}", e.Packet.ToHex(32, "-"));

    if (e.Message is IPacket pk)
        XTrace.WriteLine("收到:{0}", pk.ToStr());
};
server.Start();

var uri = new NetUri("tcp://127.0.0.1:12345");
var client = uri.CreateRemote();
client.Log = XTrace.Log;
client.Add<StandardCodec>();
client.Open();

var str = "Stone";
var pk = new Packet(str.GetBytes());

client.SendMessage(pk);

var info = new LoginInfo { UserName = "Stone", Password = "NewLife" };
pk = new Packet(info.ToJson().GetBytes());
client.SendMessage(pk);

Console.ReadLine();

class LoginInfo
{
    public String UserName { get; set; }
    public String Password { get; set; }
}

执行结果

自定义编码器IHandler

可以建立自己的编码器,实现IHandler接口即可,一般通过继承Handler类来简化实现过程。

多个IHandler按照先后顺序形成管道IPipeline,接收网络数据就是顺序通过管道内的各个处理器Read方法,发送网络数据就是逆序通过管道内的各个处理器Write方法。

IHandler接口的完整定义

/// <summary>处理器</summary>
public interface IHandler
{
    /// <summary>上一个处理器</summary>
    IHandler? Prev { get; set; }

    /// <summary>下一个处理器</summary>
    IHandler? Next { get; set; }

    /// <summary>读取数据,返回结果作为下一个处理器消息</summary>
    /// <remarks>
    /// 最终处理器决定如何使用消息。
    /// 处理得到单个消息时,调用一次下一级处理器,返回下级结果给上一级;
    /// 处理得到多个消息时,调用多次下一级处理器,返回null给上一级;
    /// </remarks>
    /// <param name="context">上下文</param>
    /// <param name="message">消息</param>
    Object? Read(IHandlerContext context, Object message);

    /// <summary>写入数据,返回结果作为下一个处理器消息</summary>
    /// <param name="context">上下文</param>
    /// <param name="message">消息</param>
    Object? Write(IHandlerContext context, Object message);

    /// <summary>打开连接</summary>
    /// <param name="context">上下文</param>
    Boolean Open(IHandlerContext context);

    /// <summary>关闭连接</summary>
    /// <param name="context">上下文</param>
    /// <param name="reason">原因</param>
    Boolean Close(IHandlerContext context, String reason);

    /// <summary>发生错误</summary>
    /// <param name="context">上下文</param>
    /// <param name="exception">异常</param>
    Boolean Error(IHandlerContext context, Exception exception);
}

其中核心读写操作:

1. Read 读取数据

接收网络数据,返回结果作为下一个处理器消息。

    • 第一个处理器拿到message就是原始网络数据包,IPacket类型
    • 最终处理器拿到最终解码处理好的消息,一般是应用协议报文(如MqttMessage),并决定如何使用消息。
    • 处理得到单个消息时,调用一次下一级处理器,返回下级结果给上一级;
    • 处理得到多个消息时,调用多次下一级处理器,返回null给上一级;
    • 接收数据进入Read管道时,每个处理器可能调用一次或多次下一级处理器,也可能不调用;
    • 可以在最终处理器里面写业务逻辑,也可以继续用Received事件里访问Message字段得到消息,Packet字段是经过编码器之前的原始数据包;

2. Write 写入数据

发送网络数据,逆序使用IPipeline管道,返回结果作为下一个处理器消息。

    • 第一个处理器拿到message就是用户层发送的应用协议报文(如MqttMessage),处理器需要把该报文转为数据包,例如MqttCodec负责把MqttMessage转为IPacket;
    • 最终处理器拿到的message一定是IPacket,然后调用底层网络接口把数据包发送出去;
    • 在MQTT协议中,一般就只有MqttCodec这一个编码器,一步到位把MqttMessage编码为IPacket然后通过网络发送出去;
    • 在西门子S7协议中,应用层协议需要经过多次封包,就可以设计多层编码器,A->B,B->C,C->IPacket;

数据包编码器PacketCodec

编码器的设计目标是作为网络粘包处理的基础实现。核心功能:在接收数据包时,调用数据包编码器,根据协议规范尝试解析,如果能够解析得到一个完整报文直接返回,如果不够一个完整报文,则加入缓存,待后续数据包到达再重新尝试解析。

完美的协议设计是每次解析都直接得到报文,不在编码器缓存中驻留任何数据!

编码器缓存驻留的数据越多,说明协议设计越糟糕,不仅浪费内存,还影响吞吐。

可用属性项:

    • Expire。缓存有效期。超过该时间后仍未匹配数据包的缓存数据将被抛弃,默认5000ms;
    • MaxCache。最大缓存待处理数据。默认1M;
    • GetLength。获取长度的委托。本包所应该拥有的总长度,满足该长度后解除一个封包;
    • GetLength2。获取长度的委托。本包所应该拥有的总长度,满足该长度后解除一个封包;

两个GetLength,提供其一即可,GetLength2基于Span<Byte>实现,性能更好,优先推荐使用。

总结

编码器是设计网络协议的核心基石,默认实现的几个编码器通过参数设计能够满足95%以上自定义需求。对于MQTT/RocketMQ等协议,编码器架构也能很好实现。