HttpServer是一个轻量级Web服务器,用于在嵌入式设备以及客户端环境中提供简单Web服务,同时也支持标准WebSocket服务。
本文例程基于vs2022,基础例程可参考:
WebSocket服务端
WebSocket服务端功能由HttpServer提供,只是映射到WebSocket特有的处理器上。
vs2022新建.NET6.0控制台项目,Nuget引用 NewLife.Core,使用以下例程:
using NewLife.Http; using NewLife.Log; using System; XTrace.UseConsole(); var server = new HttpServer { Port = 8080, Log = XTrace.Log, SessionLog = XTrace.Log }; server.Map("/ws", new MyWebSocket()); server.Start(); Console.ReadLine(); class MyWebSocket : IHttpHandler { /// <summary>处理请求</summary> /// <param name="context"></param> public virtual void ProcessRequest(IHttpContext context) { var ws = context.WebSocket; ws.Handler = ProcessMessage; WriteLog("WebSocket连接 {0}", context.Connection.Remote); } /// <summary>处理消息</summary> /// <param name="socket"></param> /// <param name="message"></param> public virtual void ProcessMessage(WebSocket socket, WebSocketMessage message) { var remote = socket.Context.Connection.Remote; var msg = message.Payload?.ToStr(); switch (message.Type) { case WebSocketMessageType.Text: WriteLog("WebSocket收到[{0}] {1}", message.Type, msg); // 群发所有客户端 socket.SendAll($"[{remote}]说,{msg}"); break; case WebSocketMessageType.Close: WriteLog("WebSocket关闭[{0}] [{1}] {2}", remote, message.CloseStatus, message.StatusDescription); break; case WebSocketMessageType.Ping: case WebSocketMessageType.Pong: WriteLog("WebSocket心跳[{0}] {1}", message.Type, msg); break; default: WriteLog("WebSocket收到[{0}] {1}", message.Type, msg); break; } } private void WriteLog(String format, params Object[] args) => XTrace.WriteLine(format, args); }
映射路由/ws到一个自定义处理器MyWebSocket上,该处理器包括了 ProcessRequest 和 ProcessMessage 。
- ProcessRequest。收到WebSocket请求时触发一次,此时可验证访问者是否合法,例如借助JWT等Token技术。Handler属性设置为ProcessMessage,用于处理后续WebSocket消息。
- ProcessMessage。建立WebSocket握手后,每次收到WebSocket消息(数据帧),都将调用该方法,包括二进制、文本、心跳和断开等多种消息类型。
- Send。发送消息给客户端。
- SendAll。群发消息给所有客户端。
- Close。关闭连接。
跑起来:
可以看到,仍然是普通HttpServer监听8080端口。保持打开,不要关闭,下面客户端测试需要用到
WebClient客户端
借助.NET自身的ClientWebSocket,可以轻松构建WebSocket通信。
vs2022新建.NET6.0控制台项目,Nuget引用 NewLife.Core,使用以下例程:
using NewLife; using NewLife.Data; using NewLife.Log; using System; using System.Net.WebSockets; XTrace.UseConsole(); var client = new ClientWebSocket(); await client.ConnectAsync(new Uri("ws://127.0.0.1:8080/ws"), default); await client.SendAsync("Hello NewLife".GetBytes(), WebSocketMessageType.Text, true, default); var buf = new Byte[1024]; var rs = await client.ReceiveAsync(buf, default); XTrace.WriteLine(new Packet(buf, 0, rs.Count).ToStr()); await client.CloseAsync(WebSocketCloseStatus.NormalClosure, "通信完成", default); XTrace.WriteLine("Close [{0}] {1}", client.CloseStatus, client.CloseStatusDescription); Console.ReadLine();
建立到服务端的连接后,向服务端发送字符串“Hello NewLife”,然后使用1024缓冲区接收一次响应数据,接着友好断开连接。
跑起来:
查看服务端:
可以看到,服务端ProcessRequest收到了客户端的WebSocket连接请求。两次ProcessMessage,第一次收到Text数据帧,也就是文本“Hello NewLife”,第二次是Close数据帧。
客户端也收到了服务端SendAll群发的数据,感兴趣的同学可以多开几个客户端试试。
物联网平台中使用
在物联网平台中,设备与服务端建立WebSocket长连接后,可以实时下发通知。
我们使用消息队列架构,如果队列中有消息,则通过WebSocket推给设备端。
消息大循环结合WebSocket如下:
private async Task consumeMessage(WebSocket socket, String node, CancellationTokenSource source) { var cancellationToken = source.Token; var queue = QueueHost.GetQueue<String>($"cmd:{node}"); try { while (!cancellationToken.IsCancellationRequested) { var msg = await queue.TakeOneAsync(10_000); if (msg != null) { XTrace.WriteLine("WebSocket发送 {0} {1}", node, msg); socket.Send(msg.GetBytes(), WebSocketMessageType.Text); } else { await Task.Delay(1_000, cancellationToken); } } } catch (Exception ex) { XTrace.WriteException(ex); } finally { source.Cancel(); } }
队列主机QueueHost是ICache类型,可以使用FullRedis(nuget包NewLife.Redis),简单测试时使用MemoryCache。
每个客户端设备有一个独立的Topic,例如上面的 cmd:{node},从队列主机QueueHost中获取该Topic对应的队列,即可消费。
核心原理:拉取式消费消息队列(异步阻塞),然后逐个消息通过WebSocket连接下发!
其它模块或其它应用,如果需要下发通知消息给客户端设备,可以向队列主机QueueHost的该Topic发送消息,这里的大循环消费到以后,发送给设备客户端。
如果需要跨应用系统推送下行通知消息,队列主机必须使用Redis等支持多应用访问的消息队列中间件。
再修改 ProcessRequest ,握手后异步启动大循环:
/// <summary>处理请求</summary> /// <param name="context"></param> public virtual void ProcessRequest(IHttpContext context) { var ws = context.WebSocket; ws.Handler = ProcessMessage; var source = new CancellationTokenSource(); Task.Run(() => consumeMessage(ws, "nodeCode", source)); WriteLog("WebSocket连接 {0}", context.Connection.Remote); }
借助Redis消息队列,每个设备一个Topic,对应一个WebSocket连接和消费大循环。
注:以上代码来自星尘 StarServer。
整体架构如下: