MQTT协议是物联网领域最流行的通信协议!NewLife.MQTT包含了MQTT的完整实现,并实现了客户端MqttClient,以及服务端MqttServer。其中MqttServer仅实现基本网络框架,支持消息收发,完整的消息交换功能位于商用版IoT平台NewLife.IoT中。


Nuget包:NewLife.MQTT

源码地址:https://github.com/NewLifeX/NewLife.MQTT


快速入门

提前准备EMQX,作为MQTT服务端,可通过docker安装。

打开vs2022,新建.NET6.0控制台项目,从nuget引入NewLife.MQTT,写入以下代码:

var client = new MqttClient
{
    Server = "tcp://127.0.0.1:1883",
    Log = XTrace.Log
};

await client.ConnectAsync();

var rt = await client.SubscribeAsync("/test/#", (e) =>
{
    XTrace.WriteLine("sub:" + "/test/# =>" + e.Topic + ":" + e.Payload.ToStr());
});

for (var i = 0; i < 10; i++)
{
    var qos = (QualityOfService)(i % 3);

    await client.PublishAsync("test", new { name = "p" + i, value = Rand.Next() }, qos);

    await Task.Delay(1000);
}

await client.DisconnectAsync();

以上代码实现了MQTT客户端的基本功能,连接、订阅、发布、断开。MQTT作为消息队列的一种,核心功能仍然是发布于订阅。

发布:就是把信息发送到队列中指定Topic主题。

订阅:就是“监听”指定Topic主题的消息(Topic支持通配符),收到消息时执行回调函数。

绝大多数时候,发布和订阅的消息体就是Json字符串。由于MQTT属于轻量级消息队列。常见于嵌入式硬件中集成使用,低端硬件甚至可能连Json都难以支持,因此有时候会出现字符串拼接的情况,消费者根据实际情况拆分即可。即使再难,也没见过有嵌入式硬件直接使用MQTT传递二进制,基本都是字符串,即使要传二进制也是base64形式。

MQTT协议


最流行的物联网通信协议MQTT,包括客户端、服务端和Web管理平台。


提供订阅/发布模式,更为简约、轻量,易于使用,针对受限环境(带宽低、网络延迟高、网络通信不稳定),可以简单概括为物联网打造,官方总结特点如下:
1.使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2.对负载内容屏蔽的消息传输。
3.使用 TCP/IP 提供网络连接。
4.有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5.小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
6.使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。


MQTT 发布与订阅


发布时,指定消息Qos,broker保存的消息包含了Qos;
订阅时,指定这次订阅要求的Qos,broker回复授权使用的Qos,一般就是申请那个;
消费时,消息的Qos取发布订阅中较小者!


详细场景:
订阅Qos=0,不管发布什么消息,消费到的消息Qos都是0;
订阅Qos=1,发布消息Qos=0时,消费得到Qos=0,发布消息Qos=1或2时,消费得到Qos=1;
订阅Qos=2,消费得到的消息Qos,就是发布时的Qos;
发布Qos=0,broker不做任何答复,理论上中途丢了都不知道,但是因为Tcp,如果网络异常客户端能发现;
发布Qos=1,broker答复PubAck,表示已经收到消息;
发布Qos=2,broker答复PubRec,客户端再次发送PubRel,broker答复PubComp,消息才算发布完成;
订阅Qos=2,broker推送Qos=2消息,客户端先回PubRec,broker再次发送PubRel,客户端答复PubComp,消息才算消费完成;
发布Qos=2消息时,双重确认流程不需要等消费端在线,仅限于发布者与broker之间即可完成。

MQTT客户端

为了安全,MQTT通信一般要求 UserName+Password+ClientId 这三个要素。

以下示例演示了如何设置这三个要素,以及通过事件统一接收消息。

var mc = new MqttClient
{
    Log = XTrace.Log,
    Server = "tcp://127.0.0.1:1883",

    ClientId = Environment.MachineName,
    UserName = "stone",
    Password = "Pass@word",
};

if (_client == null)
{
    mc.Received += (s, e) =>
    {
        var pm = e.Arg;
        var msg = pm.Payload.ToStr();
        _mq.Enqueue(msg);

        XTrace.WriteLine("消费消息:[{0}] {1}", pm.Topic, msg);
    };

    _client = mc;
}

await _client.SubscribeAsync(new[] { "newlifeTopic", "QosTopic" });

UserName+Password用于验证接入权限,ClientId用于标识客户端唯一性,因为有可能多客户端共用UserName,例如各种智能硬件,它们就必须使用不同的ClientId。同一个ClientId上线时,一般MqttBroker会把该ClientId的前一个登录会话给踢下线。

接入阿里云MQTT

阿里云物联网平台支持MQTT接入,只是UserName和ClientId传值有所不同。

var client = new AliyunMqttClient("a18RQ72tLHD", "dev1", "6oSl3CjHKM13J50DVVWNF3WbWWJjhAUf");
client.Log = XTrace.Log;
client.Server = $"tcp://{client.ProductKey}.iot-as-mqtt.cn-shanghai.aliyuncs.com:443";

await client.ConnectAsync();
await client.SyncTime();
await client.PostProperty(new
{
    // 温度
    Temperature = Rand.Next(-4000, 120_00) / 100d,
    // 相对湿度
    RelativeHumidity = Rand.Next(0, 100_00) / 100d,
    // 风向
    WindDirection = Rand.Next(0, 360_00) / 100d,
    // 氟化物浓度
    Fluoride = Rand.Next(0, 10000_00) / 100d,
    // 空气质量指数
    AQI = Rand.Next(0, 500),
    // 首要污染物
    PrimaryItem = Rand.NextString(32),
    // 地理位置
    GeoLocation = new
    {
        Longitude = Rand.Next(-180_00, 180_00) / 100d,
        Latitude = Rand.Next(-180_00, 180_00) / 100d,
        Altitude = Rand.Next(0, 10000_00) / 100d,
        // 1=WGS_84, 2=GCJ_02
        CoordinateSystem = 1,
    },
});

这里通过指定阿里云IOT的产品信息和设备信息,完成接入阿里云IOT平台,并发布设备属性数据。

MQTT服务端

MqttServer是内置的服务端实现,我们需要写一个消息处理器,在该处理器内部,可以完成数据接收等工作。

private class MyHandler : MqttHandler
{
    private readonly ILog _log;

    public MyHandler(ILog log) => _log = log;

    protected override ConnAck OnConnect(INetSession session, ConnectMessage message)
    {
        _log.Info("客户端[{0}]连接 user={0} pass={1} clientId={2}", session.Remote.EndPoint, message.Username, message.Password, message.ClientId);

        return base.OnConnect(session, message);
    }

    protected override MqttMessage OnDisconnect(INetSession session, DisconnectMessage message)
    {
        _log.Info("客户端[{0}]断开", session.Remote);

        return base.OnDisconnect(session, message);
    }

    protected override MqttIdMessage OnPublish(INetSession session, PublishMessage message)
    {
        _log.Info("发布[{0}:qos={1}]: {2}", message.Topic, (Int32)message.QoS, message.Payload.ToStr());

        return base.OnPublish(session, message);
    }
}

然后再写个启动程序,这里通过ioc依赖注入来注册处理器。

var ioc = ObjectContainer.Current;
ioc.AddSingleton<ILog>(XTrace.Log);
ioc.AddTransient<IMqttHandler, MyHandler>();

var server = new MqttServer
{
    Provider = ioc.BuildServiceProvider(),

    Log = XTrace.Log,
    SessionLog = XTrace.Log,
};
//server.AddHandler(new MyHandler());
server.Start();

MqttServer默认监听1883端口,可以通过Port属性设置。服务端启动后,即可监听端口,登录Mqtt客户端连接。

我们在同一个项目里启动服务端和客户端,可以看到双方的消息交换。其中向左箭头表示收到,向右箭头表示发出。Mqtt[1]表示服务端的1号会话,[Mqtt]表示客户端。