RocketMQ源自于阿里,实用性较强且伸缩性很不错,适合大中小企业使用。

RocketMQ用Java开发,其非Java的客户端SDK比较弱,特别是.NET版客户端SDK,仅仅是对C++的包装,稳定性较差,在中通使用中发现会有几率导致整个应用进程崩溃。后来需要消费10亿~100亿的RocketMQ消息,用纯托管.NET开发了NewLife.RocketMQ,还支持.NETCore。RocketMQ功能很多,通过简单查看资料以及网络抓包搞定封包协议后,实现常见的发布、消费、平衡等基础指令即可。

NewLife.RocketMQ 开源于 https://github.com/NewLifeX/NewLife.RocketMQ ,Nuget包名 NewLife.RocketMQ

本文将带领读者快速入门,使用RocketMQ的发布与消费基础功能,不包括RocketMQ集群搭建与维护。

基础知识

使用RocketMQ之前,首先得了解一些基础知识,传送门

引用网络上一张架构图来说明RocketMQ各部分之间的关系

从开发使用者角度来讲,消息都是归类到主题Topic之下。在RocketMQ中,一个Topic之下的消息,均匀分布到多个Queue之中存储,而Queue位于各Broker物理机器中。因此,不管一个Topic有多么海量的消息,只要Queue分得足够多,Broker横向扩展,都能通过扩容解决问题。

为了可靠性以及性能,Broker一般采用主从架构,常见一主一从,也可以一主多从。生产者把消息发布到Master上,消费者从Slave上消费读取。通过增加从机,理论上可以支持无限多的消费者。同时也避免了部分Broker主机故障导致丢失消息数据的情况。


名称服务器

想要接入一个RocketMQ集群,首先需要知道名称服务器NameServer,其中存储了所有Topic信息,划分为多少个Queue,分别存储在哪些Broker中。因此,指定NameServer后,不管是生产者还是消费者,NewLife.RocketMQ都会首先查找Topic相关信息,得到Queue和Broker信息后,再去连接各Broker服务器。

很多RocketMQ初学者都在这里狠狠的摔了一跤!NameServer返回Topic不存在!也就是找不到Topic怎么办?在中大型企业,往往有相应的消息队列中间件管理团队,负责Topic创建维护工作。因此RocketMQ并不会自动帮助你创建Topic,需要NewLife.RocketMQ用户通过Producer/Consumer类的CreateTopic方法主动创建Topic。当然,也可以通过RocketMQ控制台创建。创建Topic时就要指定Queue数量,后续不好修改,慎重!

NameServer一般也是集群部署,只要任意节点可用,集群就可以正常提供服务。


创建Topic例子,RocketMQ比较申请,想要创建Topic需要先连接名为TBW102的Topic

var mq = new Producer
{
    //Topic = "nx_test",
    NameServerAddress = "127.0.0.1:9876",

    Log = XTrace.Log,
};

mq.Start();

// 创建topic时,start前不能指定topic,让其使用默认TBW102
Assert.Equal("TBW102", mq.Topic);

mq.CreateTopic("nx_test", 2);

生产者

生产者通过指定NameServer和Topic即可连接RocketMQ集群,发布消息

var mq = new Producer
{
    Topic = "nx_test",
    NameServerAddress = "127.0.0.1:9876",

    Log = XTrace.Log,
};

mq.Start();

for (var i = 0; i < 10; i++)
{
    var str = "学无先后达者为师" + i;
    //var str = Rand.NextString(1337);

    var sr = mq.Publish(str, "TagA");
}

Publish内部将采用RoundRobin均衡算法,轮流把消息发布到不同Queue上,因此Queue内消息均匀主要通过生产者实现。

每个Topic的Producer对象需要保存单例,避免频繁创建。


消费者

消费者除了指定NameServer和Topic外,还需要指定消费组Group。相同消费组共同消费所有消息,一个消息在消费组内只会消费一次,因此可以增加机器来加强该消费组的消息处理能力。同一个消息可以被多个消费组消费。

var consumer = new Consumer
{
    Topic = "nx_test",
    Group = "test",
    NameServerAddress = "127.0.0.1:9876",

    FromLastOffset = true,
    SkipOverStoredMsgCount = 0,
    BatchSize = 20,

    Log = XTrace.Log,
};

consumer.OnConsume = (q, ms) =>
{
    XTrace.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);

    foreach (var item in ms.ToList())
    {
        XTrace.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
    }

    return true;
};

consumer.Start();

每个Topic的Consumer可以创建多个实例,以加强消费处理能力,但总的实例个数不得超过该Topic的Queue个数。

收到消息时,由OnConsume事件触发,该事件内成功处理消息后,Consumer内部将会自动向Broker提交确认。如果消息处理过程中抛出异常,Consumer则不会确认,该消息将会在延迟一定时间后被重新处理。

作为最佳实践,建议消息处理出错时,把消息放入另一个队列去处理,主队列确保正常工作。

RocketMQ的每个Queue默认会记住每个消费组Group消费的偏移量,下次启动时继续从该偏移量开始。指定FromLastOffset后,将会忽略保存的偏移量,直接从最新一条消息开始。

RocketMQ为每一条消息生成一个MsgId,支持各个消费组共同消费,本质上就是记录各个消费组的消费位置。因此,Topic下的Queue,只能有一个客户端消费,否则偏移量就会乱套。

Consumer消费者实现上比较复杂,连接RocketMQ集群后,取得该Topic下共有多少个Queue,还得知道该Group内还有多少个其它消费者,然后把Queue分一分,你一个我一个,最后每个Consumer分得一个或者多个Queue,这就是均衡Rebalance的过程。这里特别重要,一个Queue不能分给多个Consumer,否则会出现重复消费。这也是为什么Consumer实例个数不能超过Queue个数,那样会导致超出部分Consumer分布到Queue而饿死!

阿里云

NewLife.RocketMQ还支持阿里云RocketMQ,设置上多了Server、AccessKey和SecretKey,基本生产和消费用法相同。

using var mq = new Producer
{
    Topic = "test1",
};
mq.Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet";
mq.Configure(MqSetting.Current);

mq.Log = XTrace.Log;

mq.Start();

for (var i = 0; i < 10; i++)
{
    var str = "学无先后达者为师" + i;
    //var str = Rand.NextString(1337);

    var sr = mq.Publish(str, "TagA");
}

阿里云RocketMQ指定一个额外的http地址,用于路由到真正的NameServer,同时还会验证访问密钥。


总结

RocketMQ的扩展能力很不错,吞吐能力强,有阿里背书,这是它很大的优势!

由于Queue个数制约了消费者个数,很容易出现消费倾斜,部分消费者忙死,其它消费者闲死,又不能帮忙,只能干瞪眼。再加上它不支持默认自动创建Topic,大大提升了入门门槛。不适用于中小规模团队,这是比较主要的劣势!


针对中小规模团队,这里强烈推荐Redis消息队列 NewLife.Redis ,它在生产和消费的时候都不需要遭受Queue个数的制约,想开多少就开多少,非常灵活!并且其性能是RocketMQ的数倍。

此处为内容卡片,点击链接查看:https://newlifex.com/go/doc/23445967