RocketMQ源自于阿里,实用性较强且伸缩性很不错,适合大中小企业使用。
RocketMQ用Java开发,其非Java的客户端SDK比较弱,特别是.NET版客户端SDK,仅仅是对C++的包装,稳定性较差,在中通使用中发现会有几率导致整个应用进程崩溃。后来需要消费10亿~100亿的RocketMQ消息,用纯托管.NET开发了NewLife.RocketMQ,还支持.NETCore。RocketMQ功能很多,通过简单查看资料以及网络抓包搞定封包协议后,实现常见的发布、消费、平衡等基础指令即可。
NewLife.RocketMQ 开源于 https://github.com/NewLifeX/NewLife.RocketMQ ,Nuget包名 NewLife.RocketMQ 。
本文将带领读者快速入门,使用RocketMQ的发布与消费基础功能,不包括RocketMQ集群搭建与维护。
基础知识
使用RocketMQ之前,首先得了解一些基础知识,传送门。
引用网络上一张架构图来说明RocketMQ各部分之间的关系
从开发使用者角度来讲,消息都是归类到主题Topic之下。在RocketMQ中,一个Topic之下的消息,均匀分布到多个Queue之中存储,而Queue位于各Broker物理机器中。因此,不管一个Topic有多么海量的消息,只要Queue分得足够多,Broker横向扩展,都能通过扩容解决问题。
为了可靠性以及性能,Broker一般采用主从架构,常见一主一从,也可以一主多从。生产者把消息发布到Master上,消费者从Slave上消费读取。通过增加从机,理论上可以支持无限多的消费者。同时也避免了部分Broker主机故障导致丢失消息数据的情况。
名称服务器
想要接入一个RocketMQ集群,首先需要知道名称服务器NameServer,其中存储了所有Topic信息,划分为多少个Queue,分别存储在哪些Broker中。因此,指定NameServer后,不管是生产者还是消费者,NewLife.RocketMQ都会首先查找Topic相关信息,得到Queue和Broker信息后,再去连接各Broker服务器。
很多RocketMQ初学者都在这里狠狠的摔了一跤!NameServer返回Topic不存在!也就是找不到Topic怎么办?在中大型企业,往往有相应的消息队列中间件管理团队,负责Topic创建维护工作。因此RocketMQ并不会自动帮助你创建Topic,需要NewLife.RocketMQ用户通过Producer/Consumer类的CreateTopic方法主动创建Topic。当然,也可以通过RocketMQ控制台创建。创建Topic时就要指定Queue数量,后续不好修改,慎重!
NameServer一般也是集群部署,只要任意节点可用,集群就可以正常提供服务。
创建Topic例子,RocketMQ比较申请,想要创建Topic需要先连接名为TBW102的Topic
var mq = new Producer { //Topic = "nx_test", NameServerAddress = "127.0.0.1:9876", Log = XTrace.Log, }; mq.Start(); // 创建topic时,start前不能指定topic,让其使用默认TBW102 Assert.Equal("TBW102", mq.Topic); mq.CreateTopic("nx_test", 2);
生产者
生产者通过指定NameServer和Topic即可连接RocketMQ集群,发布消息
var mq = new Producer { Topic = "nx_test", NameServerAddress = "127.0.0.1:9876", Log = XTrace.Log, }; mq.Start(); for (var i = 0; i < 10; i++) { var str = "学无先后达者为师" + i; //var str = Rand.NextString(1337); var sr = mq.Publish(str, "TagA"); }
Publish内部将采用RoundRobin均衡算法,轮流把消息发布到不同Queue上,因此Queue内消息均匀主要通过生产者实现。
每个Topic的Producer对象需要保存单例,避免频繁创建。
消费者
消费者除了指定NameServer和Topic外,还需要指定消费组Group。相同消费组共同消费所有消息,一个消息在消费组内只会消费一次,因此可以增加机器来加强该消费组的消息处理能力。同一个消息可以被多个消费组消费。
var consumer = new Consumer { Topic = "nx_test", Group = "test", NameServerAddress = "127.0.0.1:9876", FromLastOffset = true, SkipOverStoredMsgCount = 0, BatchSize = 20, Log = XTrace.Log, }; consumer.OnConsume = (q, ms) => { XTrace.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length); foreach (var item in ms.ToList()) { XTrace.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】"); } return true; }; consumer.Start();
每个Topic的Consumer可以创建多个实例,以加强消费处理能力,但总的实例个数不得超过该Topic的Queue个数。
收到消息时,由OnConsume事件触发,该事件内成功处理消息后,Consumer内部将会自动向Broker提交确认。如果消息处理过程中抛出异常,Consumer则不会确认,该消息将会在延迟一定时间后被重新处理。
作为最佳实践,建议消息处理出错时,把消息放入另一个队列去处理,主队列确保正常工作。
RocketMQ的每个Queue默认会记住每个消费组Group消费的偏移量,下次启动时继续从该偏移量开始。指定FromLastOffset后,将会忽略保存的偏移量,直接从最新一条消息开始。
RocketMQ为每一条消息生成一个MsgId,支持各个消费组共同消费,本质上就是记录各个消费组的消费位置。因此,Topic下的Queue,只能有一个客户端消费,否则偏移量就会乱套。
Consumer消费者实现上比较复杂,连接RocketMQ集群后,取得该Topic下共有多少个Queue,还得知道该Group内还有多少个其它消费者,然后把Queue分一分,你一个我一个,最后每个Consumer分得一个或者多个Queue,这就是均衡Rebalance的过程。这里特别重要,一个Queue不能分给多个Consumer,否则会出现重复消费。这也是为什么Consumer实例个数不能超过Queue个数,那样会导致超出部分Consumer分布到Queue而饿死!
阿里云
NewLife.RocketMQ还支持阿里云RocketMQ,设置上多了Server、AccessKey和SecretKey,基本生产和消费用法相同。
using var mq = new Producer { Topic = "test1", }; mq.Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet"; mq.Configure(MqSetting.Current); mq.Log = XTrace.Log; mq.Start(); for (var i = 0; i < 10; i++) { var str = "学无先后达者为师" + i; //var str = Rand.NextString(1337); var sr = mq.Publish(str, "TagA"); }
阿里云RocketMQ指定一个额外的http地址,用于路由到真正的NameServer,同时还会验证访问密钥。
总结
RocketMQ的扩展能力很不错,吞吐能力强,有阿里背书,这是它很大的优势!
由于Queue个数制约了消费者个数,很容易出现消费倾斜,部分消费者忙死,其它消费者闲死,又不能帮忙,只能干瞪眼。再加上它不支持默认自动创建Topic,大大提升了入门门槛。不适用于中小规模团队,这是比较主要的劣势!
针对中小规模团队,这里强烈推荐Redis消息队列 NewLife.Redis ,它在生产和消费的时候都不需要遭受Queue个数的制约,想开多少就开多少,非常灵活!并且其性能是RocketMQ的数倍。