个人做网站还是公众号赚钱好高校网络架构
web/
2025/10/8 12:38:47/
文章来源:
个人做网站还是公众号赚钱好,高校网络架构,网络科技公司名字起名大全,绍兴网络科技有限公司zeebe actor 模型#x1f64b;♂️
如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法#xff0c;那么这篇文章就围绕actor.run 方法#xff0c;说说zeebe actor 的模型。
环境⛅
zeebe release-8.1.14
actor.run() 是怎么开始的#x1f308; Lon…zeebe actor 模型♂️
如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法那么这篇文章就围绕actor.run 方法说说zeebe actor 的模型。
环境⛅
zeebe release-8.1.14
actor.run() 是怎么开始的 LongPollingActivateJobsHandler.java
以LongPollingActivateJobsHandler 的激活任务方法为例我们可以看到run 方法实际上执行ActorControl类的run 方法让我们进到run 方法中。 private ActorControl actor;public void activateJobs(final InflightActivateJobsRequest request) {actor.run(() - {final InFlightLongPollingActivateJobsRequestsState state getJobTypeState(request.getType());if (state.shouldAttempt(failedAttemptThreshold)) {activateJobsUnchecked(state, request);} else {completeOrResubmitRequest(request, false);}});}ActorControl
可以看到scheduleRunnable 的目标是构造ActorJob,然后将job 添加到ActorTask 中添加的方式分为insert 和submit。其实到这里我们就可以认为actor.run 就已经结束了因为insert 和submit 方法主要就是将job 添加到task 的jobQueues 中对于job 的执行要等到队列不断被线程pop 到当前job 的时候。 final ActorTask task;Overridepublic void run(final Runnable action) {scheduleRunnable(action);}private void scheduleRunnable(final Runnable runnable) {final ActorThread currentActorThread ActorThread.current();if (currentActorThread ! null currentActorThread.getCurrentTask() task) {final ActorJob newJob currentActorThread.newJob();newJob.setRunnable(runnable);newJob.onJobAddedToTask(task);// 插入到执行队列task.insertJob(newJob);} else {final ActorJob job new ActorJob();job.setRunnable(runnable);job.onJobAddedToTask(task);// 提交到外部队列// submit 实际上是将task 放到thread group 里边task.submit(job);}}job 是怎么被执行的⚡
并不是任意一个ActorControl 都可以执行run 方法的按照上图所示Actor 会在broker 生命周期开始要进行注册 ,也就是说ActorControl 中的task 会注册到taskQueues。然后“线程池”不断从taskQueues 中pop 出task每个task 中又会有多个job按照策略选取不同的job 执行我们可以认为job 就是actor.run(Runnable runnable) 中的runnable。
Gateway.java
gateway 注册task private CompletableFutureActivateJobsHandler submitActorToActivateJobs(final ActivateJobsHandler handler) {final var future new CompletableFutureActivateJobsHandler();final var actor Actor.newActor().name(ActivateJobsHandler).actorStartedHandler(handler.andThen(t - future.complete(handler))).build();// 将task 注册到TaskQueuesactorSchedulingService.submitActor(actor);return future;}ActorThreadGroup.java
就是上面提到的“线程池”负责初始化每一条ActorThread 线程并为其分配默认的WorkStealingGroup protected final String groupName;protected final ActorThread[] threads;protected final WorkStealingGroup tasks;protected final int numOfThreads;// 构造器初始化每条线程并为其分配一个默认的WorkStealingGroup 任务队列public ActorThreadGroup(final String groupName, final int numOfThreads, final ActorSchedulerBuilder builder) {this.groupName groupName;this.numOfThreads numOfThreads;tasks new WorkStealingGroup(numOfThreads);threads new ActorThread[numOfThreads];for (int t 0; t numOfThreads; t) {final String threadName String.format(%s-%d, groupName, t);final ActorThread thread builder.getActorThreadFactory().newThread(threadName,t,this,tasks,builder.getActorClock(),builder.getActorTimerQueue(),builder.isMetricsEnabled());threads[t] thread;}}// startpublic void start() {for (final ActorThread actorThread : threads) {// 启动每一个ActorThreadactorThread.start();}}ActorThread.java
ActorThread 继承自Thread可以看到startrundoWork 的引用流程在doWork 方法中首先从taskScheduler 中获取当前task然后执行当前task
// 继承自Thread Overridepublic synchronized void start() {if (UNSAFE.compareAndSwapObject(this, STATE_OFFSET, ActorThreadState.NEW, ActorThreadState.RUNNING)) {// super.start 会执行下面的run 方法super.start();} else {throw new IllegalStateException(Cannot start runner, not in state NEW.);}}// 主要执行doWork 方法Overridepublic void run() {idleStrategy.init();while (state ActorThreadState.RUNNING) {try {doWork();} catch (final Exception e) {LOG.error(Unexpected error occurred while in the actor thread {}, getName(), e);}}state ActorThreadState.TERMINATED;terminationFuture.complete(null);}private void doWork() {submittedCallbacks.drain(this);if (clock.update()) {timerJobQueue.processExpiredTimers(clock);}// 从taskScheduler 中获取当前taskcurrentTask taskScheduler.getNextTask();if (currentTask ! null) {final var actorName currentTask.actor.getName();try (final var timer actorMetrics.startExecutionTimer(actorName)) {// 执行当前任务executeCurrentTask();}if (actorMetrics.isEnabled()) {actorMetrics.updateJobQueueLength(actorName, currentTask.estimateQueueLength());actorMetrics.countExecution(actorName);}} else {idleStrategy.onIdle();}}private void executeCurrentTask() {final var properties currentTask.getActor().getContext();MDC.setContextMap(properties);idleStrategy.onTaskExecuted();boolean resubmit false;try {// 真正执行当前任务resubmit currentTask.execute(this);} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);LOG.error(Unexpected error occurred in task {}, currentTask, e);} finally {MDC.remove(actor-name);clock.update();}if (resubmit) {currentTask.resubmit();}}ActorTask.java
ActorTask 的执行流程它会不断从订阅的列表中拉取jobpoll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务那么currentJob ! null 会短路返回true而不进行poll()从这里可以看到submittedJobs 和fastlaneJobs 的区别
找到job 后开始执行当前job
public boolean execute(final ActorThread runner) {schedulingState.set(TaskSchedulingState.ACTIVE);boolean resubmit false;// 不断从订阅的列表中拉取jobpoll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务那么currentJob ! null 会短路返回true而不进行poll()while (!resubmit (currentJob ! null || poll())) {currentJob.execute(runner);switch (currentJob.schedulingState) {case TERMINATED - {final ActorJob terminatedJob currentJob;// 从fastlaneJobs任务集合中拉取任务currentJob fastLaneJobs.poll();// 如果是通过订阅触发的任务if (terminatedJob.isTriggeredBySubscription()) {final ActorSubscription subscription terminatedJob.getSubscription();// 如果订阅是一次性的那么在订阅触发后则将订阅移除if (!subscription.isRecurring()) {removeSubscription(subscription);}// 执行订阅的回调任务subscription.onJobCompleted();} else {runner.recycleJob(terminatedJob);}}case QUEUED -// the task is experiencing backpressure: do not retry it right now, instead re-enqueue// the actor task.// this allows other tasks which may be needed to unblock the backpressure to runresubmit true;default - {}}if (shouldYield) {shouldYield false;resubmit currentJob ! null;break;}}if (currentJob null) {resubmit onAllJobsDone();}return resubmit;}private boolean poll() {boolean result false;result | pollSubmittedJobs();result | pollSubscriptions();return result;}
ActorJob.java
ActorJob 的执行逻辑
还记得上面说过ActorJob 可以理解为runnable 的吗在invoke 中ActorJob 中的runnable 真正执行了至此job 的执行过程结束 void execute(final ActorThread runner) {actorThread runner;observeSchedulingLatency(runner.getActorMetrics());try {// 执行actor 的 callable 或者 runnable 方法invoke();if (resultFuture ! null) {resultFuture.complete(invocationResult);resultFuture null;}} catch (final Throwable e) {FATAL_ERROR_HANDLER.handleError(e);task.onFailure(e);} finally {actorThread null;// 无论那种情况成功或者失败都要判断是否job 应该被resubmitted// in any case, success or exception, decide if the job should be resubmittedif (isTriggeredBySubscription() || runnable null) {schedulingState TaskSchedulingState.TERMINATED;} else {schedulingState TaskSchedulingState.QUEUED;scheduledAt System.nanoTime();}}}private void invoke() throws Exception {if (callable ! null) {invocationResult callable.call();} else {// only tasks triggered by a subscription can yield; everything else just executes onceif (!isTriggeredBySubscription()) {final Runnable r runnable;runnable null;r.run();} else {// runnable 真正执行runnable.run();}}}总结
本文中的激活例子其实只是列举了Actor 的实现原理想一想文中提到的功能用一个真正的线程池可以很好的解决。但是actor模型 的特性远不仅如此对于其他特性在zeebe 中是如何实现的还请读者自己去挖掘~
zeebe 团队真的是太喜欢functional programming了找一个方法的调用链头都大了
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/89057.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!