家乡网站设计目的互联网公司排名名单
家乡网站设计目的,互联网公司排名名单,个人建网站步骤,重庆有几个区几个县点击上方蓝字关注我们0. 简介在某些时候我们可能会需要执行后台任务#xff0c;或者是执行一些周期性的任务。比如说可能每隔 1 个小时要清除某个临时文件夹内的数据#xff0c;可能用户会要针对某一个用户群来群发一组短信。前面这些就是典型的应用场景#xff0c;在 Abp 框… 点击上方蓝字关注我们0. 简介在某些时候我们可能会需要执行后台任务或者是执行一些周期性的任务。比如说可能每隔 1 个小时要清除某个临时文件夹内的数据可能用户会要针对某一个用户群来群发一组短信。前面这些就是典型的应用场景在 Abp 框架里面为我们准备了后台作业和后台工作者来帮助我们解决这个问题。后台作业与后台工作者的区别是前者主要用于某些耗时较长的任务而不想阻塞用户的时候所使用。后者主要用于周期性的执行某些任务从 “工作者” 的名字可以看出来就是一个个工人而且他们每个工人都拥有单独的后台线程。0.1 典型场景后台作业某个用户按下了报表按钮来生成一个需要长时间等待的报表。你添加这个工作到队列中当报表生成完毕后发送报表结果到该用户的邮箱。在后台作业中发送一封邮件有些问题可能会导致发送失败(网络连接异常或者主机宕机)由于有后台作业以及持久化机制在问题排除后可以重试以保证任务的成功执行。后台工作者后台工作者能够周期性地执行旧日志的删除。后台工作者可以周期性地筛选出非活跃性用户并且发送回归邮件给这些用户。1. 启动流程后台作业与后台工作者都是通过各自的 Manager(IBackgroundJobManager/IBackgroundWorkerManager) 来进行管理的。而这两个 Manager 分别继承了 ISingletonDependency 接口所以在启动的时候就会自动注入这两个管理器以便开发人员管理操作。这里值得注意的一点是IBackgroundJobManager 接口是 IBackgroundWorker 的派生接口而 IBackgroudWorker 是归属于 IBackgroundWorkerManager 进行管理的。所以你可以在 AbpKernelModule 里面看到如下代码public sealed class AbpKernelModule : AbpModule
{public override void PostInitialize(){// 注册可能缺少的组件RegisterMissingComponents();// ... 忽略的代码// 各种管理器的初始化操作// 从配置项中读取是否启用了后台作业功能if (Configuration.BackgroundJobs.IsJobExecutionEnabled){var workerManager IocManager.ResolveIBackgroundWorkerManager();// 开始启动后台工作者workerManager.Start();// 增加后台作业管理器workerManager.Add(IocManager.ResolveIBackgroundJobManager());}}
}
可以看到后台作业管理器是作为一个后台工作者被添加到了 IBackgroundWorkerManager 当中来执行的。2. 代码分析2.1 后台工作者2.1.1 后台工作者管理器Abp 通过后台工作者管理器来管理后台作业队列所以我们首先来看一下后台工作者管理器接口的定义是什么样子的。public interface IBackgroundWorkerManager : IRunnable
{void Add(IBackgroundWorker worker);
}
还是相当简洁的就一个 Add 方法用来添加一个新的后台工作者对象。只是在这个地方可以看到该接口又是集成自 IRunnable 接口那么该接口的作用又是什么呢转到其定义可以看到IRunable 接口定义了三个基本的方法Start()、Stop()、WaitStop() 而且他拥有一个默认实现 RunableBase其实就是用来标识一个任务的运行状态。public interface IRunnable
{// 开始执行任务void Start();// 停止执行任务void Stop();// 阻塞线程等待任务执行完成后标识为停止。void WaitToStop();
}public abstract class RunnableBase : IRunnable
{// 用于标识任务是否运行的布尔值变量public bool IsRunning { get { return _isRunning; } }private volatile bool _isRunning;// 启动之后表示任务正在运行public virtual void Start(){_isRunning true;}// 停止之后表示任务结束运行public virtual void Stop(){_isRunning false;}public virtual void WaitToStop(){}
}
到目前为止整个代码都还是比较简单清晰的我们接着看 IBackgroundWorkerManager 的默认实现 BackgroundWorkerManager 类首先我们看一下该类拥有哪些属性与字段。public class BackgroundWorkerManager : RunnableBase, IBackgroundWorkerManager, ISingletonDependency, IDisposable
{private readonly IIocResolver _iocResolver;private readonly ListIBackgroundWorker _backgroundJobs;public BackgroundWorkerManager(IIocResolver iocResolver){_iocResolver iocResolver;_backgroundJobs new ListIBackgroundWorker();}
}
在后台工作者管理器类的内部默认有一个 List 集合用于维护所有的后台工作者对象。那么其他的 Start() 等方法肯定是基于这个集合进行操作的。public override void Start()
{base.Start();_backgroundJobs.ForEach(job job.Start());
}public override void Stop()
{_backgroundJobs.ForEach(job job.Stop());base.Stop();
}public override void WaitToStop()
{_backgroundJobs.ForEach(job job.WaitToStop());base.WaitToStop();
}
可以看到实现还是比较简单的接下来我们继续看他的 Add() 方法是如何进行操作的public void Add(IBackgroundWorker worker)
{_backgroundJobs.Add(worker);if (IsRunning){worker.Start();}
}
在这里我们看到他会针对 IsRunning 进行判定是否立即启动加入的后台工作者对象。而这个 IsRunning 属性值唯一产生变化的情况就在于 Start() 方法与 Stop() 方法的调用。最后肯定也有相关的销毁方法用于释放所有注入的后台工作者对象并将集合清除。private bool _isDisposed;public void Dispose()
{if (_isDisposed){return;}_isDisposed true;// 遍历集合通过 Ioc 解析器的 Release 方法释放对象_backgroundJobs.ForEach(_iocResolver.Release);// 清空集合_backgroundJobs.Clear();
}
所以针对于所有后台工作者的管理都是通过 IBackgroundWorkerManager 来进行操作的。2.1.2 后台工作者看完了管理器我们来看一下 IBackgroundWorker 后台工作者对象是怎样的构成。public interface IBackgroundWorker : IRunnable
{}
貌似只是一个空的接口其作用主要是标识某个类型是否为后台工作者转到其抽象类实现 BackgroundWorkerBase里面只是注入了一些辅助对象与本地化的一些方法。public abstract class BackgroundWorkerBase : RunnableBase, IBackgroundWorker
{// 配置管理器public ISettingManager SettingManager { protected get; set; }// 工作单元管理器public IUnitOfWorkManager UnitOfWorkManager{get{if (_unitOfWorkManager null){throw new AbpException(Must set UnitOfWorkManager before use it.);}return _unitOfWorkManager;}set { _unitOfWorkManager value; }}private IUnitOfWorkManager _unitOfWorkManager;// 获得当前的工作单元protected IActiveUnitOfWork CurrentUnitOfWork { get { return UnitOfWorkManager.Current; } }// 本地化资源管理器public ILocalizationManager LocalizationManager { protected get; set; }// 默认的本地化资源的源名称protected string LocalizationSourceName { get; set; }protected ILocalizationSource LocalizationSource{get{// 如果没有配置源名称直接抛出异常if (LocalizationSourceName null){throw new AbpException(Must set LocalizationSourceName before, in order to get LocalizationSource);}if (_localizationSource null || _localizationSource.Name ! LocalizationSourceName){_localizationSource LocalizationManager.GetSource(LocalizationSourceName);}return _localizationSource;}}private ILocalizationSource _localizationSource;// 日志记录器public ILogger Logger { protected get; set; }protected BackgroundWorkerBase(){Logger NullLogger.Instance;LocalizationManager NullLocalizationManager.Instance;}// ... 其他模板代码
}
我们接着看继承并实现了 BackgroundWorkerBase 的类型 PeriodicBackgroundWorkerBase从字面意思上来看该类型应该是一个定时后台工作者基类。重点在于 Periodic(定时)从其类型内部的定义可以看到该类型使用了一个 AbpTimer 对象来进行周期计时与具体工作任务的触发。我们暂时先不看这个 AbpTimer仅仅看 PeriodicBackgroundWorkerBase 的内部实现。public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{protected readonly AbpTimer Timer;// 注入 AbpTimerprotected PeriodicBackgroundWorkerBase(AbpTimer timer){Timer timer;// 绑定周期执行的任务这里是 DoWork()Timer.Elapsed Timer_Elapsed;}public override void Start(){base.Start();Timer.Start();}public override void Stop(){Timer.Stop();base.Stop();}public override void WaitToStop(){Timer.WaitToStop();base.WaitToStop();}private void Timer_Elapsed(object sender, System.EventArgs e){try{DoWork();}catch (Exception ex){Logger.Warn(ex.ToString(), ex);}}protected abstract void DoWork();
}
可以看到这里基类绑定了 DoWork() 作为其定时执行的方法那么用户在使用的时候直接继承自该基类然后重写 DoWork() 方法即可绑定自己的后台工作者的任务。2.1.3 AbpTimer 定时器在上面的基类我们看到基类的 Start()、Stop()、WaitTpStop() 方法都是调用的 AbpTimer 所提供的所以说 AbpTimer 其实也继承了 RunableBase 基类并实现其具体的启动与停止操作。其实 AbpTimer 的核心就是通过 CLR 的 Timer 来实现周期性任务执行的不过默认的 Timer 类有两个比较大的问题。CLR 的 Timer 并不会等待你的任务执行完再执行下一个周期的任务如果你的某个任务耗时过长超过了 Timer 定义的周期。那么 Timer 会开启一个新的线程执行这样的话最后我们系统的资源会因为线程大量重复创建而被拖垮。如何知道一个 Timer 所执行的业务方法已经真正地被结束了。所以 Abp 才会重新封装一个 AbpTimer 作为一个基础的计时器。第一个问题的解决方法很简单就是在执行具体绑定的业务方法之前通过 Timer.Change() 方法来让 Timer 临时失效。等待业务方法执行完成之后再将 Timer 的周期置为用户设定的周期。// CLR Timer 绑定的回调方法
private void TimerCallBack(object state)
{lock (_taskTimer){if (!_running || _performingTasks){return;}// 暂时让 Timer 失效_taskTimer.Change(Timeout.Infinite, Timeout.Infinite);// 设置执行标识为 TRUE表示当前的 AbpTimer 正在执行_performingTasks true;}try{// 如果绑定了相应的触发事件if (Elapsed ! null){// 执行相应的业务方法这里就是最开始绑定的 DoWork() 方法Elapsed(this, new EventArgs());}}catch{}finally{lock (_taskTimer){// 标识业务方法执行完成_performingTasks false;if (_running){// 更改周期为用户指定的执行周期等待下一次触发_taskTimer.Change(Period, Timeout.Infinite);}Monitor.Pulse(_taskTimer);}}
}
针对于第二个问题Abp 通过 WaitToStop() 方法会阻塞调用这个 Timer 的线程并且在 _performingTasks 标识位是 false 的时候释放。public override void WaitToStop()
{// 锁定 CLR 的 Timer 对象lock (_taskTimer){// 循环检测while (_performingTasks){Monitor.Wait(_taskTimer);}}base.WaitToStop();
}
至于其他的 Start() 方法就是使用 CLR 的 Timer 更改其执行周期而 Stop() 就是直接将 Timer 的周期设置为无限大使计时器失效。2.1.4 总结Abp 后台工作者的核心就是通过 AbpTimer 来实现周期性任务的执行用户只需要继承自 PeriodicBackgroundWorkerBase然后将其添加到 IBackgroundWorkerManager 的集合当中。这样 Abp 在启动之后就会遍历这个工作者集合然后周期执行这些后台工作者绑定的方法。当然如果你继承了 PeriodicBackgroundWorkerBase 之后可以通过设置构造函数的 AbpTimer 来指定自己的执行周期。2.2 后台作业队列后台工作队列的管理是通过 IBackgroundJobManager 来处理的而该接口又继承自 IBackgroundWorker所以一整个后台作业队列就是一个后台工作者只不过这个工作者有点特殊。2.2.1 后台作业管理器IBackgroundJobManager 接口的定义其实就两个方法一个 EnqueueAsyncTJob, TArgs() 用于将一个后台作业加入到执行队列当中。而 DeleteAsync() 方法呢顾名思义就是从队列当中移除指定的后台作业。首先看一下其默认实现 BackgroundJobManager该实现同样是继承自 PeriodicBackgroundWorkerBase 并且其默认周期为 5000 ms。public class BackgroundJobManager : PeriodicBackgroundWorkerBase, IBackgroundJobManager, ISingletonDependency
{// 事件总线public IEventBus EventBus { get; set; }// 轮训后台作业的间隔默认值为 5000 毫秒.public static int JobPollPeriod { get; set; }// IOC 解析器private readonly IIocResolver _iocResolver;// 后台作业队列存储private readonly IBackgroundJobStore _store;static BackgroundJobManager(){JobPollPeriod 5000;}public BackgroundJobManager(IIocResolver iocResolver,IBackgroundJobStore store,AbpTimer timer): base(timer){_store store;_iocResolver iocResolver;EventBus NullEventBus.Instance;Timer.Period JobPollPeriod;}
}
基础结构基本上就这个样子接下来看一下他的两个接口方法是如何实现的。EnqueueAsyncTJob, TArgs 方法通过传入指定的后台作业对象和相应的参数同时还有任务的优先级。将其通过 IBackgroundJobStore 进行持久化并返回一个任务的唯一 JobId 以便进行删除操作。public async Taskstring EnqueueAsyncTJob, TArgs(TArgs args, BackgroundJobPriority priority BackgroundJobPriority.Normal, TimeSpan? delay null)where TJob : IBackgroundJobTArgs
{// 通过 JobInfo 包装任务的基本信息var jobInfo new BackgroundJobInfo{JobType typeof(TJob).AssemblyQualifiedName,JobArgs args.ToJsonString(),Priority priority};// 如果需要延时执行的话则用当前时间加上延时的时间作为任务下次运行的时间if (delay.HasValue){jobInfo.NextTryTime Clock.Now.Add(delay.Value);}// 通过 Store 进行持久话存储await _store.InsertAsync(jobInfo);// 返回后台任务的唯一标识return jobInfo.Id.ToString();
}
至于删除操作在 Manager 内部其实也是通过 IBackgroundJobStore 进行实际的删除操作的。public async Taskbool DeleteAsync(string jobId)
{// 判断 jobId 的值是否有效if (long.TryParse(jobId, out long finalJobId) false){throw new ArgumentException($The jobId {jobId} should be a number., nameof(jobId));}// 使用 jobId 从 Store 处筛选到 JobInfo 对象的信息BackgroundJobInfo jobInfo await _store.GetAsync(finalJobId);if (jobInfo null){return false;}// 如果存在有 JobInfo 则使用 Store 进行删除操作await _store.DeleteAsync(jobInfo);return true;
}
后台作业管理器实质上是一个周期性执行的后台工作者那么我们的后台作业是每 5000 ms 执行一次那么他的 DoWork() 方法又在执行什么操作呢protected override void DoWork()
{// 从 Store 当中获得等待执行的后台作业集合var waitingJobs AsyncHelper.RunSync(() _store.GetWaitingJobsAsync(1000));// 遍历这些等待执行的后台任务然后通过 TryProcessJob 进行执行foreach (var job in waitingJobs){TryProcessJob(job);}
}
可以看到每 5 秒钟我们的后台作业管理器就会从 IBackgroundJobStore 当中拿到最大 1000 条的后台作业信息然后遍历这些信息。通过 TryProcessJob(job) 方法来执行后台作业。而 TryProcessJob() 方法本质上就是通过反射构建出一个 IBackgroundJob 对象然后取得序列化的参数值通过反射得到的 MethodInfo 对象来执行我们的后台任务。执行完成之后就会从 Store 当中移除掉执行完成的任务。针对于在执行过程当中所出现的异常会通过 IEventBus 触发一个 AbpHandledExceptionData 事件记录后台作业执行失败时的异常信息。并且一旦在执行过程当中出现了任何异常的情况都会将该任务的 IsAbandoned 字段置为 true当该字段为 true 时该任务将不再回被执行。PS就是在 GetWaitingJobsAsync() 方法时会过滤掉 IsAbandoned 值为 true 的任务。private void TryProcessJob(BackgroundJobInfo jobInfo)
{try{// 任务执行次数自增 1jobInfo.TryCount;// 最后一次执行时间设置为当前时间jobInfo.LastTryTime Clock.Now;// 通过反射取得后台作业的类型var jobType Type.GetType(jobInfo.JobType);// 通过 Ioc 解析器得到一个临时的后台作业对象执行完之后既被释放using (var job _iocResolver.ResolveAsDisposable(jobType)){try{// 通过反射得到后台作业的 Execute 方法var jobExecuteMethod job.Object.GetType().GetTypeInfo().GetMethod(Execute);var argsType jobExecuteMethod.GetParameters()[0].ParameterType;var argsObj JsonConvert.DeserializeObject(jobInfo.JobArgs, argsType);// 结合持久话存储的参数信息调用 Execute 方法进行后台作业jobExecuteMethod.Invoke(job.Object, new[] { argsObj });// 执行完成之后从 Store 删除该任务的信息AsyncHelper.RunSync(() _store.DeleteAsync(jobInfo));}catch (Exception ex){Logger.Warn(ex.Message, ex);// 计算下一次执行的时间一旦超过 2 天该任务都执行失败则返回 nullvar nextTryTime jobInfo.CalculateNextTryTime();if (nextTryTime.HasValue){jobInfo.NextTryTime nextTryTime.Value;}else{// 如果为 null 则说明该任务在 2 天的时间内都没有执行成功则放弃继续执行jobInfo.IsAbandoned true;}// 更新 Store 存储的任务信息TryUpdate(jobInfo);// 触发异常事件EventBus.Trigger(this,new AbpHandledExceptionData(new BackgroundJobException(A background job execution is failed. See inner exception for details. See BackgroundJob property to get information on the background job., ex){BackgroundJob jobInfo,JobObject job.Object}));}}}catch (Exception ex){Logger.Warn(ex.ToString(), ex);// 表示任务不再执行jobInfo.IsAbandoned true;// 更新 StoreTryUpdate(jobInfo);}
}
2.2.2 后台作业后台作业的默认接口定义为 IBackgroundJobin TArgs 他只有一个 Execute(TArgs args) 方法用于接收指定类型的作业参数并执行。一般来说我们不建议直接通过继承 IBackgroundJobin TArgs 来实现后台作业而是继承自 BackgroundJobTArgs 抽象类。该抽象类内部也没有什么特别的实现主要是注入了一些基础设施比如说 UOW 与 本地化资源管理器方便我们开发使用。后台作业本身是具体执行的对象而 BackgroundJobInfo 则是存储了后台作业的 Type 类型和参数方便在需要执行的时候通过反射的方式执行后台作业。2.2.2 后台作业队列存储从 IBackgroundJobStore 我们就可以猜到以 Abp 框架的套路他肯定会有两种实现第一种就是基于内存的 InMemoryBackgroundJobStore。而第二种呢就是由 Abp.Zero 模块所提供的基于数据库的 BackgroundJobStore。IBackgroundJobStore 接口所定义的方法基本上就是增删改查没有什么复杂的。public interface IBackgroundJobStore
{// 通过 JobId 获取后台任务信息TaskBackgroundJobInfo GetAsync(long jobId);// 插入一个新的后台任务信息Task InsertAsync(BackgroundJobInfo jobInfo);/// summary/// Gets waiting jobs. It should get jobs based on these:/// Conditions: !IsAbandoned And NextTryTime lt; Clock.Now./// Order by: Priority DESC, TryCount ASC, NextTryTime ASC./// Maximum result: paramref namemaxResultCount/./// /summary/// param namemaxResultCountMaximum result count./paramTaskListBackgroundJobInfo GetWaitingJobsAsync(int maxResultCount);/// summary/// Deletes a job./// /summary/// param namejobInfoJob information./paramTask DeleteAsync(BackgroundJobInfo jobInfo);/// summary/// Updates a job./// /summary/// param namejobInfoJob information./paramTask UpdateAsync(BackgroundJobInfo jobInfo);
}
这里先从简单的内存 Store 说起这个 InMemoryBackgroundJobStore 内部使用了一个并行字典来存储这些任务信息。public class InMemoryBackgroundJobStore : IBackgroundJobStore
{private readonly ConcurrentDictionarylong, BackgroundJobInfo _jobs;private long _lastId;public InMemoryBackgroundJobStore(){_jobs new ConcurrentDictionarylong, BackgroundJobInfo();}
}
相当简单这几个接口方法基本上就是针对与这个并行字典操作的一层封装。public TaskBackgroundJobInfo GetAsync(long jobId)
{return Task.FromResult(_jobs[jobId]);
}public Task InsertAsync(BackgroundJobInfo jobInfo)
{jobInfo.Id Interlocked.Increment(ref _lastId);_jobs[jobInfo.Id] jobInfo;return Task.FromResult(0);
}public TaskListBackgroundJobInfo GetWaitingJobsAsync(int maxResultCount)
{var waitingJobs _jobs.Values// 首先筛选出不再执行的后台任务.Where(t !t.IsAbandoned t.NextTryTime Clock.Now)// 第一次根据后台作业的优先级进行排序高优先级优先执行.OrderByDescending(t t.Priority)// 再根据执行次数排序执行次数越少的越靠前.ThenBy(t t.TryCount).ThenBy(t t.NextTryTime).Take(maxResultCount).ToList();return Task.FromResult(waitingJobs);
}public Task DeleteAsync(BackgroundJobInfo jobInfo)
{_jobs.TryRemove(jobInfo.Id, out _);return Task.FromResult(0);
}public Task UpdateAsync(BackgroundJobInfo jobInfo)
{// 如果是不再执行的任务删除if (jobInfo.IsAbandoned){return DeleteAsync(jobInfo);}return Task.FromResult(0);
}
至于持久化到数据库无非是注入一个仓储然后针对这个仓储进行增删查改的操作罢了这里就不在赘述。2.2.3 后台作业优先级后台作业的优先级定义在 BackgroundJobPriority 枚举当中一共有 5 个等级分别是 Low、BelowNormal、Normal、AboveNormal、High 他们从最低到最高排列。作者myzony出处https://www.cnblogs.com/myzony/p/9841601.html扫描二维码获取更多精彩码侠江湖喜欢就点个在看再走吧
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/90086.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!