网络库是NewLife系列最成功的作品,自2005年以来已经历过三代重构,最高取得2000万tps吞吐以及单机400万长连接的优秀成绩!

基于网络库的代表作包括:RPC框架ApiServer、HttpServer服务器、反向代理XProxy等。

Nuget包:NewLife.Core

源码地址:https://github.com/NewLifeX/X/blob/master/NewLife.Core/Net/NetServer.cs

例程地址:https://github.com/NewLifeX/NewLife.Zero/tree/master/Zero.TcpServer

快速入门

网络服务器的核心类是 NetServer,而不是TcpServer或UdpServer,前者比后者更高一层。

NetServer的典型用法是编写自定义应用服务器类及会话类,绝大部分业务操作在会话类中完成。每一个客户端连接,对应一个NetSession网络会话实例,主要包括连接OnConnected、断开OnDisconnected、接收OnReceive、错误OnError等核心事件,供用户按需重载。(Udp远程地址端口算一个连接会话)

新建.NET5.0控制台项目,添加MyNetServer.cs,使用以下代码:

using System;
using NewLife;
using NewLife.Net;

namespace Zero.TcpServer
{
    /// <summary>定义服务端,用于管理所有网络会话</summary>
    class MyNetServer : NetServer<MyNetSession>
    {
    }

    /// <summary>定义会话。每一个远程连接唯一对应一个网络会话,再次重复收发信息</summary>
    class MyNetSession : NetSession<MyNetServer>
    {
        /// <summary>客户端连接</summary>
        protected override void OnConnected()
        {
            // 发送欢迎语
            Send($"Welcome to visit {Environment.MachineName}!  [{Remote}]\r\n");

            base.OnConnected();
        }

        /// <summary>客户端断开连接</summary>
        protected override void OnDisconnected()
        {
            WriteLog("客户端{0}已经断开连接啦", Remote);

            base.OnDisconnected();
        }

        /// <summary>收到客户端数据</summary>
        /// <param name="e"></param>
        protected override void OnReceive(ReceivedEventArgs e)
        {
            WriteLog("收到:{0}", e.Packet.ToStr());

            //todo 这里是业务处理核心,解开数据包e.Packet并进行业务处理

            // 把收到的数据发回去
            Send(e.Packet);
        }

        /// <summary>出错</summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        protected override void OnError(Object sender, ExceptionEventArgs e)
        {
            WriteLog("[{0}]错误:{1}", e.Action, e.Exception?.GetTrue().Message);

            base.OnError(sender, e);
        }
    }
}

继续编写Main函数

using System;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using NewLife;
using NewLife.Log;
using Stardust;

namespace Zero.TcpServer
{
    internal class Program
    {
        private static void Main(String[] args)
        {
            // 启用控制台日志,拦截所有异常
            XTrace.UseConsole();

            // 配置星尘。自动读取配置文件 config/star.config 中的服务器地址、应用标识、密钥
            var star = new StarFactory(null, null, null);
            if (star.Server.IsNullOrEmpty()) star = null;

            // 实例化网络服务端,指定端口,同时在Tcp/Udp/IPv4/IPv6上监听
            var server = new MyNetServer
            {
                Port = 12345,
                Log = XTrace.Log,
                SessionLog = XTrace.Log,
                Tracer = star?.Tracer,

#if DEBUG
                SocketLog = XTrace.Log,
                LogSend = true,
                LogReceive = true,
#endif
            };

            // 启动网络服务,监听端口,所有逻辑将在 MyNetSession 中处理
            server.Start();

            // 注册到星尘,非必须
            star?.Service.Register("MyNetServer", () => $"tcp://*:{server.Port},udp://*:{server.Port}");

            // 客户端测试,非服务端代码
            Task.Run(ClientTest);

            // 友好退出
            //ObjectContainer.Current.BuildHost().Run();
            Thread.Sleep(-1);
        }

        private static async void ClientTest()
        {
            await Task.Delay(1_000);

            var client = new TcpClient();
            await client.ConnectAsync("127.0.0.1", 12345);
            var ns = client.GetStream();

            // 接收服务端握手
            var buf = new Byte[1024];
            var count = await ns.ReadAsync(buf);
            XTrace.WriteLine("<={0}", buf.ToStr(null, 0, count));

            // 发送数据
            var str = "Hello NewLife";
            XTrace.WriteLine("=>{0}", str);
            await ns.WriteAsync(str.GetBytes());

            // 接收数据
            count = await ns.ReadAsync(buf);
            XTrace.WriteLine("<={0}", buf.ToStr(null, 0, count));
        }
    }
}

F5跑起来,可以看到网络服务同时在Tcp/Udp/IPv4/IPv6上监听12345端口。TcpClient客户端通过Tcp直连12345端口,实现了数据包的收发。

核心功能

启动服务

典型的服务端启动代码如下:

// 实例化网络服务端,指定端口,同时在Tcp/Udp/IPv4/IPv6上监听
var server = new MyNetServer
{
    Port = 12345,
    Log = XTrace.Log,
    SessionLog = XTrace.Log,
};

// 启动网络服务,监听端口,所有逻辑将在 MyNetSession 中处理
server.Start();

例程中其它代码,主要为了设置日志,以及配置星尘监控APM。

Start以后即监听端口开始提供服务,Start并不会阻塞主线程,因此Main函数后面需要用户自己处理阻塞。

需要停止服务端时,Stop或者Dispose即可。

会话连接

每一个客户端连接上来时,将会实例化MyNetSession对象,并调用OnConnected方法。这里可以做一些业务处理,或者调用Send方法向客户端发送一些数据。此时客户端仅仅完成三次握手,还没有向服务端发送任何数据。

/// <summary>客户端连接</summary>
protected override void OnConnected()
{
    // 发送欢迎语
    Send($"Welcome to visit {Environment.MachineName}!  [{Remote}]\r\n");

    base.OnConnected();
}

Remote属性,表示远程地址和端口。

可用Send方法向客户端发送消息,有以下几个重载:

INetSession Send(Packet data);
INetSession Send(Stream stream);
INetSession Send(string msg, Encoding encoding = null);

服务端启动后,也可以使用Telnet命令连接:telnet 127.0.0.1 12345

此时Telnet只是连上去,还没有发送数据,就已经收到了服务端发回来的数据。

接收数据

在Telnet连接状态下,按下 CTRL+] 快捷键,进入命令模式,即可使用Send命令发送消息。上述例程中,服务端会话 OnReceive 收到数据后,打印日志并把数据包原样发回去给客户端。

/// <summary>收到客户端数据</summary>
/// <param name="e"></param>
protected override void OnReceive(ReceivedEventArgs e)
{
    WriteLog("收到:{0}", e.Packet.ToStr());

    //todo 这里是业务处理核心,解开数据包e.Packet并进行业务处理

    // 把收到的数据发回去
    Send(e.Packet);
}

日志效果如下,可以看到收发的十六进制数据。


断开连接

关闭Telnet窗口,服务端将调用断开方法OnDisconnected。

/// <summary>客户端断开连接</summary>
protected override void OnDisconnected()
{
    WriteLog("客户端{0}已经断开连接啦", Remote);

    base.OnDisconnected();
}

可以看到两行日志,第一行是会话日志,第二行就是例程中的输出。


很多很多人在网络断开的处理中折翼!!!

上面例程中,客户端正常合理的断开,Tcp层会发送相应指令包给服务端,因此服务端得意“感知”链接已断开。但是,如果中间网线断了,或者客户端进程直接退出,来不及向服务端发送任何数据包,服务端是不会知道客户端网络已断开的。这个时候,服务端会以为客户端还在,直到服务端试图向客户端发送任意数据包,才会捕获到异常,知道连接已断开。同理,服务端异常断开时,客户端也无法感知,直到下一次发送数据。

因此,NetServer内部维持Sessions会话集合,然后定时检查超期不活跃的客户端(默认20分钟),并踢掉!


错误处理

在会话数据处理中,如果遇到错误,将会调用 OnError 方法

/// <summary>出错</summary>
/// <param name="sender"></param>
/// <param name="e"></param>
protected override void OnError(Object sender, ExceptionEventArgs e)
{
    WriteLog("[{0}]错误:{1}", e.Action, e.Exception?.GetTrue().Message);

    base.OnError(sender, e);
}

高级功能

管道处理器

NetServer 支持管道处理器,可以自定义消息处理逻辑。编写处理器如下:

    class EchoHandler : Handler
    {
        /// <summary>性能计数器</summary>
        public ICounter Counter { get; set; }

        public override Object Read(IHandlerContext context, Object message)
        {
            var ctx = context as NetHandlerContext;
            var session = ctx.Session;

            // 性能计数
            Counter?.Increment(1, 0);

            var pk = message as Packet;
#if DEBUG
            session.WriteLog("收到:{0}", pk.ToStr());
#endif

            // 把收到的数据发回去
            session.SendMessage(pk);

            return null;
        }
    }

服务端启动前添加该处理器即可

server.Add(new EchoHandler());

群发消息

既然每一个客户端连接对应一个NetSession会话,那么找到相应连接的会话,即可向该连接客户端发送消息。NetServer中为此还封装了群发接口:

/// <summary>异步群发数据给所有客户端</summary>
/// <param name="data"></param>
/// <returns>已群发客户端总数</returns>
public virtual Task<Int32> SendAllAsync(Packet data);

/// <summary>异步群发数据给所有客户端</summary>
/// <param name="data"></param>
/// <param name="predicate"></param>
/// <returns>已群发客户端总数</returns>
public virtual Task<Int32> SendAllAsync(Packet data, Func<INetSession, Boolean> predicate = null);

在NetSession会话中,Host属性指向NetServer,因此 Host.SendAllAsync 表示群发。

粘包处理

Tcp粘包是一个非常头疼的问题,内置有多个编码器可处理粘包。

StandardCodec,标准SRMP协议编码器,通过增加4字节头部来拆包;

LengthFieldCodec,定长头部编码器,通过在开头指定负载长度来拆包;

SplitDataCodec,字符分割编码器,通过指定头尾特征字符串来拆包;

其他场景使用示例

已有TCP服务再增加个同端口的UDP服务

在这个使用场景下,如要处理Session关闭的事件,需要在NewSession中去订阅Session关闭的OnDisposed事件。

var udpServer = new NetServer(8888);
udpServer.ProtocolType = NetType.Udp; //指定服务类型,如不指定即TCP/UDP一起
udpServer.Name = "UDP服务"; //服务名称任意
udpServer.SessionTimeout = 20;//UDP客户端无数据上传超时,默认有20分钟,根据业务需要这里配置了20秒
udpServer.NewSession += udpServer_NewSession; ;
udpServer.Received += udpServer_Received;
udpServer.Error += udpServer_Error;
udpServer.Start();

//以下是对订阅事件的实现,也可以在上面=>简写具体看个人喜好
//有新客户端接入时的事件订阅处理,注意:UDP只有当客户端发消息时才出发NewSession,然后再是Received。
private void _UdpServer_NewSession(object sender, NetSessionEventArgs e)
{
    XTrace.WriteLine("新UDP客户端接入:UDPSessionID:" + e.Session.ID);
    //重要:如果要对客户端无消息超时做后续处理,要在这里订阅Session的OnDisposed事件
    e.Session.OnDisposed += Session_OnDisposed;

}
//Session超时关闭等事件订阅处理
private void Session_OnDisposed(object sender, EventArgs e)
{
	var session = sender as NetSession;
	if (session != null)
	{
		XTrace.WriteLine($"SessionID:({session.ID})UDP释放!!!!!!!!!");
        //相关的业务逻辑代码处理无消息超时做后续处理
	}
}

//数据接收事件处理
private void _UdpServer_Received(object sender, ReceivedEventArgs e)
{
    //如要获取Session相关信息,从sender中获取
    var session = sender as NetSession; 
    //具体的业务代码
}
//服务异常事件处理
private void _UdpServer_Error(object sender, ExceptionEventArgs e)
{
     XTrace.WriteException(e.Exception);
}