在ICache基础上,进一步封装了缓存架构ICacheProvider,用于业务应用中常用的分布式缓存、本地缓存和消息队列。强烈推荐使用ICacheProvider替代ICache。
Nuget包:NewLife.Core
源码:https://github.com/NewLifeX/X/blob/master/NewLife.Core/Caching/ICacheProvider.cs
快速入门
NET项目从Nuget引入NewLife.Redis,启动代码加入以下代码,服务层直取ICacheProvider。
// 分布式服务,使用配置中心RedisCache配置
services.AddSingleton<ICacheProvider, RedisCacheProvider>();
该代码注入了ICacheProvider的Redis实现,其中Cache即是FullRedis实现,GetQueue得到的也是Redis消息队列。RedisCacheProvider将从配置中心或本地配置文件读取名为RedisCache的redis连接字符串来实例化FullRedis,如果没有配置该连接字符串,则内部自动降级为内存缓存。完美实现了代码同时兼容单机部署(无Redis)和集群部署(有Redis)。
连接字符串配置参考(最后的RedisCache/RedisQueue):
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*",
"Urls": "http://*:1881",
"ConnectionStrings": {
//"IoT": "Data Source=..\\Data\\IoT.db;Provider=Sqlite",
//"IoTData": "Data Source=..\\Data\\IoTData.db;ShowSql=false;Provider=Sqlite",
//"Membership": "Data Source=..\\Data\\Membership.db;Provider=Sqlite",
//"IoT_MySql": "server=.;database=iot;user=iot;password=iot;Provider=MySql",
//"IoTData_MySql": "server=.;database=iotdata;user=iot;password=iot;Provider=MySql",
//"Membership_MySql": "server=.;database=iotdata;user=iot;password=iot;Provider=MySql"
},
"RedisCache": "server=127.0.0.1;password=123456;db=3",
"RedisQueue": "server=127.0.0.1;password=123456;db=5"
}
设计思路
根据实际开发经验,即使在分布式系统中,也有大量的数据是不需要跨进程共享的,因此本接口提供了两级缓存。
进程内缓存使用InnerCache,可以规避对象序列化成本,跨进程缓存使用Cache。
借助该缓存架构,可以实现各功能模块跨进程共享数据,分布式部署时可用Redis,需要考虑序列化成本。
使用队列时,可根据是否设置消费组来决定使用简单队列还是完整队列。
简单队列(如RedisQueue)可用作命令队列,Topic很多,但几乎没有消息。
完整队列(如RedisStream)可用作消息队列,Topic很少,但消息很多,并且支持多消费组。
消息队列
根据所使用ICache的不同,GetQueue返回的消息队列也不同。
IProducerConsumer<T> GetQueue<T>(String topic, String? group = null);
IProducerConsumer<T> GetInnerQueue<T>(String topic);
GetQueue有两个参数,topic是队列主题,group是消费组。对于RedisCacheProvider实现,未指定group时返回RedisQueue,指定group时返回RedisStream。
GetInnerQueue只有topic这一个参数,默认就是内存队列。
案例分析
星尘StarServer节点管理NodeController,通过引入ICacheProvider实现了内部命令所使用的消息队列。
private async Task ConsumeMessage(WebSocket socket, Node node, String ip, CancellationTokenSource source)
{
DefaultSpan.Current = null;
var cancellationToken = source.Token;
var queue = _cacheProvider.GetQueue<String>($"nodecmd:{node.Code}");
try
{
while (!cancellationToken.IsCancellationRequested && socket.State == WebSocketState.Open)
{
ISpan span = null;
var mqMsg = await queue.TakeOneAsync(30);
if (mqMsg != null)
{
// 埋点
span = _tracer?.NewSpan($"mq:NodeCommand", mqMsg);
// 解码
var dic = JsonParser.Decode(mqMsg);
var msg = JsonHelper.Convert<CommandModel>(dic);
span.Detach(dic);
if (msg == null || msg.Id == 0 || msg.Expire.Year > 2000 && msg.Expire < DateTime.Now)
WriteHistory(node, "WebSocket发送", false, "消息无效或已过期。" + mqMsg, ip);
else
{
WriteHistory(node, "WebSocket发送", true, mqMsg, ip);
// 向客户端传递埋点信息,构建完整调用链
msg.TraceId = span + "";
var log = NodeCommand.FindById(msg.Id);
if (log != null)
{
if (log.TraceId.IsNullOrEmpty()) log.TraceId = span?.TraceId;
log.Times++;
log.Status = CommandStatus.处理中;
log.UpdateTime = DateTime.Now;
log.Update();
}
await socket.SendAsync(mqMsg.GetBytes(), WebSocketMessageType.Text, true, cancellationToken);
}
span?.Dispose();
}
else
{
await Task.Delay(1_000, cancellationToken);
}
}
}
catch (TaskCanceledException) { }
catch (Exception ex)
{
XTrace.WriteLine("WebSocket异常 node={0} ip={1}", node, ip);
XTrace.WriteException(ex);
WriteHistory(node, "WebSocket断开", false, ex.ToString(), ip);
}
finally
{
source.Cancel();
}
}
这里的_cacheProvider.GetQueue只有topic没有group,而Startup里面注入了RedisCacheProvider。
在单机部署时,使用内存队列,StarWeb向StarServer发送的节点指令,进入队列后,这里能够直接消费得到。
在集群部署时,StarServer运行有多实例,都要配置相同的RedisCache连接字符串。StarWeb调用接口发送指令时,可能节点的WebSocket刚好不在这台服务器上。接收StarWeb接口指令的StarServer实例,把指令放入队列(此时是Redis),目标节点WebSocket所在的StarServer实例将能够从Redis队列中得到这个指令,并通过WebSocket下发给节点客户端。