网站模版asp北京建网站公司

news/2025/10/7 10:03:55/文章来源:
网站模版asp,北京建网站公司,上海做网站建设,wordpress主题对接支付文章目录 零. RpcService服务概述1. AkkaRpcService的创建和初始化2.通过AkkaRpcService初始化RpcServer3. ResourceManager中RPC服务的启动4. 实现相互通讯能力 零. RpcService服务概述 RpcService负责创建和启动Flink集群环境中RpcEndpoint组件的RpcServer#xff0c;且Rpc… 文章目录 零. RpcService服务概述1. AkkaRpcService的创建和初始化2.通过AkkaRpcService初始化RpcServer3. ResourceManager中RPC服务的启动4. 实现相互通讯能力 零. RpcService服务概述 RpcService负责创建和启动Flink集群环境中RpcEndpoint组件的RpcServer且RpcService在启动集群时会提前创建好。AkkaRpcService作为RpcService的唯一实现类基于Akka的ActorSystem进行封装为不同的RpcEndpoint创建相应的ActorRef实例。 RpcService主要包含如下两个重要方法。 startServer()用于启动RpcEndpoint中的RpcServer。RpcServer实际上就是对Actor进行封装启动完成后RpcEndpoint中的RpcServer就能够对外提供服务了。connect()用于连接远端RpcEndpoint并返回给调用方RpcGateway接口的方法建立连接后RPC客户端就能像本地一样调用RpcServer提供的RpcGateway接口了。 例如在JobMaster组件中创建与ResourceManager组件之间的RPC连接时。此时可以通过Akka发送消息到ResourceManager的RpcServer中这样就使得JobMaster像调用本地方法一样在ResourceManager中执行请求任务。 1. AkkaRpcService的创建和初始化 在创建和启动ClusterEntrypoint及TaskManagerRunner的过程中会调用AkkaRpcServiceUtils.createRpcService()方法创建默认的AkkaRpcService接着启动RpcServer。 例如管理节点中会使用AkkaRpcService实例创建并启动ResourceManager、Dispatcher以及JobMaster等RPC服务。 创建AkkaRpcService主要包括如下步骤。 在ClusterEntrypoint中创建RpcService。启动ActorSystem服务。创建RobustActorSystem。RobustActorSystem实际上是对Akka的ActorSystem进行了封装和拓展相比于原生Akka ActorSystemRobustActorSystem包含了UncaughtExceptionHandler组件能够对ActorSystem抛出的异常进行处理。使用RobustActorSystem创建AkkaRpcService实例。将AkkaRpcService返回到ClusterEntrypoint中用于启动集群中各个RpcEndpoint组件服务 2.通过AkkaRpcService初始化RpcServer 在集群运行时中创建了共用的AkkaRpcService服务相当于创建了Akka系统中的ActorSystem接下来就是使用AkkaRpcService启动各个RpcEndpoint中的RpcServer实例。AkkaRpcService服务作为共用的rpc服务启动其他各个组件的RpcServer实例 这里先看通过AkkaRpcService初始化RpcEndpoint对应的RpcServer的逻辑。如下在org.apache.flink.runtime.rpc.RpcEndpoint的构造器中执行了RpcServer的初始化 protected RpcEndpoint(final RpcService rpcService, final String endpointId) {this.rpcService checkNotNull(rpcService, rpcService);this.endpointId checkNotNull(endpointId, endpointId);// 初始化RpcEndpoint中的RpcServerthis.rpcServer rpcService.startServer(this);this.mainThreadExecutor new MainThreadExecutor(rpcServer, this::validateRunsInMainThread); }具体看下rpcService.startServer(this) 启动rpcServer的逻辑 ActorSystem创建相应Actor的ActorRef引用类。创建完毕后会将RpcEndpoint和ActorRef信息存储在Actor键值对集合中。启动RpcEndpoint对应的RPC服务首先获取当前RpcEndpoint实现的RpcGateways接口。 RpcGateway接口最终通过RpcUtils.extractImplementedRpcGateways()方法从类定义抽取出来例如JobMaster组件会抽取JobMasterGateway接口定义。创建InvocationHandler代理类根据InvocationHandler代理类提供的invoke()方法实现被代理类的具体方法。根据RpcEndpoint是否为FencedRpcEndpointInvocationHandler分为FencedAkkaInvocationHandler和AkkaInvocationHandler两种类型。 FencedMainThreadExecutable代理的接口主要有FencedMainThreadExecutable和FencedRpcGateway两种。 AkkaInvocationHandler主要代理实现AkkaBasedEndpoint、RpcGateway、StartStoppable、MainThreadExecutable、RpcServer等接口。 创建好InvocationHandler代理类后通过反射的方式Proxy.newProxyInstance()创建代理类。创建的代理类会被转换为RpcServer实例再返回给RpcEndpoint使用。 在RpcServer创建的过程中可以看出实际上包含了创建RpcEndpoint中的Actor引用类ActorRef和AkkaInvocationHandler动态代理类。最后将动态代理类转换为RpcServer接口返回给RpcEndpoint实现类此时实现的组件就能够获取到RpcServer服务且通过RpcServer代理了所有的RpcGateways接口提供了本地方法调用和远程方法调用两种模式。 Override public C extends RpcEndpoint RpcGateway RpcServer startServer(C rpcEndpoint) { checkNotNull(rpcEndpoint, rpc endpoint); final SupervisorActor.ActorRegistration actorRegistration registerAkkaRpcActor(rpcEndpoint); final ActorRef actorRef actorRegistration.getActorRef(); final CompletableFutureVoid actorTerminationFuture actorRegistration.getTerminationFuture(); //启动RpcEndpoint对应的RPC服务LOG.info( Starting RPC endpoint for {} at {} ., rpcEndpoint.getClass().getName(), actorRef.path()); final String akkaAddress AkkaUtils.getAkkaURL(actorSystem, actorRef); final String hostname; OptionString host actorRef.path().address().host(); if (host.isEmpty()) { hostname localhost; } else { hostname host.get(); } //解析EpcEndpoint实现的所有RpcGateway接口SetClass? implementedRpcGateways new HashSet(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass())); //额外添加RpcServer和AkkaBasedEndpoint类implementedRpcGateways.add(RpcServer.class); implementedRpcGateways.add(AkkaBasedEndpoint.class); final InvocationHandler akkaInvocationHandler; //根据是否是FencedRpcEndpoint创建不同的动态代理对象if (rpcEndpoint instanceof FencedRpcEndpoint) { // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler akkaInvocationHandler new FencedAkkaInvocationHandler( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), actorTerminationFuture, ((FencedRpcEndpoint?) rpcEndpoint)::getFencingToken, captureAskCallstacks); implementedRpcGateways.add(FencedMainThreadExecutable.class); } else { akkaInvocationHandler new AkkaInvocationHandler( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), actorTerminationFuture, captureAskCallstacks); } // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader getClass().getClassLoader(); SuppressWarnings(unchecked) RpcServer server (RpcServer) Proxy.newProxyInstance( classLoader, implementedRpcGateways.toArray( new Class?[implementedRpcGateways.size()]), akkaInvocationHandler); return server; }3. ResourceManager中RPC服务的启动 RpcServer在RpcEndpoint的构造器中完成初始化后接下来就是启动RpcEndpoint和RpcServer这里以ResourceManager为例进行说明。 在启动集群时看下如何启动ResourceManager的RPC服务的。如下调用链 ClusterEntrypoint.startCluster-runCluster -dispatcherResourceManagerComponentFactory.create -resourceManager.start();public final void start() { rpcServer.start(); }继续探索RPC服务是如何启动的 首先在DefaultDispatcherResourceManagerComponentFactory中调用ResourceManager.start()方法启动ResourceManager实例此时在ResourceManager.start()方法中会同步调用RpcServer.start()方法启动ResourceManager所在RpcEndpoint中的RpcServer如下。 调用ResourceManager.start()方法此时会调用RpcEndpoint.start()父方法启动ResourceManager组件的RpcServer。通过动态代理AkkaInvocationHandler.invoke()方法执行流程发现调用的是StartStoppable.start()方法此时会直接调用AkkaInvocationHandler.start()本地方法。在AkkaInvocationHandler.start()方法中实际上会调用rpcEndpoint.tell(ControlMessages.START,ActorRef.noSender())方法向ResourceManager对应的Actor发送控制消息表明当前Actor实例可以正常启动并接收来自远端的RPC请求。AkkaRpcActor调用handleControlMessage()方法处理ControlMessages.START控制消息。将AkkaRpcActor中的状态更新为StartedState此时ResourceManager的RpcServer启动完成ResourceManager组件能够接收来自其他组件的RPC请求。 在flink1.12中省略了AkkaInvocationHandler的干预。 经过以上步骤指定组件的RpcEndpoint节点就正常启动此时RpcServer会作为独立的线程运行在JobManager或TaskManager进程中处理本地和远程提交的RPC请求。 4. 实现相互通讯能力 当AkkaRpcService启动RpcEndpoint中的RpcServer后RpcEndpoint组件仅能对外提供处理RPC请求的能力RpcEndpoint组件需要在启动后向其他组件注册自己的RpcEndpoint信息并完成组件之间的RpcConnection注册才能相互访问和通信。而创建RPC连接需要调用RpcService.connect()方法。 如代码所示在AkkaRpcService.connect()方法中完成了RpcConnection对象的创建。 Override public C extends RpcGateway CompletableFutureC connect( final String address, final ClassC clazz) { return connectInternal( address, clazz, (ActorRef actorRef) - { Tuple2String, String addressHostname extractAddressHostname(actorRef); return new AkkaInvocationHandler( addressHostname.f0, addressHostname.f1, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), null, captureAskCallstacks); }); }具体看AkkaRpcService.connectInternal()方法逻辑。 获取ActorRef引用对象。调用Patterns.ask()方法向actorRef对应的RpcEndpoint节点发送RemoteHandshakeMessage消息确保连接的RpcEndpoint节点正常如果成功则RpcEndpoint会返回HandshakeSuccessMessage消息。调用invocationHandlerFactory创建invocationHandler动态代理类此时可以看到传递的接口列表为new Class?[]{clazz}也就是当前RpcEndpoint需要访问的RpcGateway接口。例如JobMaster访问ResourceManager时这里就是ResourceManagerGateway接口。 private C extends RpcGateway CompletableFutureC connectInternal( final String address, final ClassC clazz, FunctionActorRef, InvocationHandler invocationHandlerFactory) { checkState(!stopped, RpcService is stopped); LOG.debug( Try to connect to remote RPC endpoint with address {}. Returning a {} gateway., address, clazz.getName()); //获取actorRef实例 final CompletableFutureActorRef actorRefFuture resolveActorAddress(address); //进行handshake操作确保需要连接的RpcEndpoint节点正常 final CompletableFutureHandshakeSuccessMessage handshakeFuture actorRefFuture.thenCompose( (ActorRef actorRef) - FutureUtils.toJava( //调用Patterns.ask()方法向actorRef对应的//RpcEndpoint节点发送RemoteHandshakeMessage消息//确保连接的RpcEndpoint节点正常如果成功则//RpcEndpoint会返回HandshakeSuccessMessage消息。 Patterns.ask( actorRef, new RemoteHandshakeMessage( clazz, getVersion()), configuration.getTimeout().toMilliseconds()) .HandshakeSuccessMessagemapTo( ClassTag$.MODULE$ .HandshakeSuccessMessageapply( HandshakeSuccessMessage .class)))); //创建RPC动态代理类 return actorRefFuture.thenCombineAsync( handshakeFuture, (ActorRef actorRef, HandshakeSuccessMessage ignored) - { InvocationHandler invocationHandler invocationHandlerFactory.apply(actorRef); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and // all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom // ClassLoader ClassLoader classLoader getClass().getClassLoader(); SuppressWarnings(unchecked) C proxy (C) Proxy.newProxyInstance( classLoader, new Class?[] {clazz}, invocationHandler); return proxy; }, actorSystem.dispatcher()); } 经过以上步骤实现了创建RpcEndpoint组件之间的RPC连接此时集群RPC组件之间可以进行相互访问例如JobMaster可以向ResourceManager发送Slot资源请求。 RPC 服务启动的 Akka actor 能接收来自RpcGateway RPC 调用。 参考《Flink设计与实现核心原理与源码解析》–张利兵

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/930285.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

手机网站判断跳转淄博周村学校网站建设报价

前言 本文由于介绍Android多线程编程的学习。 线程基本用法 定义线程有两种方式,分别是继承Thread类、实现Runnable接口: 继承Thread类:只需新建一个类继承自Thread,然后重写父类的run()方法,在这个方法里面写耗时…

为什么有些网站看不到百度快照南宁网站制作哪家好

对于做互联网的朋友们来说,引流是一个必不可少的环节。 掌握一种优秀的引流方法至关重要,这也可以视为我们的生计之源。 今天,我将向大家介绍一款全自动的引流工具——抖音全自动引流脚本软件。 这款软件的效果非常显著,它可以替…

怎么做新网站才能被百度收录有哪些官网做得比较好

版本与信息查询 docker --version:查看安装的Docker版本。 docker info:获取Docker系统的详细配置信息。 镜像管理 docker images:列出本地所有镜像。 docker search IMAGE_NAME:搜索Docker Hub上的镜像。 docker pull IMAGE_NAME[:TAG]:从仓库下载指定镜像。 docker rmi …

企业网站维护的要求包括前端自我介绍面试技巧

Python的正则表达式使用 定义使用场景查替换分割 常用的正则表达符号查原字符英文状态的句号点 .反斜杠 \英文的[]英文的()英文的?加号 星号 *英文状态的大括号 {} 案例 定义 正则表达式是指专门用于描述或刻画字符串内在规律的表达式。 使用场景 无法通过切片,…

大模型部署

部署云服务部署 优势:前期成本低,维护简单 劣势:数据不安全,长期使用成本高本地机器部署 优势:数据安全,长期成本低 劣势:初期成本高,维护困难他人部署 1.阿里云白炼 2.百度智能云 3.硅基流动 4.火山引擎OLLAM…

读技术之外:社会联结中的人工智能02劳工

读技术之外:社会联结中的人工智能02劳工1. 劳工 1.1. 入口通道处每隔一定距离都会出现很多考勤钟标志 1.2. 休息间里的考勤钟也起着重要作用—进出房间的所有扫描都会被追踪 1.3. 每个班次间只能休息15分钟,还有无薪…

详细介绍:如何有效删除 iPhone 上的所有内容?

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

鼠标图标更改样式

https://zhutix.com/tag/cursors/page/2/?post_order=likeI hope all of us can learn to progress!

做网站的预算表广西桂林天气预报15天查询

一、SpringBoot和SpringCloud简介 1、SpringBoot:是一个快速开发框架,通过用MAVEN依赖的继承方式,帮助我们快速整合第三方常用框架,完全采用注解化(使用注解方式启动SpringMVC),简化XML配置&am…

徐州网站建设价格odoo做网站

目录 Vue.js Ajax(axios) GET 方法 请求方法的别名 并发 请求配置项 响应结

马洪旭 做的网站大学赞友商城电商平台排名第几

以前都是用Cg的,现在改用GLSL,又要重新学,不过两种语言很多都是相通的。下面的例子是实现绘制一个三角形的简单程序。采用了VBO(veretx buffer object)、VAO(vertex array object)等OpenGL的一些…

网站开发速成培训机构郑州网站设计制作价格

1、正向代理 1.我访问不了某网站,但是我能访问一个代理服务器,这个代理服务器呢,他能访问那个我不能访问的网站 2.于是我先连上代理服务器,告诉他我需要那个无法访问网站的内容,代理服务器去取回来,然后返回给我。 3.客户端必须设置正向代理…

webpack和vite的区别 - 指南

webpack和vite的区别 - 指南pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco",…

m3u8在线播放测试的方法与常见问题解决方案(附网页演示

在如今的流媒体视频环境中,m3u8格式因其分片传输、秒开等特性越来越常见。无论是开发、运维还是视频内容生产者,很多时候都会遇到需要测试或在线播放m3u8链接的需求。那么,如何方便、高效地测试m3u8流的可用性,遇到…

校招题

NC258932 题目 其实就是一个三分答案的模板题,可以看出这是一个单谷函数。 借助这篇 博客 复习一下。 #include <bits/stdc++.h> using namespace std; using db = long double; db eps = 1e-6;void solve() {d…

React 播客专栏 Vol.18|React 第二阶段复习 样式与 Hooks 全面整合 - 实践

React 播客专栏 Vol.18|React 第二阶段复习 样式与 Hooks 全面整合 - 实践pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-fami…

嘉兴网站建设推荐浙江华企芯片商城网站建设

性能 当服务器正常运行时&#xff0c;WAL文件不断被写入磁盘。但是&#xff0c;这些写操作是顺序的:几乎没有随机访问&#xff0c;因此即使是HDD也可以处理这个任务。由于这种类型的加载与典型的数据文件访问非常不同&#xff0c;因此有必要为WAL文件设置一个单独的物理存储&a…

Manim实现旋转扭曲特效

在数学动画制作中,特殊效果可以极大地增强视觉表现力和吸引力。 本文将介绍如何使用Manim框架实现一个旋转扭曲特效,通过自定义动画类来创建独特的视觉效果。 实现原理 旋转扭曲特效的核心是通过修改对象上每个点的坐…

vip影院自助建站系统百度会员

graph LR A-->B性能概述 程序性能表现形式 执行速度&#xff1a;程序响应速度&#xff0c;总耗时是否足够短内存分配&#xff1a;内存分配是否合理&#xff0c;是否过多消耗内存或者存在泄漏启动时间&#xff1a;程序运行到可以正常处理业务需要的时间负载承受能力 性能测…

h5网站如何做flash 网站欣赏

js/jQuery常见操作 之各种语法例子&#xff08;包括jQuery中常见的与索引相关的选择器&#xff09; 1. 操作table常见的1.1 动态给table添加title&#xff08;指定td&#xff09;1.1.1 给td动态添加title&#xff08;含&#xff1a;获取tr的第几个td&#xff09;1.1.2 动态加工…