黑彩网站建设旅游网站建设需求说明书

web/2025/10/3 16:56:18/文章来源:
黑彩网站建设,旅游网站建设需求说明书,企业网站建设高端品牌,wordpress文章无法置顶文章目录 整体设计processMail1.Checkpoint Tigger2.ProcessingTime Timer Trigger processInput兼容SourceStreamTask 整体设计 Mailbox线程模型通过引入阻塞队列配合一个Mailbox线程的方式#xff0c;可以轻松修改StreamTask内部状态的修改。Checkpoint、ProcessingTime Ti… 文章目录 整体设计processMail1.Checkpoint Tigger2.ProcessingTime Timer Trigger processInput兼容SourceStreamTask 整体设计 Mailbox线程模型通过引入阻塞队列配合一个Mailbox线程的方式可以轻松修改StreamTask内部状态的修改。Checkpoint、ProcessingTime Timer的相关操作Runnable任务会以Mail的形式保存到Mailbox内的阻塞队列中。StreamTask在invoke阶段的runMailboxLoop时期就会轮询Mailbox来处理队列中保存的MailMail处理完毕后才会对DataStream上的数据元素执行处理逻辑。 MailboxProcessor的能力就是负责拉取、处理Mail以及执行MailboxDefaultAction默认动作即processInput()方法中对DataStream上的普通消息的处理逻辑包括处理Event、barrier、Watermark等。 /*** 开始轮询Mailbox内的MailCheckpoint和ProcessingTime Timer的触发事件会以Runnable的形式作为Mail添加到Mailbox的队列中等待“Mailbox线程”去处理*/ public void runMailboxLoop() throws Exception {// 获取最新的TaskMailbox主要用于存储提交的Mail并提供获取接口。// TaskMailbox有2个队列// 1.queue阻塞队列通过ReentrantLock控制队列中的读写操作// 2.batch非阻塞队列调用createBatch()方法会将queue中的Mail转存到batch中这样读操作就能通过tryTakeFromBatch()方法从batch队列中批量获取Mail且只能被Mailbox线程消费final TaskMailbox localMailbox mailbox;// 检查当前线程是否为Mailbox线程即StreamTask运行时所在的主线程Preconditions.checkState(localMailbox.isMailboxThread(),Method must be executed by declared mailbox thread!);// 确认Mailbox的状态必须为OPENassert localMailbox.getState() TaskMailbox.State.OPEN : Mailbox must be opened!;// 创建MailboxController实例可以控制Mailbox的循环、临时暂停和恢复MailboxDefaultAction默认动作final MailboxController defaultActionContext new MailboxController(this);/*** 核心事件循环* processMail()方法会检测Mailbox中是否还有Mail需要处理新Mail会在ReentrantLock的保护下被添加到queue队列并转存到batch队列中。* MailboxProcessor处理完batch队列中的全部Mail后执行作为Mail的Runnable#run()方法才会进入到while循环内执行MailboxDefaultAction的默认动作* 即调用StreamTask#processInput()方法对读取到的数据Event、Barrier、Watermark等进行处理*/while (processMail(localMailbox)) {mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed} }可以看出对Mail和MailboxDefaultAction的处理是由唯一的Mailbox线程负责的。 processMail 在while轮询时首先会processMail /*** 处理Mailbox中的MailCheckpoint、ProcessingTime Timer的触发事件会以Runnable的形式作为Mail保存在Mailbox的queue队列中* 并在ReentrantLock的保护下将queue队列中的新Mail转移到batch队列中。MailboxProcessor会根据queue队列、batch队列内的Mail情况* 决定处理Mail or processInput。只有当TaskMailbox内的所有Mail全都处理完毕后MailboxProcessor才会去processInput()*/ private boolean processMail(TaskMailbox mailbox) throws Exception {/*** 新Mail写入queue队列TaskMailbox会将queue队列中的新Mail转移到batch队列中MailboxProcessor会根据queue队列、batch队列内的Mail情况* 判断执行Mail的run() or processInput()。只有当TaskMailbox内的所有Mail全部处理完成后MailboxProcessor才会去processInput()。*/if (!mailbox.createBatch()) {return true;}OptionalMail maybeMail;/*** 能走到这说明queue队列中的Mail已被全部转移至batch队列。现在要从batch队列中获取到Mail并执行它作为Runnable的run()方法* 直到batch队列中的所有Mail全都处理完毕*/while (isMailboxLoopRunning() (maybeMail mailbox.tryTakeFromBatch()).isPresent()) {maybeMail.get().run();}/**如果默认操作处于Unavailable状态那就先阻塞住直到它重新回归available状态*/while (isDefaultActionUnavailable() isMailboxLoopRunning()) {mailbox.take(MIN_PRIORITY).run();}// 返回Mailbox是否还在Loopreturn isMailboxLoopRunning(); }很核心的一个点就是Mailbox要去createBatchTaskMailboxImpl提供了具体的实现逻辑。Mailbox引入了2个队列新Mail被add到Mailbox内的queue队列中此过程受ReentrantLock保护。同时为了减少读取queue队列时的同步开销Mailbox还构建了一个batch队列专门用来后续消费避免加锁操作。 /*** 对DequeMail队列的读写通过ReentrantLock加以保护*/ private final ReentrantLock lock new ReentrantLock();/*** Internal queue of mails.* 使用Deque内部队列保存所有的Mail*/ GuardedBy(lock) private final DequeMail queue new ArrayDeque(); /*** 为了减少读取queue队列所造成的同步开销TaskMailbox会创建一个batch队列queue队列中的Mail会被转移到batch队列中* 有效避免了后续消费时的加锁操作*/ private final DequeMail batch new ArrayDeque();Override public boolean createBatch() {checkIsMailboxThread();/*** 如果queue队列中没有新Mail那就要看batch队列是否为空。* 1.如果batch也是空的Mailbox里已经没有任何Mail了需要去processInput()了那processMail()也会return true* MailboxProcessor就会进入到while循环内部执行processInput()来处理DataStream上的数据* 2.如果batch不空说明MailboxProcessor还需要继续processMail()即取出Mail执行它作为Runnable的run()方法* 由此可见Mailbox中的batch队列中的Mail最终一定会被Mailbox线程消耗殆尽轮询、处理然后才会去processInput()*/if (!hasNewMail) { // 只要queue队列里还有MailhasNewMail就为truereturn !batch.isEmpty();}/**能走到这说明queue队列中仍有新Mail接下来需要将它的新Mail向batch队列转移该过程受ReentrantLock保护*/final ReentrantLock lock this.lock;// 获取锁lock.lock();try {Mail mail;/**每次循环都将queue队列中的First Mail转移到batch队列中直至queue队列被消耗殆尽。此时一定return true*/while ((mail queue.pollFirst()) ! null) {batch.addLast(mail);}// 此时queue队列内的所有Mail都被转移到batch队列中了queue中没有新Mail了hasNewMail false;// 此时根据batch队列是否为空MailboxProcessor会判断执行Mail的run() or processInput()return !batch.isEmpty();} finally {// 最终释放锁lock.unlock();} }如果Mailbox内的queue队列中仍有新Mail那就在ReentrantLock的加持下将queue内的Mail全都转移到batch队列中如果Mailbox内的queue队列中没有新Mail那就看batch队列的情况了。决断权交给外层的MailboxProcessor总的来看 如果batch队列中有MailMailboxProcessor会从Mailbox内的batch队列中逐个pollFirst然后执行它作为Runnable#run()方法直到batch队列中的所有Mail全都被“消耗殆尽”为止如果batch队列中没有MailMailboxProcessor此时就没有Mail可处理了那就直接processInput 1.Checkpoint Tigger 对Checkpoint的触发是通过MailboxExecutor向Mailbox提交Mail的 /*** 触发执行StreamTask中的Checkpoint操作异步的通过MailboxExecutor将“执行Checkpoint”的请求封装成Mail后* 提交到TaskMailbox中最终由MailboxProcessor来处理*/ Override public FutureBoolean triggerCheckpointAsync(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,boolean advanceToEndOfEventTime) {// 通过MailboxExecutor将“触发执行Checkpoint”的具体逻辑封装成Mail提交到Mailbox中后期会被MailboxProcessor执行return mailboxProcessor.getMainMailboxExecutor().submit(// 触发Checkpoint的具体逻辑() - triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime),checkpoint %s with %s,checkpointMetaData,checkpointOptions); }triggerCheckpoint操作会被封装成Mail添加到Mailbox中等待被处理。 Override public void execute(Nonnull final RunnableWithException command,final String descriptionFormat,final Object... descriptionArgs) {try {mailbox.put(new Mail(command, priority, actionExecutor, descriptionFormat, descriptionArgs));} catch (IllegalStateException mbex) {throw new RejectedExecutionException(mbex);} }当然Checkpoint的完成操作也是同样的套路。 2.ProcessingTime Timer Trigger /*** 借助Mailbox线程模型由MailboxExecutor负责将ProcessingTime Timer触发的消息封装成Mail提交到TaskMailbox中后续由MailboxProcessor处理*/ public ProcessingTimeService getProcessingTimeService(int operatorIndex) {Preconditions.checkState(timerService ! null, The timer service has not been initialized.);MailboxExecutor mailboxExecutor mailboxProcessor.getMailboxExecutor(operatorIndex);// 通过MailboxExecutor将Mail提交到Mailbox中等待处理return new ProcessingTimeServiceImpl(timerService, callback - deferCallbackToMailbox(mailboxExecutor, callback)); }private ProcessingTimeCallback deferCallbackToMailbox(MailboxExecutor mailboxExecutor, ProcessingTimeCallback callback) {return timestamp - {mailboxExecutor.execute(() - invokeProcessingTimeCallback(callback, timestamp),Timer callback for %s %d,callback,timestamp);}; }processInput StreamInputProcessor会对输入的数据进行处理、输出包含StreamTaskInput OperatorChain DataOutput。每次processInput都相当于是在处理一个有界流外层MailboxProcessor在不断地的轮询处理完DataStream上的StreamRecord后会返回InputStatus的枚举值根据InputStatus值来决定下一步该“何去何从”。 /*** StreamTask的执行逻辑处理输入的数据返回InputStatus状态并根据InputStatus决定是否需要结束当前Task。* 该方法会通过MailboxProcessor调度、执行作为MailboxProcessor的默认动作底层调用StreamInputProcessor#processInput()方法*/ protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {/*** 核心借助StreamInputProcessor完成数据的读取并交给算子处理处理完毕后会返回InputStatus。* 每次触发相当于处理一个有界流在外层Mailbox拉取Mail才是while循环无限拉取*/InputStatus status inputProcessor.processInput();/*** case 1上游如果还有数据 RecordWriter是可用的立即返回。意为继续处理*/if (status InputStatus.MORE_AVAILABLE recordWriter.isAvailable()) {return;}/*** case 2当状态为END_OF_INPUT说明本批次的有界流数据已经处理完毕* 通过MailboxCollector来告诉Mailbox*/if (status InputStatus.END_OF_INPUT) {controller.allActionsCompleted();return;}/*** case 3当前有界流中没有数据但未来可能会有。此时处理线程会被挂起直到有新的可用数据到来 RecordWriter可用* 此时会先临时暂停对MailboxDefaultAction的处理等啥时候又有新数据了再重新恢复MailboxDefaultAction的处理。*/CompletableFuture? jointFuture getInputOutputJointFuture(status);// 通过MailboxCollector让Mailbox线程暂时停止对MailboxDefaultAction的处理MailboxDefaultAction.Suspension suspendedDefaultAction controller.suspendDefaultAction();// 等啥时候又有了input、outputRecordWriter也变得可用了以后再重新继续执行默认操作jointFuture.thenRun(suspendedDefaultAction::resume); }MailboxController是MailboxDefaultAction和Mailbox之间交互的桥梁在StreamTask处理DataStream元素的过程中会利用MailboxController将处理状态及时通知给Mailbox。如果这批有界流处理完毕就会通过MailboxController通知Mailbox本质就是向Mailbox发送一个Mail进行下一轮的处理。 private void sendControlMail(RunnableWithException mail, String descriptionFormat, Object... descriptionArgs) {mailbox.putFirst(new Mail(mail,Integer.MAX_VALUE /*not used with putFirst*/,descriptionFormat,descriptionArgs)); }兼容SourceStreamTask 作为DataStream Source是专门用来生产无界流数据的并不能穿插兼顾Mailbox内Mail的检测。如果仅有一个线程生产无界流数据的话那将永远无法检测Mailbox内的Mail。作为StreamTask的子类SourceStreamTask会额外启动另一个独立的LegacySourceFunctionThread线程来执行SourceFunction中的循环生产无界流Mailbox线程主线程依然负责处理Mail和默认操作。 /*** 专门为Source源生产数据的线程*/ private class LegacySourceFunctionThread extends Thread {private final CompletableFutureVoid completionFuture;LegacySourceFunctionThread() {this.completionFuture new CompletableFuture();}Overridepublic void run() {try {// CheckpointLock保证线程安全headOperator.run(getCheckpointLock(), getStreamStatusMaintainer(), operatorChain);completionFuture.complete(null);} catch (Throwable t) {// Note, t can be also an InterruptedExceptioncompletionFuture.completeExceptionally(t);}}public void setTaskDescription(final String taskDescription) {setName(Legacy Source Thread - taskDescription);}CompletableFutureVoid getCompletionFuture() {return isFailing() !isAlive() ? CompletableFuture.completedFuture(null) : completionFuture;} }负责为Source生产无界流数据的LegacySourceFunctionThread线程启动后不管是启动成功 or 出现异常都会封装对应的Mail并发送给Mailbox而Mailbox线程的processMail一直在等待处理Mail。 /*** SourceStreamTask中一个Thread负责专门生产无界流另一个MailBox Thread处理Checkpoint、ProcessingTime Timer等事件Mail*/ Override protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {/*** 通过MailboxDefaultAction.Controller告诉Mailbox让MailboxThread先暂停处理MailboxDefaultAction。* TaskMailbox收到该消息后就会在processMail()中一直等待并处理Mail在MailboxThread中会一直处理Mail*/controller.suspendDefaultAction();/**启动LegacySourceFunctionThread线程专门生产Source无界流数据的和MailboxThread线程一起运行*/sourceThread.setTaskDescription(getName());sourceThread.start();/**LegacySourceFunctionThread线程启动后会通知MailboxMailbox会在processMail()中一直等待并处理mail不会返回即Mailbox线程会一直处理mail*/sourceThread.getCompletionFuture().whenComplete((Void ignore, Throwable sourceThreadThrowable) - {/**LegacySourceFunctionThread线程启动过程中发生的任何异常、以及启动成功都会以Mail的形式发送给Mailbox*/if (isCanceled() ExceptionUtils.findThrowable(sourceThreadThrowable, InterruptedException.class).isPresent()) {mailboxProcessor.reportThrowable(new CancelTaskException(sourceThreadThrowable));} else if (!isFinished sourceThreadThrowable ! null) {mailboxProcessor.reportThrowable(sourceThreadThrowable);} else {mailboxProcessor.allActionsCompleted();}}); }Mailbox主线程和LegacySourceFunctionThread线程线程都在运行通过CheckpointLock锁来保证线程安全。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/86323.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

怀化同城网站找工作网站

摘要:PDO查询中,2个绑定操作:参数绑定与列绑定;参数绑定:bindParm() 和 bindValue();bindParm(:占位符,变量,类型常量) 类型常量默认为字符串bindValue(:占位符,值或变量,类型常量) 如果直接传值,可省略类型…

政务网站集约化建设难点与建议徐州建设集团有限公司

问题:当我们要添加缓存时,如果我们用了PageHelper时,PageHelper只会对查询语句有效(使用到sql的查询),那么如果我们把查询到的数据都添加到缓存时,就会无法进行分页; 此时我们选择将…

徐州建站网站模板合肥 网站设计

n 座城市,从 0 到 n-1 编号,其间共有 n-1 条路线。因此,要想在两座不同城市之间旅行只有唯一一条路线可供选择(路线网形成一颗树)。去年,交通运输部决定重新规划路线,以改变交通拥堵的状况。 路…

网站建设锚点链接祭祖网站怎么做

自从最初的IEEE 802.3af 1型以太网供电(PoE)标准于2003年首次推出以来,该技术已经发展到包括2型(高达30瓦)、3型(高达60瓦)和4型(高达90瓦)。这意味着PoE电压现在支持从手…

如何创建一个个人网站wordpress 标题入库

接着上一篇不同场景下JVM调优手段及代码优化建议,接着来JVM调优可配置参数及配置时机和原则。以在JDK 8为例,JVM提供了一系列的可配置参数,这些参数可以帮助开发者和系统管理员针对不同的应用场景进行性能调优。以下是按维度划分的一些关键参数及其用途、…

国外服装设计网站seo优化报价公司

先贴代码,代码精简了。$invoker_function($argus);}}?>描述:程序是在ThinkPHP开发,目的是把Cache的get方法接收的参数转发到指定的方法上,最后一行:其中D方法是ThinkPHP自带的方法用的是单例模式。如果不加参数$ar…

公司网站申请书wordpress怎么建app

文章目录 前言一、笔试和性格测试二、面试2.1 技术面2.2 hr面前言 实习:笔试 + 1轮技术面 + 1轮主管面 一、笔试和性格测试 笔试题链接 1、网上可以找到很多以往的题目,需要注意的是数字芯片岗位会考到很多验证、中后端的知识,主打一个全栈,不要只看设计或者只看验证的东…

公司建网站多少钱qcjxkd百度收录什么网站吗

前言 一个针对深度学习应用优化的 GPU 加速库。它提供了高性能、高可靠性的加速算法,旨在加速深度神经网络模型的训练和推理过程。 cuDNN 提供了一系列优化的基本算法和函数,包括卷积、池化、规范化、激活函数等,以及针对深度学习任务的高级功…

网站建设项目说明书松岗做网站

前言:大数据相关的技术名词特别多,这些技术栈之间的关系是什么,对初学者来说很难找到抓手。我一开始从后端转大数据的时候有点懵逼,整体接触了一遍之后才把大数据技术栈给弄明白了。 一、大数据技术栈 做大数据开发,无…

网站设计中的js网站流量刷

原文1:https://cloud.tencent.com/developer/article/1151834 原文2:https://www.cnblogs.com/zhaohuhu/p/9140673.html转载于:https://www.cnblogs.com/olivertian/p/10982658.html

规划管理部门的网站建设保定建网站需要多少钱

2018-03-05 14:06:40 问题描述:给出一个数据流,这个数据流的长度很大或者未知。并且对该数据流中数据只能访问一次。请写出一个随机选择算法,使得数据流中所有数据被选中的概率相等。 问题求解:如果是长度已知或者有限的问题&…

麦壳云网站建设推广思路及执行方案

了解ISO模型:构建通信的蓝图 为了促进网络应用的普及,国际标准化组织(ISO)引入了开放式系统互联(Open System Interconnect,OSI)模型。这个模型包括了七个层次,从底层的物理连接到顶…

哈尔滨网站域名部门中学网站源码

技术方法 数据映射的技术方法主要包括以下几种: 手工法: 手工法涉及开发人员手动编码数据源和目标架构之间的链接。这通常使用如XSLT这样的计算机语言来编写代码,将XML文档翻译成各种格式。然而,随着数据系统的扩展和复杂化&…

东莞高端品牌网站建设价格自己做网站需要做啥

Android.mk是Android提供的一个makefile文件,可以将源文件分组为模块。用来引用的头文件目录、需要编译的*.c/.cpp文件、jni源文件、指定编译生成.so共享库文件或者*.a静态库文件,可以定义一个或多个模块,也可以多个模块中使用同一个源文件&a…

遵义网站中山h5网站建设

以下内容为自己的理解,如有错误请指出。 连通 连通和电路中的导通一样。 注意:连通可以是直接连通,也可以经过其他节点后再连通。只要能导通就叫连通。 连通图 任意两个节点间都有路径的图,叫做连通图。 在无向图中&#xff…

宣传网站建设背景谷歌推广开户多少费用

RSI指数的计算非常简单,就是使用一段时间内的平均上涨除以平均上涨加平均下跌(取正值)。也就意味着RSI指数的取值是[0,100]之间,其中0表示周期内没有上涨的,100表示周期内没有下跌的。RSI的直观意义是它表示了一段周期…

做外贸怎样免费登录外国网站中国建筑网最新招聘

目录 044 递归 e04 冒泡排序2 044 递归 e05 插入排序1 044 递归 e05 插入排序2 045 多路递归 斐波那契 046 多路递归 斐波那契 时间复杂度 047 多路递归 斐波那契 兔子问题 048 多路递归 斐波那契 青蛙跳台阶 049 递归 优化 记忆法 050 递归 爆栈问题 051 递归 尾调用…

德州极速网站建设百家号站酷海洛设计网站官网

然后来看字典高级,首先 打印某个元素 然后打印的时候注意,如果直接打印的值,在字典中没有就报错 这里要注意不能用点访问

长沙好的网站建设公司品牌网站开发设计

基于 IntelliJ 平台的 JetBrains IDE 可能是当今最常见的 IDE 之一。它们的受欢迎程度在 JVM 语言社区中尤其明显,IntelliJ IDEA 仍然是大多数开发人员的首选 IDE。所有这一切都是在一些新竞争对手的出现和老竞争对手克服以前的缺点并重新加入竞争者的情况下实现的。…

响应式网站发展机器人软件开发平台

点击蓝字关注我们课程链接:http://video.jessetalk.cn/course/explore良心课程,大家一起来学习哈!任务16:介绍1、依赖注入概念详解从UML和软件建模来理解从单元测试来理解2、ASP.NET Core 源码解析任务17:从UML角度来理…