在日常的開發中,運行定時任務基本上已經是很普遍的需求了,可以通過windows服務+timer組件來實現,也可以使用第三方框架來集成,Quartz.NET就是一款從JAVA的Quartz移植過來的一個不錯的作業調度組件,但是當我們把作業都寫好,並部署完成的時候,管理成為了很麻煩的事情,因此我基於Quartz.NET,又簡單做了一下封裝,來實現作業動態管理。
首先作業動態管理包含以下幾個核心點
Quzrtz.NET怎麼用我這裡就不再講解了,百度上很多。
主要有三個核心模塊,Job,Trigger和Schedule,
Job就是每一個作業,Trigger就是作業執行策略(多長時間執行一次等),Schedule則把Job和Tigger裝載起來
Job和Tigger可以隨意搭配裝載到Schedule裡面運行
接下來講解實現的思路
先定義一個類庫,類庫只包含一個類,BaseJob ,裡面只有一個Run()方法
之後我們實現的每一個作業都是繼承自這個類,實現Run()方法即可(每個作業都作為一個獨立的類庫,引用這個只有一個類的類庫)
public abstract class BaseJob:MarshalByRefObject,IDisposable { public abstract void Run(); }
接下來建立我們的作業管理核心類庫Job.Service nuget安裝Quartz.NET
然後新建類JobImplement.cs實現Quartz.NET的IJob接口
這樣我們就可以在裡面通過我們自己寫的作業調度容器獲取到動態加載的Job信息,並運行Job的run方法,來實現動態調度了(作業調度容器裡的作業如何裝載進去的在文章後面講解)
jobRuntimeInfo是我們自己定義的實體類,裡面包含了BaseJob,AppDomain,JobInfo 三個信息
JobInfo是作業在上傳到作業動態調度框架時所需要填寫的作業基本信息
public class JobImplement : IJob { public void Execute(IJobExecutionContext context) { try { long jobId = context.JobDetail.JobDataMap.GetLong("JobId"); //從作業調度容器裡查找,如果找到,則運行 var jobRuntimeInfo = JobPoolManager.Instance.Get(jobId); try { jobRuntimeInfo.Job.TryRun(); } catch (Exception ex) { //寫日志,任務調用失敗 ConnectionFactory.GetInstance<Provider.JobStateRepository>() .Update(new Provider.Tables.JobState() { JobId = jobId, RunState = (int) Provider.DirectiveType.Stop, UpdateTime = DateTime.Now }); Common.Logging.LogManager.GetLogger(this.GetType()).Error(ex.Message, ex); } } catch (Exception ex) { Common.Logging.LogManager.GetLogger(this.GetType()).Error(ex.Message, ex); //調用的時候失敗,寫日志,這裡錯誤,屬於系統級錯誤,嚴重錯誤 } } }
JobRuntimeInfo
public class JobRuntimeInfo { public AppDomain AppDomain; public BaseJob Job { get; set; } public JobInfo JobModel { get; set; } }
JobInfo
public class JobInfo { public long JobId { get; set; } public string JobName { get; set; }public string TaskCron { get; set; } public string Namespace { get; set; } public string MainDllName { get; set; } public string Remark { get; set; } public string ZipFileName { get; set; } public string Version { get; set; } public DateTime? CreateTime { get; set; } }
接下來我們來講解這個作業是如何執行的
1.通過一個上傳頁面把作業類庫打包為zip或者rar上傳到服務器,並填寫Job運行的相關信息,添加到數據庫裡
2.上傳完成之後發布一條廣播消息給所有的作業調度框架
3.作業調度框架接收到廣播消息,從數據庫獲取JobInfo,自動根據上傳的時候填寫的信息(見上面的JobInfo類的屬性),自動解壓,裝載到AppDomain裡
public class AppDomainLoader { /// <summary> /// 加載應用程序,獲取相應實例 /// </summary> /// <param name="dllPath"></param> /// <param name="classPath"></param> /// <param name="appDomain"></param> /// <returns></returns> public static BaseJob Load(string dllPath, string classPath, out AppDomain appDomain) where T : class { AppDomainSetup setup = new AppDomainSetup(); if (System.IO.File.Exists($"{dllPath}.config")) setup.ConfigurationFile = $"{dllPath}.config"; setup.ShadowCopyFiles = "true"; setup.ApplicationBase = System.IO.Path.GetDirectoryName(dllPath); appDomain = AppDomain.CreateDomain(System.IO.Path.GetFileName(dllPath), null, setup); AppDomain.MonitoringIsEnabled = true; BaseJob obj = (BaseJob) appDomain.CreateInstanceFromAndUnwrap(dllPath, classPath); return obj; } /// <summary> /// 卸載應用程序 /// </summary> /// <param name="appDomain"></param> public static void UnLoad(AppDomain appDomain) { AppDomain.Unload(appDomain); appDomain = null; } }
4.因為作業都繼承了BaseJob類,所以AppDomain裡的入口程序就是JobInfo.Namespace,反射實例化之後強制轉換為BaseJob,然後創建一個JobRuntime對象,添加到JobPoolManager裡,JobPoolManager裡維護所有的正在運行的Job
5.根據JobInfo.TaskCron(時間表達式)創建Trigger,創建一個JobImplement,並在Context裡加一個JobId,保證在JobImplement的Run運行的時候能夠從JobPoolManager裡獲取到Job的基本信息,以及BaseJob的事例,並調用JobRuntime=>BaseJob=>Run()方法來運行實際的作業
public class JobPoolManager:IDisposable { private static ConcurrentDictionary<long, JobRuntimeInfo> JobRuntimePool = new ConcurrentDictionary<long, JobRuntimeInfo>(); private static IScheduler _scheduler; private static JobPoolManager _jobPollManager; private JobPoolManager(){} static JobPoolManager() { _jobPollManager = new JobPoolManager(); _scheduler = StdSchedulerFactory.GetDefaultScheduler(); _scheduler.Start(); } public static JobPoolManager Instance { get { return _jobPollManager; } } static object _lock=new object(); public bool Add(long jobId, JobRuntimeInfo jobRuntimeInfo) { lock (_lock) { if (!JobRuntimePool.ContainsKey(jobId)) { if (JobRuntimePool.TryAdd(jobId, jobRuntimeInfo)) { IDictionary<string, object> data = new Dictionary<string, object>() { ["JobId"]=jobId }; IJobDetail jobDetail = JobBuilder.Create<JobImplement>() .WithIdentity(jobRuntimeInfo.JobModel.JobName, jobRuntimeInfo.JobModel.Group) .SetJobData(new JobDataMap(data)) .Build(); var tiggerBuilder = TriggerBuilder.Create() .WithIdentity(jobRuntimeInfo.JobModel.JobName, jobRuntimeInfo.JobModel.Group); if (string.IsNullOrWhiteSpace(jobRuntimeInfo.JobModel.TaskCron)) { tiggerBuilder = tiggerBuilder.WithSimpleSchedule((simple) => { simple.WithInterval(TimeSpan.FromSeconds(1)); }); } else { tiggerBuilder = tiggerBuilder .StartNow() .WithCronSchedule(jobRuntimeInfo.JobModel.TaskCron); } var trigger = tiggerBuilder.Build(); _scheduler.ScheduleJob(jobDetail, trigger); return true; } } return false; } } public JobRuntimeInfo Get(long jobId) { if (!JobRuntimePool.ContainsKey(jobId)) { return null; } lock (_lock) { if (JobRuntimePool.ContainsKey(jobId)) { JobRuntimeInfo jobRuntimeInfo = null; JobRuntimePool.TryGetValue(jobId, out jobRuntimeInfo); return jobRuntimeInfo; } return null; } } public bool Remove(long jobId) { lock (_lock) { if (JobRuntimePool.ContainsKey(jobId)) { JobRuntimeInfo jobRuntimeInfo = null; JobRuntimePool.TryGetValue(jobId, out jobRuntimeInfo); if (jobRuntimeInfo != null) { var tiggerKey = new TriggerKey(jobRuntimeInfo.JobModel.JobName, jobRuntimeInfo.JobModel.Group); _scheduler.PauseTrigger(tiggerKey); _scheduler.UnscheduleJob(tiggerKey); _scheduler.DeleteJob(new JobKey(jobRuntimeInfo.JobModel.JobName, jobRuntimeInfo.JobModel.Group)); JobRuntimePool.TryRemove(jobId, out jobRuntimeInfo); return true; } } return false; } } public virtual void Dispose() { if (_scheduler != null && !_scheduler.IsShutdown) { foreach (var jobId in JobRuntimePool.Keys) { var jobState = ConnectionFactory.GetInstance<Job.Provider.JobStateRepository>().Get(jobId); if (jobState != null) { jobState.RunState = (int) DirectiveType.Stop; jobState.UpdateTime = DateTime.Now; ConnectionFactory.GetInstance<Job.Provider.JobStateRepository>().Update(jobState); } } _scheduler.Shutdown(); } } }
然後我們除了做了一個web版的上傳界面之外,還可以做所有的job列表,用來做Start|Stop|Restart等,思路就是發布一條廣播給所有的作業調度框架,作業調度框架根據廣播消息來進行作業的裝載,啟動,停止,卸載等操作。
至此,一個基本的動態作業調度框架就結束了。