网络库是NewLife系列最成功的作品,自2005年以来已经历过三代重构,最高取得2000万tps吞吐以及单机400万长连接的优秀成绩!

基于网络库的代表作包括:RPC框架ApiServer、HttpServer服务器、反向代理XProxy等。

Nuget包:NewLife.Core

源码地址:https://github.com/NewLifeX/X/blob/master/NewLife.Core/Net/NetServer.cs

最新例程:https://github.com/NewLifeX/X/tree/master/Samples/Zero.Server

例程地址:https://github.com/NewLifeX/NewLife.Zero/tree/master/Zero.TcpServer

快速入门

网络服务器的核心类是 NetServer,而不是TcpServer或UdpServer,前者比后者更高一层。

NetServer的典型用法是编写自定义应用服务器类及会话类,绝大部分业务操作在会话类中完成。每一个客户端连接,对应一个NetSession网络会话实例,主要包括连接OnConnected、断开OnDisconnected、接收OnReceive、错误OnError等核心事件,供用户按需重载。(Udp远程地址端口算一个连接会话)

新建.NET8.0控制台项目,从Nuget引用NewLife.Core,添加MyNetServer.cs,使用以下代码:

using NewLife;
using NewLife.Data;
using NewLife.Model;
using NewLife.Net;
using Zero.TcpServer.Handlers;

namespace Zero.TcpServer;

/// <summary>定义服务端,用于管理所有网络会话</summary>
class MyNetServer : NetServer<MyNetSession>
{
}

/// <summary>定义会话。每一个远程连接唯一对应一个网络会话,再次重复收发信息</summary>
class MyNetSession : NetSession<MyNetServer>
{
    private IList<IMsgHandler> _handlers;

    /// <summary>客户端连接</summary>
    protected override void OnConnected()
    {
        _handlers = ServiceProvider.GetServices<IMsgHandler>().ToList();

        // 发送欢迎语
        Send($"Welcome to visit {Environment.MachineName}!  [{Remote}]\r\n");

        base.OnConnected();
    }

    /// <summary>客户端断开连接</summary>
    protected override void OnDisconnected(String reason)
    {
        WriteLog("客户端{0}已经断开连接啦。{1}", Remote, reason);

        base.OnDisconnected(reason);
    }

    /// <summary>收到客户端数据</summary>
    /// <param name="e"></param>
    protected override void OnReceive(ReceivedEventArgs e)
    {
        if (e.Packet.Total == 0) return;

        WriteLog("收到:{0}", e.Packet.ToStr());

        //todo 这里是业务处理核心,解开数据包e.Packet并进行业务处理

        // 把收到的数据发回去
        Send(e.Packet.ToStr().Reverse().Join(null));
    }

    /// <summary>出错</summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    protected override void OnError(Object sender, ExceptionEventArgs e)
    {
        WriteLog("[{0}]错误:{1}", e.Action, e.Exception?.GetTrue().Message);

        base.OnError(sender, e);
    }
}

继续编写Main函数

using NewLife.Caching;
using NewLife.Caching.Services;
using NewLife.Data;
using NewLife.Log;
using NewLife.Model;
using Stardust;
using Zero.Server;
using Zero.TcpServer;
using Zero.TcpServer.Handlers;

// 启用控制台日志,拦截所有异常
XTrace.UseConsole();

//var services = new ServiceCollection();
var services = ObjectContainer.Current;

// 配置星尘。自动读取配置文件 config/star.config 中的服务器地址、应用标识、密钥
var star = services.AddStardust();

// 默认内存缓存,如有配置RedisCache可使用Redis缓存
services.AddSingleton<ICacheProvider, RedisCacheProvider>();

// 引入Redis,用于消息队列和缓存,单例,带性能跟踪。一般使用上面的ICacheProvider替代
//services.AddRedis("127.0.0.1:6379", "123456", 3, 5000);

// 注入消息处理器,可注入多个
services.AddTransient<IMsgHandler, MyHandler>();

var provider = services.BuildServiceProvider();

// 实例化网络服务端,指定端口,同时在Tcp/Udp/IPv4/IPv6上监听
var server = new MyNetServer
{
    Port = 12345,
    ServiceProvider = provider,
    Name = "大网服务端",

    Log = XTrace.Log,
    SessionLog = XTrace.Log,
    Tracer = star?.Tracer,

#if DEBUG
    SocketLog = XTrace.Log,
    LogSend = true,
    LogReceive = true,
#endif
};

// 启动网络服务,监听端口,所有逻辑将在 MyNetSession 中处理
server.Start();
XTrace.WriteLine("服务端启动完成!");

// 注册到星尘,非必须
star?.Service?.Register("MyNetServer", () => $"tcp://*:{server.Port},udp://*:{server.Port}");

// 客户端测试,非服务端代码,正式使用时请注释掉
_ = Task.Run(ClientTest.TcpClientTest);
_ = Task.Run(ClientTest.UdpClientTest);
_ = Task.Run(ClientTest.TcpSessionTest);
_ = Task.Run(ClientTest.UdpSessionTest);

// 阻塞,等待友好退出
var host = services.BuildHost();
await host.RunAsync();

新增ClientTest类:

using System.Net.Sockets;
using System.Text;
using NewLife;
using NewLife.Data;
using NewLife.Log;
using NewLife.Net;

namespace Zero.Server;

static class ClientTest
{
    /// <summary>TcpClient连接NetServer</summary>
    public static async void TcpClientTest()
    {
        await Task.Delay(1_000);
        XTrace.WriteLine("");
        XTrace.WriteLine("Tcp客户端开始连接!");

        // 连接服务端
        var client = new TcpClient();
        await client.ConnectAsync("127.0.0.2", 12345);
        var ns = client.GetStream();

        // 接收服务端握手。连接服务端后,服务端会主动发送数据
        var buf = new Byte[1024];
        var count = await ns.ReadAsync(buf);
        XTrace.WriteLine("<={0}", buf.ToStr(null, 0, count));

        // 发送数据
        var str = "Hello NewLife";
        XTrace.WriteLine("=>{0}", str);
        await ns.WriteAsync(str.GetBytes());

        // 接收数据
        count = await ns.ReadAsync(buf);
        XTrace.WriteLine("<={0}", buf.ToStr(null, 0, count));

        // 关闭连接
        client.Close();
    }

    /// <summary>UdpClient连接NetServer</summary>
    public static async void UdpClientTest()
    {
        await Task.Delay(2_000);
        XTrace.WriteLine("");
        XTrace.WriteLine("Udp客户端开始连接!");

        // 无需连接服务端
        var uri = new NetUri("udp://127.0.0.3:12345");
        var client = new UdpClient();
        //client.Connect(uri.EndPoint);

        // 发送数据。服务端收到第一个包才建立会话
        var str = "Hello NewLife";
        XTrace.WriteLine("=>{0}", str);
        var buf = str.GetBytes();
        await client.SendAsync(buf, buf.Length, uri.EndPoint);

        // 接收数据。建立会话后,服务端会主动发送握手数据
        var result = await client.ReceiveAsync();
        XTrace.WriteLine("<={0}", result.Buffer.ToStr());

        result = await client.ReceiveAsync();
        XTrace.WriteLine("<={0}", result.Buffer.ToStr());

        // 发送空包。服务端收到空包后,会关闭连接
        buf = new Byte[0];
        await client.SendAsync(buf, buf.Length, uri.EndPoint);

        // 关闭连接
        client.Close();
    }

    /// <summary>ISocketClient(TCP)连接NetServer</summary>
    public static async void TcpSessionTest()
    {
        await Task.Delay(3_000);
        XTrace.WriteLine("");
        XTrace.WriteLine("Tcp会话开始连接!");

        // 创建客户端,关闭默认的异步模式(MaxAsync=0)
        var uri = new NetUri("tcp://127.0.0.4:12345");
        var client = uri.CreateRemote();
        client.Name = "小tcp客户";
        client.Log = XTrace.Log;
        if (client is TcpSession tcp) tcp.MaxAsync = 0;

        // 接收服务端握手。内部自动建立连接
        var rs = await client.ReceiveAsync(default);
        client.WriteLog("<={0}", rs.ToStr());

        // 发送数据
        var str = "Hello NewLife";
        client.WriteLog("=>{0}", str);
        client.Send(str);

        // 接收数据
        rs = await client.ReceiveAsync(default);
        client.WriteLog("<={0}", rs.ToStr());

        // 关闭连接
        client.Close("测试完成");
    }

    /// <summary>ISocketClient(UDP)连接NetServer</summary>
    public static async void UdpSessionTest()
    {
        await Task.Delay(4_000);
        XTrace.WriteLine("");
        XTrace.WriteLine("Udp会话开始连接!");

        // 创建客户端,关闭默认的异步模式(MaxAsync=0)
        var uri = new NetUri("udp://127.0.0.4:12345");
        var client = uri.CreateRemote();
        client.Name = "小udp客户";
        client.Log = XTrace.Log;
        if (client is UdpServer udp) udp.MaxAsync = 0;

        // 发送数据。服务端收到第一个包才建立会话
        var str = "Hello NewLife";
        client.WriteLog("=>{0}", str);
        client.Send(str);

        // 接收服务端握手
        var rs = await client.ReceiveAsync(default);
        client.WriteLog("<={0}", rs.ToStr());

        // 接收数据
        rs = await client.ReceiveAsync(default);
        client.WriteLog("<={0}", rs.ToStr());

        // 关闭连接
        client.Close("测试完成");
    }
}

F5跑起来,可以看到网络服务同时在Tcp/Udp/IPv4/IPv6上监听12345端口。TcpClient客户端通过Tcp直连12345端口,实现了数据包的收发。

核心功能

启动服务

典型的服务端启动代码如下:

// 实例化网络服务端,指定端口,同时在Tcp/Udp/IPv4/IPv6上监听
var server = new MyNetServer
{
    Port = 12345,
    Log = XTrace.Log,
    SessionLog = XTrace.Log,
};

// 启动网络服务,监听端口,所有逻辑将在 MyNetSession 中处理
server.Start();

例程中其它代码,主要为了设置日志,以及配置星尘监控APM。

Start以后即监听端口开始提供服务,Start并不会阻塞主线程,因此Main函数后面需要用户自己处理阻塞。

需要停止服务端时,Stop或者Dispose即可。

会话连接

每一个客户端连接上来时,将会实例化MyNetSession对象,并调用OnConnected方法。这里可以做一些业务处理,或者调用Send方法向客户端发送一些数据。此时客户端仅仅完成三次握手,还没有向服务端发送任何数据。每一个Tcp连接,有且禁用一个NetSession对象实例,可以在此存放一些跟当前连接有关的数据。

/// <summary>客户端连接</summary>
protected override void OnConnected()
{
    _handlers = ServiceProvider.GetServices<IMsgHandler>().ToList();
    
    // 发送欢迎语
    Send($"Welcome to visit {Environment.MachineName}!  [{Remote}]\r\n");

    base.OnConnected();
}

Remote属性,表示远程地址和端口。

可用Send方法向客户端发送消息,有以下几个重载:

INetSession Send(IPacket data);
INetSession Send(Stream stream);
INetSession Send(string msg, Encoding encoding = null);

服务端启动后,也可以使用Telnet命令连接:telnet 127.0.0.1 12345

此时Telnet只是连上去,还没有发送数据,就已经收到了服务端发回来的数据。

接收数据

在Telnet连接状态下,按下 CTRL+] 快捷键,进入命令模式,即可使用Send命令发送消息。上述例程中,服务端会话 OnReceive 收到数据后,打印日志并把数据包原样发回去给客户端。

/// <summary>收到客户端数据</summary>
/// <param name="e"></param>
protected override void OnReceive(ReceivedEventArgs e)
{
    WriteLog("收到:{0}", e.Packet.ToStr());

    //todo 这里是业务处理核心,解开数据包e.Packet并进行业务处理

    // 把收到的数据发回去
    Send(e.Packet);
}

日志效果如下,可以看到收发的十六进制数据。

OnReceive是最重要的数据处理核心,我们将在这里解析客户端发送过来的数据并根据业务作出响应。

e.Packet保存着这一次接收到数据的原始数据包,里面的Data就是网络缓冲区,但并非整个缓冲区都是当前数据帧。可以通过e.Packet.ReadBytes(-1)拿到字节数组,强烈推荐用e.Packet.GetSpan()得到现代化的Span<Byte>。

在这里,绝大部分用户会遇到Tcp网络粘包问题,也就是一个数据帧里面包含客户端多次发送的数据,“粘”到一帧数据里,被服务端一次性接收了。UDP通信时不会遇到这个问题,TCP通信两次间隔较长时也不会遇到这个问题。NetServer解决Tcp网络粘包问题的标准方案是编写数据编码器,后文有提到内置的多种编码器,用户可根据需要编写自己的编码器,服务端接收到数据后由编码器对数据进行解码拆分。

OnReceive里的e.Message就保存着编码器解码后得到的消息。NewLife.MQTT / NewLife.RocketMQ 以及HttpServer中大量使用编码器。


断开连接

关闭Telnet窗口,服务端将调用断开方法OnDisconnected。

/// <summary>客户端断开连接</summary>
protected override void OnDisconnected()
{
    WriteLog("客户端{0}已经断开连接啦", Remote);

    base.OnDisconnected();
}

可以看到两行日志,第一行是会话日志,第二行就是例程中的输出。


很多很多人在网络断开的处理中折翼!!!

上面例程中,客户端正常合理的断开,Tcp层会发送相应指令包给服务端,因此服务端得以“感知”网络连接已断开。但是,如果中间网线断了,或者客户端进程直接退出,来不及向服务端发送任何数据包,服务端并不会知道客户端网络已断开。这个时候,服务端会以为客户端还在,直到服务端试图向客户端发送任意数据包,才会捕获到异常,知道连接已断开。同理,服务端异常断开时,客户端也无法感知,直到下一次发送数据。

因此,NetServer内部维持Sessions会话集合,然后定时检查超期不活跃的客户端(默认20分钟),并踢掉


错误处理

在会话数据处理中,如果遇到错误,将会调用 OnError 方法

/// <summary>出错</summary>
/// <param name="sender"></param>
/// <param name="e"></param>
protected override void OnError(Object sender, ExceptionEventArgs e)
{
    WriteLog("[{0}]错误:{1}", e.Action, e.Exception?.GetTrue().Message);

    base.OnError(sender, e);
}

高级功能

管道处理器

NetServer 支持管道处理器,可以自定义消息处理逻辑。网络库底层接收到数据包以后,会通过进入管道,经过一系列处理器来对数据进行分层处理。数据包编码器本身也属于管道处理器。

编写处理器如下:

    class EchoHandler : Handler
    {
        /// <summary>性能计数器</summary>
        public ICounter Counter { get; set; }

        public override Object Read(IHandlerContext context, Object message)
        {
            var ctx = context as NetHandlerContext;
            var session = ctx.Session;

            // 性能计数
            Counter?.Increment(1, 0);

            var pk = message as Packet;
#if DEBUG
            session.WriteLog("收到:{0}", pk.ToStr());
#endif

            // 把收到的数据发回去
            session.SendMessage(pk);

            return null;
        }
    }

服务端启动前添加该处理器即可

server.Add(new EchoHandler());

群发消息

既然每一个客户端连接对应一个NetSession会话,那么找到相应连接的会话,即可向该连接客户端发送消息。NetServer中为此还封装了群发接口:

/// <summary>异步群发数据给所有客户端</summary>
/// <param name="data"></param>
/// <returns>已群发客户端总数</returns>
public virtual Task<Int32> SendAllAsync(Packet data);

/// <summary>异步群发数据给所有客户端</summary>
/// <param name="data"></param>
/// <param name="predicate"></param>
/// <returns>已群发客户端总数</returns>
public virtual Task<Int32> SendAllAsync(Packet data, Func<INetSession, Boolean> predicate = null);

在NetSession会话中,Host属性指向NetServer,因此 Host.SendAllAsync 表示群发。

粘包处理

Tcp粘包是一个非常头疼的问题,内置有多个编码器可处理粘包。粘包编码器本身就属于管道处理器。

StandardCodec,标准SRMP协议编码器,通过增加4字节头部来拆包;

LengthFieldCodec,定长头部编码器,通过在开头指定负载长度来拆包;

SplitDataCodec,字符分割编码器,通过指定头尾特征字符串来拆包;

其他场景使用示例

已有TCP服务再增加个同端口的UDP服务

在这个使用场景下,如要处理Session关闭的事件,需要在NewSession中去订阅Session关闭的OnDisposed事件。

var udpServer = new NetServer(8888);
udpServer.ProtocolType = NetType.Udp; //指定服务类型,如不指定即TCP/UDP一起
udpServer.Name = "UDP服务"; //服务名称任意
udpServer.SessionTimeout = 20;//UDP客户端无数据上传超时,默认有20分钟,根据业务需要这里配置了20秒
udpServer.NewSession += udpServer_NewSession; ;
udpServer.Received += udpServer_Received;
udpServer.Error += udpServer_Error;
udpServer.Start();

//以下是对订阅事件的实现,也可以在上面=>简写具体看个人喜好
//有新客户端接入时的事件订阅处理,注意:UDP只有当客户端发消息时才出发NewSession,然后再是Received。
private void _UdpServer_NewSession(object sender, NetSessionEventArgs e)
{
    XTrace.WriteLine("新UDP客户端接入:UDPSessionID:" + e.Session.ID);
    //重要:如果要对客户端无消息超时做后续处理,要在这里订阅Session的OnDisposed事件
    e.Session.OnDisposed += Session_OnDisposed;

}
//Session超时关闭等事件订阅处理
private void Session_OnDisposed(object sender, EventArgs e)
{
	var session = sender as NetSession;
	if (session != null)
	{
		XTrace.WriteLine($"SessionID:({session.ID})UDP释放!!!!!!!!!");
        //相关的业务逻辑代码处理无消息超时做后续处理
	}
}

//数据接收事件处理
private void _UdpServer_Received(object sender, ReceivedEventArgs e)
{
    //如要获取Session相关信息,从sender中获取
    var session = sender as NetSession; 
    //具体的业务代码
}
//服务异常事件处理
private void _UdpServer_Error(object sender, ExceptionEventArgs e)
{
     XTrace.WriteException(e.Exception);
}