网络库是NewLife系列最成功的作品,自2005年以来已经历过三代重构,最高取得2000万tps吞吐以及单机400万长连接的优秀成绩!
基于网络库的代表作包括:RPC框架ApiServer、HttpServer服务器、反向代理XProxy等。
Nuget包:NewLife.Core
源码地址:https://github.com/NewLifeX/X/blob/master/NewLife.Core/Net/NetServer.cs
最新例程:https://github.com/NewLifeX/X/tree/master/Samples/Zero.Server
例程地址:https://github.com/NewLifeX/NewLife.Zero/tree/master/Zero.TcpServer
快速入门
网络服务器的核心类是 NetServer,而不是TcpServer或UdpServer,前者比后者更高一层。
NetServer的典型用法是编写自定义应用服务器类及会话类,绝大部分业务操作在会话类中完成。每一个客户端连接,对应一个NetSession网络会话实例,主要包括连接OnConnected、断开OnDisconnected、接收OnReceive、错误OnError等核心事件,供用户按需重载。(Udp远程地址端口算一个连接会话)
新建.NET8.0控制台项目,从Nuget引用NewLife.Core,添加MyNetServer.cs,使用以下代码:
using NewLife;
using NewLife.Data;
using NewLife.Model;
using NewLife.Net;
using Zero.TcpServer.Handlers;
namespace Zero.TcpServer;
/// <summary>定义服务端,用于管理所有网络会话</summary>
class MyNetServer : NetServer<MyNetSession>
{
}
/// <summary>定义会话。每一个远程连接唯一对应一个网络会话,再次重复收发信息</summary>
class MyNetSession : NetSession<MyNetServer>
{
private IList<IMsgHandler> _handlers;
/// <summary>客户端连接</summary>
protected override void OnConnected()
{
_handlers = ServiceProvider.GetServices<IMsgHandler>().ToList();
// 发送欢迎语
Send($"Welcome to visit {Environment.MachineName}! [{Remote}]\r\n");
base.OnConnected();
}
/// <summary>客户端断开连接</summary>
protected override void OnDisconnected(String reason)
{
WriteLog("客户端{0}已经断开连接啦。{1}", Remote, reason);
base.OnDisconnected(reason);
}
/// <summary>收到客户端数据</summary>
/// <param name="e"></param>
protected override void OnReceive(ReceivedEventArgs e)
{
if (e.Packet.Total == 0) return;
WriteLog("收到:{0}", e.Packet.ToStr());
//todo 这里是业务处理核心,解开数据包e.Packet并进行业务处理
// 把收到的数据发回去
Send(e.Packet.ToStr().Reverse().Join(null));
}
/// <summary>出错</summary>
/// <param name="sender"></param>
/// <param name="e"></param>
protected override void OnError(Object sender, ExceptionEventArgs e)
{
WriteLog("[{0}]错误:{1}", e.Action, e.Exception?.GetTrue().Message);
base.OnError(sender, e);
}
}
继续编写Main函数
using NewLife.Caching;
using NewLife.Caching.Services;
using NewLife.Data;
using NewLife.Log;
using NewLife.Model;
using Stardust;
using Zero.Server;
using Zero.TcpServer;
using Zero.TcpServer.Handlers;
// 启用控制台日志,拦截所有异常
XTrace.UseConsole();
//var services = new ServiceCollection();
var services = ObjectContainer.Current;
// 配置星尘。自动读取配置文件 config/star.config 中的服务器地址、应用标识、密钥
var star = services.AddStardust();
// 默认内存缓存,如有配置RedisCache可使用Redis缓存
services.AddSingleton<ICacheProvider, RedisCacheProvider>();
// 引入Redis,用于消息队列和缓存,单例,带性能跟踪。一般使用上面的ICacheProvider替代
//services.AddRedis("127.0.0.1:6379", "123456", 3, 5000);
// 注入消息处理器,可注入多个
services.AddTransient<IMsgHandler, MyHandler>();
var provider = services.BuildServiceProvider();
// 实例化网络服务端,指定端口,同时在Tcp/Udp/IPv4/IPv6上监听
var server = new MyNetServer
{
Port = 12345,
ServiceProvider = provider,
Name = "大网服务端",
Log = XTrace.Log,
SessionLog = XTrace.Log,
Tracer = star?.Tracer,
#if DEBUG
SocketLog = XTrace.Log,
LogSend = true,
LogReceive = true,
#endif
};
// 启动网络服务,监听端口,所有逻辑将在 MyNetSession 中处理
server.Start();
XTrace.WriteLine("服务端启动完成!");
// 注册到星尘,非必须
star?.Service?.Register("MyNetServer", () => $"tcp://*:{server.Port},udp://*:{server.Port}");
// 客户端测试,非服务端代码,正式使用时请注释掉
_ = Task.Run(ClientTest.TcpClientTest);
_ = Task.Run(ClientTest.UdpClientTest);
_ = Task.Run(ClientTest.TcpSessionTest);
_ = Task.Run(ClientTest.UdpSessionTest);
// 阻塞,等待友好退出
var host = services.BuildHost();
await host.RunAsync();
新增ClientTest类:
using System.Net.Sockets;
using System.Text;
using NewLife;
using NewLife.Data;
using NewLife.Log;
using NewLife.Net;
namespace Zero.Server;
static class ClientTest
{
/// <summary>TcpClient连接NetServer</summary>
public static async void TcpClientTest()
{
await Task.Delay(1_000);
XTrace.WriteLine("");
XTrace.WriteLine("Tcp客户端开始连接!");
// 连接服务端
var client = new TcpClient();
await client.ConnectAsync("127.0.0.2", 12345);
var ns = client.GetStream();
// 接收服务端握手。连接服务端后,服务端会主动发送数据
var buf = new Byte[1024];
var count = await ns.ReadAsync(buf);
XTrace.WriteLine("<={0}", buf.ToStr(null, 0, count));
// 发送数据
var str = "Hello NewLife";
XTrace.WriteLine("=>{0}", str);
await ns.WriteAsync(str.GetBytes());
// 接收数据
count = await ns.ReadAsync(buf);
XTrace.WriteLine("<={0}", buf.ToStr(null, 0, count));
// 关闭连接
client.Close();
}
/// <summary>UdpClient连接NetServer</summary>
public static async void UdpClientTest()
{
await Task.Delay(2_000);
XTrace.WriteLine("");
XTrace.WriteLine("Udp客户端开始连接!");
// 无需连接服务端
var uri = new NetUri("udp://127.0.0.3:12345");
var client = new UdpClient();
//client.Connect(uri.EndPoint);
// 发送数据。服务端收到第一个包才建立会话
var str = "Hello NewLife";
XTrace.WriteLine("=>{0}", str);
var buf = str.GetBytes();
await client.SendAsync(buf, buf.Length, uri.EndPoint);
// 接收数据。建立会话后,服务端会主动发送握手数据
var result = await client.ReceiveAsync();
XTrace.WriteLine("<={0}", result.Buffer.ToStr());
result = await client.ReceiveAsync();
XTrace.WriteLine("<={0}", result.Buffer.ToStr());
// 发送空包。服务端收到空包后,会关闭连接
buf = new Byte[0];
await client.SendAsync(buf, buf.Length, uri.EndPoint);
// 关闭连接
client.Close();
}
/// <summary>ISocketClient(TCP)连接NetServer</summary>
public static async void TcpSessionTest()
{
await Task.Delay(3_000);
XTrace.WriteLine("");
XTrace.WriteLine("Tcp会话开始连接!");
// 创建客户端,关闭默认的异步模式(MaxAsync=0)
var uri = new NetUri("tcp://127.0.0.4:12345");
var client = uri.CreateRemote();
client.Name = "小tcp客户";
client.Log = XTrace.Log;
if (client is TcpSession tcp) tcp.MaxAsync = 0;
// 接收服务端握手。内部自动建立连接
var rs = await client.ReceiveAsync(default);
client.WriteLog("<={0}", rs.ToStr());
// 发送数据
var str = "Hello NewLife";
client.WriteLog("=>{0}", str);
client.Send(str);
// 接收数据
rs = await client.ReceiveAsync(default);
client.WriteLog("<={0}", rs.ToStr());
// 关闭连接
client.Close("测试完成");
}
/// <summary>ISocketClient(UDP)连接NetServer</summary>
public static async void UdpSessionTest()
{
await Task.Delay(4_000);
XTrace.WriteLine("");
XTrace.WriteLine("Udp会话开始连接!");
// 创建客户端,关闭默认的异步模式(MaxAsync=0)
var uri = new NetUri("udp://127.0.0.4:12345");
var client = uri.CreateRemote();
client.Name = "小udp客户";
client.Log = XTrace.Log;
if (client is UdpServer udp) udp.MaxAsync = 0;
// 发送数据。服务端收到第一个包才建立会话
var str = "Hello NewLife";
client.WriteLog("=>{0}", str);
client.Send(str);
// 接收服务端握手
var rs = await client.ReceiveAsync(default);
client.WriteLog("<={0}", rs.ToStr());
// 接收数据
rs = await client.ReceiveAsync(default);
client.WriteLog("<={0}", rs.ToStr());
// 关闭连接
client.Close("测试完成");
}
}
F5跑起来,可以看到网络服务同时在Tcp/Udp/IPv4/IPv6上监听12345端口。TcpClient客户端通过Tcp直连12345端口,实现了数据包的收发。
核心功能
启动服务
典型的服务端启动代码如下:
// 实例化网络服务端,指定端口,同时在Tcp/Udp/IPv4/IPv6上监听
var server = new MyNetServer
{
Port = 12345,
Log = XTrace.Log,
SessionLog = XTrace.Log,
};
// 启动网络服务,监听端口,所有逻辑将在 MyNetSession 中处理
server.Start();
例程中其它代码,主要为了设置日志,以及配置星尘监控APM。
Start以后即监听端口开始提供服务,Start并不会阻塞主线程,因此Main函数后面需要用户自己处理阻塞。
需要停止服务端时,Stop或者Dispose即可。
会话连接
每一个客户端连接上来时,将会实例化MyNetSession对象,并调用OnConnected方法。这里可以做一些业务处理,或者调用Send方法向客户端发送一些数据。此时客户端仅仅完成三次握手,还没有向服务端发送任何数据。每一个Tcp连接,有且禁用一个NetSession对象实例,可以在此存放一些跟当前连接有关的数据。
/// <summary>客户端连接</summary>
protected override void OnConnected()
{
_handlers = ServiceProvider.GetServices<IMsgHandler>().ToList();
// 发送欢迎语
Send($"Welcome to visit {Environment.MachineName}! [{Remote}]\r\n");
base.OnConnected();
}
Remote属性,表示远程地址和端口。
可用Send方法向客户端发送消息,有以下几个重载:
INetSession Send(IPacket data);
INetSession Send(Stream stream);
INetSession Send(string msg, Encoding encoding = null);
服务端启动后,也可以使用Telnet命令连接:telnet 127.0.0.1 12345
此时Telnet只是连上去,还没有发送数据,就已经收到了服务端发回来的数据。
接收数据
在Telnet连接状态下,按下 CTRL+] 快捷键,进入命令模式,即可使用Send命令发送消息。上述例程中,服务端会话 OnReceive 收到数据后,打印日志并把数据包原样发回去给客户端。
/// <summary>收到客户端数据</summary>
/// <param name="e"></param>
protected override void OnReceive(ReceivedEventArgs e)
{
WriteLog("收到:{0}", e.Packet.ToStr());
//todo 这里是业务处理核心,解开数据包e.Packet并进行业务处理
// 把收到的数据发回去
Send(e.Packet);
}
日志效果如下,可以看到收发的十六进制数据。
OnReceive是最重要的数据处理核心,我们将在这里解析客户端发送过来的数据并根据业务作出响应。
e.Packet保存着这一次接收到数据的原始数据包,里面的Data就是网络缓冲区,但并非整个缓冲区都是当前数据帧。可以通过e.Packet.ReadBytes(-1)拿到字节数组,强烈推荐用e.Packet.GetSpan()得到现代化的Span<Byte>。
在这里,绝大部分用户会遇到Tcp网络粘包问题,也就是一个数据帧里面包含客户端多次发送的数据,“粘”到一帧数据里,被服务端一次性接收了。UDP通信时不会遇到这个问题,TCP通信两次间隔较长时也不会遇到这个问题。NetServer解决Tcp网络粘包问题的标准方案是编写数据编码器,后文有提到内置的多种编码器,用户可根据需要编写自己的编码器,服务端接收到数据后由编码器对数据进行解码拆分。
OnReceive里的e.Message就保存着编码器解码后得到的消息。NewLife.MQTT / NewLife.RocketMQ 以及HttpServer中大量使用编码器。
断开连接
关闭Telnet窗口,服务端将调用断开方法OnDisconnected。
/// <summary>客户端断开连接</summary>
protected override void OnDisconnected()
{
WriteLog("客户端{0}已经断开连接啦", Remote);
base.OnDisconnected();
}
可以看到两行日志,第一行是会话日志,第二行就是例程中的输出。
很多很多人在网络断开的处理中折翼!!!
上面例程中,客户端正常合理的断开,Tcp层会发送相应指令包给服务端,因此服务端得以“感知”网络连接已断开。但是,如果中间网线断了,或者客户端进程直接退出,来不及向服务端发送任何数据包,服务端并不会知道客户端网络已断开。这个时候,服务端会以为客户端还在,直到服务端试图向客户端发送任意数据包,才会捕获到异常,知道连接已断开。同理,服务端异常断开时,客户端也无法感知,直到下一次发送数据。
因此,NetServer内部维持Sessions会话集合,然后定时检查超期不活跃的客户端(默认20分钟),并踢掉!
错误处理
在会话数据处理中,如果遇到错误,将会调用 OnError 方法
/// <summary>出错</summary>
/// <param name="sender"></param>
/// <param name="e"></param>
protected override void OnError(Object sender, ExceptionEventArgs e)
{
WriteLog("[{0}]错误:{1}", e.Action, e.Exception?.GetTrue().Message);
base.OnError(sender, e);
}
高级功能
管道处理器
NetServer 支持管道处理器,可以自定义消息处理逻辑。网络库底层接收到数据包以后,会通过进入管道,经过一系列处理器来对数据进行分层处理。数据包编码器本身也属于管道处理器。
编写处理器如下:
class EchoHandler : Handler
{
/// <summary>性能计数器</summary>
public ICounter Counter { get; set; }
public override Object Read(IHandlerContext context, Object message)
{
var ctx = context as NetHandlerContext;
var session = ctx.Session;
// 性能计数
Counter?.Increment(1, 0);
var pk = message as Packet;
#if DEBUG
session.WriteLog("收到:{0}", pk.ToStr());
#endif
// 把收到的数据发回去
session.SendMessage(pk);
return null;
}
}
服务端启动前添加该处理器即可
server.Add(new EchoHandler());
群发消息
既然每一个客户端连接对应一个NetSession会话,那么找到相应连接的会话,即可向该连接客户端发送消息。NetServer中为此还封装了群发接口:
/// <summary>异步群发数据给所有客户端</summary>
/// <param name="data"></param>
/// <returns>已群发客户端总数</returns>
public virtual Task<Int32> SendAllAsync(Packet data);
/// <summary>异步群发数据给所有客户端</summary>
/// <param name="data"></param>
/// <param name="predicate"></param>
/// <returns>已群发客户端总数</returns>
public virtual Task<Int32> SendAllAsync(Packet data, Func<INetSession, Boolean> predicate = null);
在NetSession会话中,Host属性指向NetServer,因此 Host.SendAllAsync 表示群发。
粘包处理
Tcp粘包是一个非常头疼的问题,内置有多个编码器可处理粘包。粘包编码器本身就属于管道处理器。
StandardCodec,标准SRMP协议编码器,通过增加4字节头部来拆包;
LengthFieldCodec,定长头部编码器,通过在开头指定负载长度来拆包;
SplitDataCodec,字符分割编码器,通过指定头尾特征字符串来拆包;
其他场景使用示例
已有TCP服务再增加个同端口的UDP服务
在这个使用场景下,如要处理Session关闭的事件,需要在NewSession中去订阅Session关闭的OnDisposed事件。
var udpServer = new NetServer(8888);
udpServer.ProtocolType = NetType.Udp; //指定服务类型,如不指定即TCP/UDP一起
udpServer.Name = "UDP服务"; //服务名称任意
udpServer.SessionTimeout = 20;//UDP客户端无数据上传超时,默认有20分钟,根据业务需要这里配置了20秒
udpServer.NewSession += udpServer_NewSession; ;
udpServer.Received += udpServer_Received;
udpServer.Error += udpServer_Error;
udpServer.Start();
//以下是对订阅事件的实现,也可以在上面=>简写具体看个人喜好
//有新客户端接入时的事件订阅处理,注意:UDP只有当客户端发消息时才出发NewSession,然后再是Received。
private void _UdpServer_NewSession(object sender, NetSessionEventArgs e)
{
XTrace.WriteLine("新UDP客户端接入:UDPSessionID:" + e.Session.ID);
//重要:如果要对客户端无消息超时做后续处理,要在这里订阅Session的OnDisposed事件
e.Session.OnDisposed += Session_OnDisposed;
}
//Session超时关闭等事件订阅处理
private void Session_OnDisposed(object sender, EventArgs e)
{
var session = sender as NetSession;
if (session != null)
{
XTrace.WriteLine($"SessionID:({session.ID})UDP释放!!!!!!!!!");
//相关的业务逻辑代码处理无消息超时做后续处理
}
}
//数据接收事件处理
private void _UdpServer_Received(object sender, ReceivedEventArgs e)
{
//如要获取Session相关信息,从sender中获取
var session = sender as NetSession;
//具体的业务代码
}
//服务异常事件处理
private void _UdpServer_Error(object sender, ExceptionEventArgs e)
{
XTrace.WriteException(e.Exception);
}