ICache基础上,进一步封装了缓存架构ICacheProvider,用于业务应用中常用的分布式缓存、本地缓存和消息队列。强烈推荐使用ICacheProvider替代ICache

Nuget包:NewLife.Core

源码:https://github.com/NewLifeX/X/blob/master/NewLife.Core/Caching/ICacheProvider.cs

快速入门

NET项目从Nuget引入 NewLife.Redis ,启动代码加入以下代码,服务层直取ICacheProvider。

// 分布式服务,使用配置中心RedisCache配置
services.AddSingleton<ICacheProvider, RedisCacheProvider>();

或者从Nuget引入 NewLife.Redis.Extensions ,注入以下代码:

// 分布式缓存,使用appsettings.json中名为RedisCache的连接字符串 server=127.0.0.1:6379;password=pass;db=3
// 注入 ICacheProvider / FullRedis / Redis / ICache
services.AddRedis();

该代码注入了ICacheProvider的Redis实现,其中Cache即是FullRedis实现,GetQueue得到的也是Redis消息队列。RedisCacheProvider将从配置中心或本地配置文件读取名为RedisCache的redis连接字符串来实例化FullRedis,如果没有配置该连接字符串,则内部自动降级为内存缓存。完美实现了代码同时兼容单机部署(无Redis)和集群部署(有Redis)。

连接字符串配置参考(最后的RedisCache/RedisQueue):

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information"
    }
  },
  "AllowedHosts": "*",
  "Urls": "http://*:1881",
  "ConnectionStrings": {
    //"IoT": "Data Source=..\\Data\\IoT.db;Provider=Sqlite",
    //"IoTData": "Data Source=..\\Data\\IoTData.db;ShowSql=false;Provider=Sqlite",
    //"Membership": "Data Source=..\\Data\\Membership.db;Provider=Sqlite",

    //"IoT_MySql": "server=.;database=iot;user=iot;password=iot;Provider=MySql",
    //"IoTData_MySql": "server=.;database=iotdata;user=iot;password=iot;Provider=MySql",
    //"Membership_MySql": "server=.;database=iotdata;user=iot;password=iot;Provider=MySql"
  },
  "RedisCache": "server=127.0.0.1;password=123456;db=3",
  "RedisQueue": "server=127.0.0.1;password=123456;db=5"
}


设计思路

根据实际开发经验,即使在分布式系统中,也有大量的数据是不需要跨进程共享的,因此本接口提供了两级缓存。

进程内缓存使用InnerCache,可以规避对象序列化成本,跨进程缓存使用Cache

借助该缓存架构,可以实现各功能模块跨进程共享数据,分布式部署时可用Redis,需要考虑序列化成本。

使用队列时,可根据是否设置消费组来决定使用简单队列还是完整队列。

简单队列(如RedisQueue)可用作命令队列,Topic很多,但几乎没有消息。

完整队列(如RedisStream)可用作消息队列,Topic很少,但消息很多,并且支持多消费组。

消息队列

根据所使用ICache的不同,GetQueue返回的消息队列也不同。

IProducerConsumer<T> GetQueue<T>(String topic, String? group = null);
IProducerConsumer<T> GetInnerQueue<T>(String topic);

GetQueue有两个参数,topic是队列主题,group是消费组。对于RedisCacheProvider实现,未指定group时返回RedisQueue,指定group时返回RedisStream。

GetInnerQueue只有topic这一个参数,默认就是内存队列。

案例分析

星尘StarServer节点管理NodeController,通过引入ICacheProvider实现了内部命令所使用的消息队列。

private async Task ConsumeMessage(WebSocket socket, Node node, String ip, CancellationTokenSource source)
{
    DefaultSpan.Current = null;
    var cancellationToken = source.Token;
    var queue = _cacheProvider.GetQueue<String>($"nodecmd:{node.Code}");
    try
    {
        while (!cancellationToken.IsCancellationRequested && socket.State == WebSocketState.Open)
        {
            ISpan span = null;
            var mqMsg = await queue.TakeOneAsync(30);
            if (mqMsg != null)
            {
                // 埋点
                span = _tracer?.NewSpan($"mq:NodeCommand", mqMsg);

                // 解码
                var dic = JsonParser.Decode(mqMsg);
                var msg = JsonHelper.Convert<CommandModel>(dic);
                span.Detach(dic);

                if (msg == null || msg.Id == 0 || msg.Expire.Year > 2000 && msg.Expire < DateTime.Now)
                    WriteHistory(node, "WebSocket发送", false, "消息无效或已过期。" + mqMsg, ip);
                else
                {
                    WriteHistory(node, "WebSocket发送", true, mqMsg, ip);

                    // 向客户端传递埋点信息,构建完整调用链
                    msg.TraceId = span + "";

                    var log = NodeCommand.FindById(msg.Id);
                    if (log != null)
                    {
                        if (log.TraceId.IsNullOrEmpty()) log.TraceId = span?.TraceId;
                        log.Times++;
                        log.Status = CommandStatus.处理中;
                        log.UpdateTime = DateTime.Now;
                        log.Update();
                    }

                    await socket.SendAsync(mqMsg.GetBytes(), WebSocketMessageType.Text, true, cancellationToken);
                }

                span?.Dispose();
            }
            else
            {
                await Task.Delay(1_000, cancellationToken);
            }
        }
    }
    catch (TaskCanceledException) { }
    catch (Exception ex)
    {
        XTrace.WriteLine("WebSocket异常 node={0} ip={1}", node, ip);
        XTrace.WriteException(ex);
        WriteHistory(node, "WebSocket断开", false, ex.ToString(), ip);
    }
    finally
    {
        source.Cancel();
    }
}

这里的_cacheProvider.GetQueue只有topic没有group,而Startup里面注入了RedisCacheProvider。

在单机部署时,使用内存队列,StarWeb向StarServer发送的节点指令,进入队列后,这里能够直接消费得到。

在集群部署时,StarServer运行有多实例,都要配置相同的RedisCache连接字符串。StarWeb调用接口发送指令时,可能节点的WebSocket刚好不在这台服务器上。接收StarWeb接口指令的StarServer实例,把指令放入队列(此时是Redis),目标节点WebSocket所在的StarServer实例将能够从Redis队列中得到这个指令,并通过WebSocket下发给节点客户端。