分布式任务调度系统,纯NET打造的重量级大数据实时计算平台,万亿级调度经验积累!面向中小企业大数据分析场景。
开源地址:https://github.com/NewLifeX/AntJob
使用教程:https://newlifex.com/blood/antjob
功能特点
AntJob的核心是蚂蚁算法:把任意大数据拆分成为小块,采用蚂蚁搬家策略计算每一块!
(蚂蚁搬家,一个馒头掉在地上,众多小蚂蚁会把馒头掰成小块小块往家里般!)
该算法设计于2008年,最开始用于处理基金公司的短信/邮件/传真群发(每批两百万)和电话话费分析(上百种国际长途计费规则),数据量不算大,但是有一定复杂度,并且要求支持持续处理(实时计算)以及出错重试。
2016年在中通快递某产品项目中使用该算法进行大数据实时计算,成功挑战每日1200万的订单。并进一步发展衍生成为重量级实时计算平台,集分布式计算、集群调度、配置中心、负载均衡、故障转移、跨机房冗余、作业监控告警、百亿级数据清洗、超大Redis缓存(>2T)于一身,于2019年达到每年万亿级计算量(2019年双十一日订单量破亿)。
AntJob是开源简化版,仅提供分布式计算和集中调度能力,支持百亿级调度(需要改造)。
AntJob主要功能点:
- 作业处理器。每一个最小业务模块实现一个处理器类,用于处理这一类作业。例如同步数据表时,每张表写一个处理器类,并在调度中心注册一个作业,调度中心按照作业时间切片得到任务,然后把任务(主要包含时间区间)分派给各个计算节点上的处理器类执行。又如,每天汇总计算是一个作业,而每月汇总计算又是另一个作业;
- 任务上下文。作业处理器类实例化以后,将反复向调度中心申请任务来执行,每个任务的上下文核心数据是时间区间(数据调度)、时间点(定时调度)、消息体(消息调度)。调度中心记录任务处理结果;
- 数据切片。支持按照时间区间(如5秒)把大数据切分为小片,也即是数据调度,处理过最大单表60亿行;
- 定时调度。支持定时执行(秒级)指定业务逻辑,每个执行时间点得到一个任务;
- 任务重试。每个任务完整记录处理结果,失败任务在延迟一段时间后将会自动重新分派(可能由原节点或其它节点执行);
- 任务重置。支持批量重置已执行完成的任务,让其再次执行处理;
- 作业面板。在Web控制台上可查看每个应用所有作业的运行状态,或修改参数;
- 作业重置。调整作业参数,让其再次处理某段时间的任务数据,例如重算过去一个月的数据;
其它细节功能将穿插在以下各主要功能点中进行讲解。
定时调度
以下源码位于 https://github.com/NewLifeX/AntJob/tree/master/Samples/HisAgent
新建项目
新建.net core 3.1项目,从nuget引用 AntJob。实例化一个调度器Scheduler,配置网络提供者。
using System; using AntJob; using AntJob.Providers; using NewLife.Log; namespace HisAgent { class Program { static void Main(string[] args) { XTrace.UseConsole(); var set = AntSetting.Current; // 实例化调度器 var sc = new Scheduler(); // 使用分布式调度引擎替换默认的本地文件调度 sc.Provider = new NetworkJobProvider { Server = set.Server, AppID = set.AppID, Secret = set.Secret, }; // 添加作业处理器 sc.Handlers.Add(new HelloJob()); // 启动调度引擎,调度器内部多线程处理 sc.Start(); Console.WriteLine("OK!"); Console.ReadKey(); } } }
然后添加第一个定时调度的作业处理器
using System; using AntJob; namespace HisAgent { internal class HelloJob : Handler { public HelloJob() { // 今天零点开始,每10秒一次 var job = Job; job.Start = DateTime.Today; job.Step = 10; } protected override Int32 Execute(JobContext ctx) { // 当前任务时间 var time = ctx.Task.Start; WriteLog("新生命蚂蚁调度系统!当前任务时间:{0}", time); // 成功处理数据量 return 1; } } }
作业处理器必须继承自Handler,并且重写Execute实现业务逻辑。
我们这里的业务逻辑就是输出一行日志,其中的ctx.Task就是切分得到的任务上下文,Start是时间点。
构造函数中设定的开始时间和步进Step,仅用于首次注册作业到调度中心,后面就没有用处了。
为了编译观察,修改项目输出目录,在项目文件上点右键选“编辑项目文件”
<PropertyGroup> <OutputType>Exe</OutputType> <TargetFramework>netcoreapp3.1</TargetFramework> <AssemblyVersion>1.0.*</AssemblyVersion> <Deterministic>false</Deterministic> <OutputPath>..\..\Bin\HisAgent</OutputPath> <AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath> </PropertyGroup>
编译执行
代码能编译通过,先跑起来看看
可以看到,调度器首先连接 tcp://127.0.0.1:9999,其次 tcp://ant.newlifex.com:9999 ,而上面代码中并没有提及这两个地址。其实这就是调度中心地址,默认本地用于调试,如果链接失败再连接公开版调度中心,位于配置文件中:
/// <summary>蚂蚁配置。主要用于网络型调度系统</summary> [Config("Ant")] public class AntSetting : Config<AntSetting> { #region 属性 /// <summary>调试开关。默认false</summary> [Description("调试开关。默认false")] public Boolean Debug { get; set; } /// <summary>调度中心。逗号分隔多地址,主备架构</summary> [Description("调度中心。逗号分隔多地址,主备架构")] public String Server { get; set; } = "tcp://127.0.0.1:9999,tcp://ant.newlifex.com:9999"; /// <summary>应用标识。调度中心以此隔离应用,默认当前应用</summary> [Description("应用标识。调度中心以此隔离应用,默认当前应用")] public String AppID { get; set; } /// <summary>应用密钥。</summary> [Description("应用密钥。")] public String Secret { get; set; } #endregion #region 方法 /// <summary>重载</summary> protected override void OnLoaded() { if (AppID.IsNullOrEmpty()) { var asm = Assembly.GetEntryAssembly(); if (asm != null) AppID = asm.GetName().Name; } base.OnLoaded(); } #endregion }
其实上面Main函数中已经看到从配置文件里面读取Server+AppID+Secret,该配置类读取的配置文件在这:
AppID默认取本应用名,Secret由调度中心生成并下发。
调度中心默认打开自动注册AutoRegistry,任意应用登录时自动注册,省去人工配置应用账号的麻烦。
企业内部正式场景使用时,为安全期间,建议关闭自动注册。
再来看看前面跑起来的日志
21:33:08.470 1 N - 启动任务调度引擎[AntJob.Providers.NetworkJobProvider],作业[1]项,定时5秒 21:33:08.471 1 N - HelloJob 开始工作 False 区间(2020-04-09 00:00:00, 0001-01-01 00:00:00) Offset=15 Step=10 MaxTask=8 21:33:08.587 5 Y Job HelloJob 停止工作 21:33:09.467 7 Y T [180.174.185.180:53926]上线!X3
启动了调度引擎,带有一个作业;
作业HelloJob,就是我们通过 sc.Handlers.Add(new HelloJob())
添加进去的作业处理器实例;
HelloJob状态False,处于停止工作状态,那是因为作业注册后,默认都是停止状态,需要去web控制台配置参数后手工开启;
最后一个xxx上线,这是蚂蚁调度的Peers功能,可以探测得到当前应用下所有已连接节点的状态。当HisAgent部署于多个服务器时,每个进程都可以通过Peers得知其它节点的存在;
作业管理
不用关闭HistAgent客户端窗口,我们去线上web控制台看看 http://ant.newlifex.com/
可以看到应用节点在线,点击应用名进去作业面板
这就是我们的HelloJob作业,对应HisAgent中的HelloJob作业处理器。
它处于停用状态,下一次执行时间是 00:00:00 ,也就是今天零点,加上10秒步进,也远小于当前时间,因此,只要启用该作业,调度中心将会马上开始切分任务,并分派给客户去执行。
我们来点击红色叉叉,让它改变为启用状态
几秒后,客户端HisAgent欢快地跑起来!它正在以10秒间隔不断切分并执行任务。
刷新作业面板,可以看到,开始时间已经变为当前附近的时间,右边也有了执行次数。
点击作业名HelloJob,进去查看任务明细
任务切分后,插入作业任务表,此时状态为“就绪”,等待分发给客户端执行。
客户端执行后,向调度中心报告执行结果,可能“完成”,可能“错误”。
错误的任务,会在1分钟后,重新执行,最多连续错误10次。
随系统自动启动
至今我们仍然使用控制台来跑调度程序,怎么样实现稳定可靠的自动化处理呢?
那就必须解决随系统自动启动,以及进程守护(包括Windows和Linux)的问题。
这里推荐 NewLife.Agent,可以把调度程序包装成为一个 Windows服务,或者Linux守护进程,支持看门狗守护。
多节点部署时,推荐 星尘Stardust 中的星尘代理 StarAgent,调度程序无需修改继续使用控制台,由StarAgent负责拉起进程并守护,同时Stardust支持远程多节点部署以及集中监控。
双跑,沸腾吧,分布式计算
再开两个HisAgent进程,查看应用在线表,可以看到有三个节点在线。
HisAgent控制台中,可以看到各自都有机会分配了任务,每个任务有且仅有一个节点执行。
刷新作业HelloJob的任务列表,可以看到不同客户端执行了不同的任务。
调度中心
公开版调度中心 http://ant.newlifex.com 仅用于开发测试,不建议用于生产场景。各企业内部应该自己部署调度中心。
获取AntJob源码 https://github.com/NewLifeX/AntJob ,编译 AntJob.Server,然后跑起来 AntServer.exe
这是一个标准的NewLife.Agent应用,可以选择2安装为Windows服务(需要管理员权限),或者Linux守护进程(需要root权限)。这里仅为了测试,选择5循环调试,直接跑起来核心业务:
可以看到AntServer在tcp/udp/ipv6上监听了9999接口,下方是它所使用的RPC接口。除了前三个内置接口意外,AntJob的接口也就7个,非常简单!
ApiServer的具体内容可参考
再启动一个HisAgent
可以看到,它自动连接了本机这个调度中心,因为配置文件Server里面,127写在第一位!
AntJob客户端支持调度中心的故障转移,配置多个服务端,其中一个断开后,自动选择下一个。
配置文件 Config\AntJob.config 很简单,只有端口和自动注册开关。
<?xml version="1.0" encoding="utf-8"?> <Setting> <!--调试开关。默认true--> <Debug>true</Debug> <!--端口--> <Port>9999</Port> <!--自动注册。任意应用登录时自动注册,省去人工配置应用账号的麻烦,默认true--> <AutoRegistry>true</AutoRegistry> </Setting>
多年使用经验来看,还没遇到过需要关闭自动注册的情况,毕竟都是在企业内网。
推荐部署两套调度中心,一套Web控制台,共用MySql数据库!
如果服务器足够多,或者为了跨机房,部署4套8套也是可以的。
Web控制台
为了查看作业任务状态,以及调整参数,控制作业启停,需要借助控制台。
获取源码 https://github.com/NewLifeX/AntJob ,编译 AntJob.Web,执行 AntWeb.exe。也可以访问公开版AntJob控制台 http://ant.newlifex.com/ 。
首先可以看到应用管理,点击应用名进去应用面板,管理该应用底下的作业。
双击应用所在行空白处,可查看修改应用信息
应用在线记录每个应用实例(应用可以多跑)的实时状态,应用历史记录操作历史。
任务处理过程中,如果抛出异常,将会上报给调度中心,标记任务为“错误”状态,同时把错误信息记录到作业错误中来。也可以通过应用或作业的快捷方式链接进来。
应用消息用于消息调度,消息生产者把消息推送给调度中心时,就是存储在“应用消息”数据表中,消费的时候取出来,创建消息型任务,并从“应用消息”表中删除。
数据调度
数据调度时AntJob毫无疑问的首席角色,它的使用占比超过70%,可见其重要程度。
为了方便处理大数据,我们需要新建一些辅助项目,数据结构来自某医院。
新建数据集项目
新建 .netstandard2.0 类库项目,nuget引用 NewLife.XCode,准备医院的模型文件:
<?xml version="1.0" encoding="utf-8"?> <Tables Version="9.16.7398.1902" xmlns:xs="http://www.w3.org/2001/XMLSchema-instance" xs:schemaLocation="http://www.newlifex.com http://www.newlifex.com/Model2020.xsd" NameSpace="HisData" ConnName="His" Output="Entity" BaseClass="Entity" IgnoreNameCase="True" xmlns="http://www.newlifex.com/Model2020.xsd"> <Table Name="ZYBH0" Description="病人基本信息" IgnoreNameCase="False"> <Columns> <Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /> <Column Name="Bhid" ColumnName="BHID" DataType="Int32" Master="True" Description="病人ID" /> <Column Name="XM" DataType="String" Description="姓名" /> <Column Name="Ryrq" ColumnName="RYRQ" DataType="Int32" Description="入院日期" /> <Column Name="Cyrq" ColumnName="CYRQ" DataType="Int32" Description="出院日期" /> <Column Name="Sfzh" ColumnName="SFZH" DataType="String" Description="身份证号" /> <Column Name="FB" DataType="String" Description="费用类别" /> <Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" /> <Column Name="Flag" ColumnName="FLAG" DataType="Int32" Description="标记" /> <Column Name="Remark" DataType="String" Length="500" Description="内容" /> <Column Name="CreateUser" DataType="String" Description="创建者" /> <Column Name="CreateUserID" DataType="Int32" Description="创建者" /> <Column Name="CreateTime" DataType="DateTime" Description="创建时间" /> <Column Name="CreateIP" DataType="String" Description="创建地址" /> <Column Name="UpdateUser" DataType="String" Description="更新者" /> <Column Name="UpdateUserID" DataType="Int32" Description="更新者" /> <Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /> <Column Name="UpdateIP" DataType="String" Description="更新地址" /> </Columns> <Indexes> <Index Columns="BHID" Unique="True" /> </Indexes> </Table> <Table Name="ZYBHYZ0" Description="病人医嘱信息" IgnoreNameCase="False"> <Columns> <Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /> <Column Name="Bhid" ColumnName="BHID" DataType="Int32" Description="病人ID" /> <Column Name="Mgroupid" ColumnName="MGROUPID" DataType="Int32" Master="True" Description="医嘱组号" /> <Column Name="Kyzrq" ColumnName="KYZRQ" DataType="Int32" Description="开医嘱日期" /> <Column Name="Tyzrq" ColumnName="TYZRQ" DataType="Int32" Description="停医嘱日期" /> <Column Name="Kyzys" ColumnName="KYZYS" DataType="String" Description="开医嘱医生" /> <Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" /> <Column Name="CreateUser" DataType="String" Description="创建者" /> <Column Name="CreateUserID" DataType="Int32" Description="创建者" /> <Column Name="CreateTime" DataType="DateTime" Description="创建时间" /> <Column Name="CreateIP" DataType="String" Description="创建地址" /> <Column Name="UpdateUser" DataType="String" Description="更新者" /> <Column Name="UpdateUserID" DataType="Int32" Description="更新者" /> <Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /> <Column Name="UpdateIP" DataType="String" Description="更新地址" /> </Columns> <Indexes> <Index Columns="BHID,MGROUPID" Unique="True" /> <Index Columns="BHID" /> </Indexes> </Table> <Table Name="ZYBHYZ1" Description="病人医嘱明细信息" IgnoreNameCase="False"> <Columns> <Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /> <Column Name="Dgroupid" ColumnName="DGROUPID" DataType="Int32" Master="True" Description="医嘱组号" /> <Column Name="Yzbm" ColumnName="YZBM" DataType="String" Description="医嘱编码" /> <Column Name="Yzmc" ColumnName="YZMC" DataType="String" Description="医嘱名称" /> <Column Name="DJ" DataType="Decimal" Description="单价" /> <Column Name="SL" DataType="Double" Description="数量" /> <Column Name="FY" DataType="Decimal" Description="费用" /> <Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" /> <Column Name="CreateUser" DataType="String" Description="创建者" /> <Column Name="CreateUserID" DataType="Int32" Description="创建者" /> <Column Name="CreateTime" DataType="DateTime" Description="创建时间" /> <Column Name="CreateIP" DataType="String" Description="创建地址" /> <Column Name="UpdateUser" DataType="String" Description="更新者" /> <Column Name="UpdateUserID" DataType="Int32" Description="更新者" /> <Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /> <Column Name="UpdateIP" DataType="String" Description="更新地址" /> </Columns> <Indexes> <Index Columns="DGROUPID,YZBM" Unique="True" /> <Index Columns="DGROUPID" /> </Indexes> </Table> <Table Name="ZYYFQLD" Description="病人药房请领单分月表202001" IgnoreNameCase="False"> <Columns> <Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /> <Column Name="Qlrq" ColumnName="QLRQ" DataType="Int32" Description="请领日期" /> <Column Name="Qlsj" ColumnName="QLSJ" DataType="Int32" Description="请领时间" /> <Column Name="Ksbm" ColumnName="KSBM" DataType="String" Description="请领科室" /> <Column Name="Yzgroupid" ColumnName="YZGROUPID" DataType="Int32" Description="医嘱ID" /> <Column Name="Bhid" ColumnName="BHID" DataType="Int32" Description="病人ID" /> <Column Name="Yzbm" ColumnName="YZBM" DataType="String" Description="药品编码" /> <Column Name="DJ" DataType="Decimal" Description="单价" /> <Column Name="SL" DataType="Double" Description="请领数量" /> <Column Name="Yfbm" ColumnName="YFBM" DataType="String" Description="发药药房" /> <Column Name="Fyrq" ColumnName="FYRQ" DataType="Int32" Description="发药日期" /> <Column Name="State" ColumnName="STATE" DataType="Int32" Description="状态" /> <Column Name="Remark" DataType="String" Length="500" Description="内容" /> <Column Name="CreateUser" DataType="String" Description="创建者" /> <Column Name="CreateUserID" DataType="Int32" Description="创建者" /> <Column Name="CreateTime" DataType="DateTime" Description="创建时间" /> <Column Name="CreateIP" DataType="String" Description="创建地址" /> <Column Name="UpdateUser" DataType="String" Description="更新者" /> <Column Name="UpdateUserID" DataType="Int32" Description="更新者" /> <Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /> <Column Name="UpdateIP" DataType="String" Description="更新地址" /> </Columns> <Indexes> <Index Columns="BHID" /> </Indexes> </Table> <Table Name="ZDSF" Description="收费字典" IgnoreNameCase="False"> <Columns> <Column Name="ID" DataType="Int32" Identity="True" PrimaryKey="True" Description="编号" /> <Column Name="BM" DataType="String" Master="True" Nullable="False" Description="编码" /> <Column Name="DH" DataType="String" Description="拼音码" /> <Column Name="MC" DataType="String" Description="名称" /> <Column Name="DJ" DataType="Decimal" Description="单价" /> <Column Name="DW" DataType="String" Description="单位" /> <Column Name="Mzyflb" ColumnName="MZYFLB" DataType="Int32" Description="门诊费用类别" /> <Column Name="Zyfylb" ColumnName="ZYFYLB" DataType="Int32" Description="住院费用类别" /> <Column Name="Zfbl" ColumnName="ZFBL" DataType="Double" Description="自费比例" /> <Column Name="CreateUser" DataType="String" Description="创建者" /> <Column Name="CreateUserID" DataType="Int32" Description="创建者" /> <Column Name="CreateTime" DataType="DateTime" Description="创建时间" /> <Column Name="CreateIP" DataType="String" Description="创建地址" /> <Column Name="UpdateUser" DataType="String" Description="更新者" /> <Column Name="UpdateUserID" DataType="Int32" Description="更新者" /> <Column Name="UpdateTime" DataType="DateTime" Description="更新时间" /> <Column Name="UpdateIP" DataType="String" Description="更新地址" /> </Columns> <Indexes> <Index Columns="BM" Unique="True" /> </Indexes> </Table> </Tables>
再去找一个 build_netcore.tt 的T4模板,可以这里下载 http://x.newlifex.com/XCode_BuildModel.zip 。也可以从AntJob.Web项目中拷贝一个。
记得把build_netcore.tt在vs文件属性的自定义工具设置为TextTemplatingFileGenerator。
由于netstandard项目输出目录中不包括XCode.dll等引用程序集,因此build.tt需要改一下
<#@ template language="C#" hostSpecific="true" debug="true" #> <#@ assembly name="netstandard" #> <#@ assembly name="$(ProjectDir)\..\..\DLL\NewLife.Core.dll" #> <#@ assembly name="$(ProjectDir)\..\..\DLL\XCode.dll" #> <#@ import namespace="System.Diagnostics" #> <#@ import namespace="System.IO" #> <#@ import namespace="XCode.Code" #> <#@ output extension=".log" #> <# // 设置当前工作目录 PathHelper.BasePath = Host.ResolvePath("."); // 导入模型文件并生成实体类,模型文件、输出目录、命名空间、连接名、中文文件名、表名字段名大小写 //EntityBuilder.Build(String xmlFile = null, String output = null, String nameSpace = null, String connName = null, Boolean? chineseFileName = true,Boolean? nameIgnoreCase = null); EntityBuilder.Build(); //var tables = DAL.ImportFrom("Company.Project.xml"); //EntityBuilder.Build(tables); #>
我们从AntJob.Web中拷贝两个文件 NewLife.Core.dll 和 XCode.dll 到外面的DLL目录中,供build.tt调用。如果实际目录不同,可以修改build.tt文件的指向。
在build.tt文件上右键,执行自定义工具,即可生成一批实体类。
编译通过
新建Web项目
新建 .netcore3.1 的web项目,Nuget引用 NewLife.Cube.Core,并引用项目HisData。
该Web项目主要用于查看和管理那些数据表的数据。
修改Main函数,增加 XTrace.UseConsole,用于拦截所有日志
public class Program { public static void Main(string[] args) { XTrace.UseConsole(); CreateHostBuilder(args).Build().Run(); } public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup<Startup>(); }); }
Startup.cs 中引用魔方,services.AddCube()/app.UseCube()
编译运行,浏览器访问 http://localhost:5000/Admin
可以看到魔方跑起来了,但是还没有我们需要的数据页面。
新建His控制器区域
区域文件内容:
using System; using System.ComponentModel; using NewLife.Cube; namespace HisWeb.Areas.His { [DisplayName("医院管理")] public class HisArea : AreaBase { public HisArea() : base(nameof(HisArea).TrimEnd("Area")) { } static HisArea() => RegisterArea<HisArea>(); } }
为每个实体类新建一个控制器,如下
using HisData; using NewLife.Cube; namespace HisWeb.Areas.His.Controllers { [HisArea] public class ZYBH0Controller : EntityController<ZYBH0> { static ZYBH0Controller() => MenuOrder = 100; } }
编译项目,执行 HisWeb.exe,刷新浏览器页面,即可看到每张数据表对应了一个页面。
生成病人数据
在引用项目HisData。
新建一个作业处理器 BuildPatient 用于随机生成病人
using System; using System.Collections.Generic; using AntJob; using HisData; using NewLife.Security; using XCode; namespace HisAgent { internal class BuildPatient : Handler { public BuildPatient() { var job = Job; job.Start = DateTime.Today; job.Step = 15; } protected override Int32 Execute(JobContext ctx) { // 随机造几个病人 var count = Rand.Next(1, 9); var list = new List<ZYBH0>(); for (var i = 0; i < count; i++) { var time = DateTime.Now.AddSeconds(Rand.Next(-30 * 24 * 3600, 0)); var time2 = time.AddSeconds(Rand.Next(3600, 10 * 24 * 3600)); var pi = new ZYBH0 { Bhid = Rand.Next(999999), XM = Rand.NextString(8), Ryrq = time, Cyrq = time2, Sfzh = Rand.NextString(18), FB = Rand.NextString(6), State = Rand.Next(8), Flag = Rand.Next(2), }; list.Add(pi); } list.Insert(true); // 成功处理数据量 return count; } } }
主函数中把该处理器添加到调度器
编译运行,HisAgent将在控制台新增一个作业,把它启用
很快,作业处理器就跑起来了
每次定时任务所添加病人数时随机的,我们通过Execute返回,记录着控制台作业任务表的“成功”字段。
去HisWeb中看看数据
很不幸,啥也没有……
原来,我们并没有配置数据库连接字符串,各个应用就会默认使用SQLite数据库,位于自己目录中,HisAgent生成的数据,HisWeb自然就无法访问了。
修改HisWeb输出目录,让它跟HisAgent并排
<PropertyGroup> <TargetFramework>netcoreapp3.1</TargetFramework> <AssemblyVersion>1.0.*</AssemblyVersion> <Deterministic>false</Deterministic> <OutputPath>..\..\Bin\HisWeb</OutputPath> <AppendTargetFrameworkToOutputPath>false</AppendTargetFrameworkToOutputPath> </PropertyGroup>
修改配置文件appsettings.json给的链接字符串
{ "Logging": { "LogLevel": { "Default": "Information", "Microsoft": "Warning", "Microsoft.Hosting.Lifetime": "Information" } }, "AllowedHosts": "*", "ConnectionStrings": { "His": { "connectionString": "Data Source=..\\Hisagent\\Data\\His.db", "providerName": "SQLite" } } }
重新跑起来后,成功看到病人数据
清洗病人数据
由于HisAgent需要使用数据调度,除了AntJob,我们还需要从nuget引用AntJob.Extensions。
这一次,我们来实时消费病人数据,为其生成医嘱,高仿数据清洗过程。
新增生成医嘱的作业处理器 BuildWill
using System; using AntJob; using HisData; using NewLife.Security; using XCode; namespace HisAgent { class BuildWill : DataHandler { public BuildWill() { var job = Job; job.Start = DateTime.Today; job.Step = 30; } public override Boolean Start() { // 指定要抽取数据的实体类以及时间字段 Factory = ZYBH0.Meta.Factory; Field = ZYBH0._.CreateTime; return base.Start(); } protected override Boolean ProcessItem(JobContext ctx, IEntity entity) { var pi = entity as ZYBH0; // 创建医嘱信息 var will = new ZYBHYZ0 { Bhid = pi.Bhid, Mgroupid = Rand.Next(9999), Kyzrq = pi.Ryrq.AddHours(1), Tyzrq = pi.Cyrq.AddHours(-3), Kyzys = Rand.NextString(8), State = pi.State, }; will.Insert(); return true; } } }
数据调度的处理器基类是DataHandler,并且需要在Start之前指定实体工厂以及时间字段。调度系统将会从该表抽取数据,根据调度中心分派的时间区间(StartTime+EndTime),对时间字段进行查询。
处理函数ProcessItem就是业务核心代码了,也可以重写Execute,实现批量处理。
从今天零点开始消费处理数据,步进30秒,也就是每次抽取30秒的数据来分析处理。
不要忘了在Main中把该处理器加入调度器。
跑起来,去控制台启用作业
可以看到,在(00:30:00, 00:30:30)区间内,得到4个病人,创建了4个医嘱。
运行HisWeb查看数据
消息调度
设计概要
计算型应用(继承Handler)
系统架构
调度中心主从架构