ASP.NET Core具备一个完整的Hosting架构,允许用户注入各种服务,并提供日志、配置等大量基础组件。该Hosting架构适用于Web项目和WebApi项目,而对于非Web项目(定时任务、消息队列消费处理、大数据计算等)则显得没那么合适了。官方推出了 辅助角色服务 的项目模板,它的项目SDK是 Microsoft.NET.Sdk.Worker ,它会引入一个包 Microsoft.Extensions.Hosting 。尽管它努力的去Web化,但还是大量引入了各种Web包,最后输出程序集非常多,还不如直接用Web项目模板。

此时此刻,我们创建了轻量级应用主机Host,作为非Web应用模板!在微服务时代,定时任务、消息消费、大数据计算类型项目将会比Web项目多得多。


Nuget包:NewLife.Core

源码:https://github.com/NewLifeX/X/blob/master/NewLife.Core/Model/Host.cs

视频:https://www.bilibili.com/video/BV1be41157kA


基本用法

新建普通.NETCore控制台应用,引入 NewLife.Core 即可使用轻量级主机。示例代码如下:

static void Main(string[] args)
{
    // 启用控制台日志,拦截所有异常
    XTrace.UseConsole();

    var services = ObjectContainer.Current;
    services.AddSingleton(XTrace.Log);

    // 初始化星尘跟踪器
    var tracer = InitTracer(services);

    InitRedis(services, tracer);
    InitMqtt(services, tracer);
    InitRocketMq(services, tracer);

    var host = services.BuildHost();
    host.Add<Worker>();
    host.Add<RedisWorker>();
    //host.Add<RocketMqWorker>();
    //host.Add<MqttWorker>();

    host.Run();
}

我们以对象容器开始,借助它的依赖注入能力,通过services注册各种服务。然后一个BuildHost得到主机host,再注册需要后台运行的各种Worker,最后Run起来即可。上面InitRedis/InitMqtt/InitRocketMq等方法,只是为了注册Redis、MQTT和RocketMQ等服务,无需介意,完整代码可访问github。

零代脚手架:AppWorker


对象容器ObjectContainer可参考前文:

对象容器ObjectContainer

AppWorker跑起来是这样子:

项目编译后的程序集极少,除了NewLife.Core,其它都是项目需要。


服务一(发布消息)

写一个定时发布消息的后台任务Worker

public class Worker : IHostedService
{
    private readonly ILog _logger;
    private readonly FullRedis _redis;
    //private readonly MqttClient _mqtt;
    //private readonly Producer _producer;

    public Worker(ILog logger, FullRedis redis/*, MqttClient mqtt, Producer producer*/)
    {
        _logger = logger;
        _redis = redis;
        //_mqtt = mqtt;
        //_producer = producer;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        var task = ExecuteAsync(cancellationToken);
        return task.IsCompleted ? task : Task.CompletedTask;
    }

    protected async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Redis 可信消息队列,支持消费确认
        var rdsQueue = _redis.GetReliableQueue<Object>("rdsTopic");

        //// RocketMQ 生产者
        //_producer.Start();

        while (!stoppingToken.IsCancellationRequested)
        {
            // Redis 发布
            rdsQueue.Add(new Area { Code = 110000, Name = "北京市" });

            //// MQTT 发布
            //await _mqtt.PublicAsync("mqttTopic", new Area { Code = 110000, Name = "北京市" });

            //// RocketMQ 发布
            //_producer.Publish(new Area { Code = 110000, Name = "北京市" });

            _logger.Info("Worker running at: {0}", DateTimeOffset.Now);
            await Task.Delay(1000, stoppingToken);
        }
    }

    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

注意构造函数,ObjectContainer支持依赖注入

服务二(消费消息)

设计一个从Redis队列消费消息的RedisWorker

public class RedisWorker : IHostedService
{
    private readonly FullRedis _redis;
    private readonly ILog _log;
    private RedisReliableQueue<String> _queue;

    public RedisWorker(FullRedis redis, ILog log)
    {
        _redis = redis;
        _log = log;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        var task = ExecuteAsync(cancellationToken);
        return task.IsCompleted ? task : Task.CompletedTask;
    }

    protected async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await Task.Yield();

        // Redis 可信消息队列,支持消费确认
        _queue = _redis.GetReliableQueue<String>("rdsTopic");

        await _queue.ConsumeAsync<Area>(area =>
        {
            XTrace.WriteLine("RedisQueue.Consume {0} {1}", area.Code, area.Name);
        }, stoppingToken, _log);
    }

    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}