网站模版asp北京建网站公司
news/
2025/10/7 10:03:55/
文章来源:
网站模版asp,北京建网站公司,上海做网站建设,wordpress主题对接支付文章目录 零. RpcService服务概述1. AkkaRpcService的创建和初始化2.通过AkkaRpcService初始化RpcServer3. ResourceManager中RPC服务的启动4. 实现相互通讯能力 零. RpcService服务概述
RpcService负责创建和启动Flink集群环境中RpcEndpoint组件的RpcServer#xff0c;且Rpc… 文章目录 零. RpcService服务概述1. AkkaRpcService的创建和初始化2.通过AkkaRpcService初始化RpcServer3. ResourceManager中RPC服务的启动4. 实现相互通讯能力 零. RpcService服务概述
RpcService负责创建和启动Flink集群环境中RpcEndpoint组件的RpcServer且RpcService在启动集群时会提前创建好。AkkaRpcService作为RpcService的唯一实现类基于Akka的ActorSystem进行封装为不同的RpcEndpoint创建相应的ActorRef实例。 RpcService主要包含如下两个重要方法。 startServer()用于启动RpcEndpoint中的RpcServer。RpcServer实际上就是对Actor进行封装启动完成后RpcEndpoint中的RpcServer就能够对外提供服务了。connect()用于连接远端RpcEndpoint并返回给调用方RpcGateway接口的方法建立连接后RPC客户端就能像本地一样调用RpcServer提供的RpcGateway接口了。 例如在JobMaster组件中创建与ResourceManager组件之间的RPC连接时。此时可以通过Akka发送消息到ResourceManager的RpcServer中这样就使得JobMaster像调用本地方法一样在ResourceManager中执行请求任务。 1. AkkaRpcService的创建和初始化
在创建和启动ClusterEntrypoint及TaskManagerRunner的过程中会调用AkkaRpcServiceUtils.createRpcService()方法创建默认的AkkaRpcService接着启动RpcServer。 例如管理节点中会使用AkkaRpcService实例创建并启动ResourceManager、Dispatcher以及JobMaster等RPC服务。 创建AkkaRpcService主要包括如下步骤。 在ClusterEntrypoint中创建RpcService。启动ActorSystem服务。创建RobustActorSystem。RobustActorSystem实际上是对Akka的ActorSystem进行了封装和拓展相比于原生Akka ActorSystemRobustActorSystem包含了UncaughtExceptionHandler组件能够对ActorSystem抛出的异常进行处理。使用RobustActorSystem创建AkkaRpcService实例。将AkkaRpcService返回到ClusterEntrypoint中用于启动集群中各个RpcEndpoint组件服务 2.通过AkkaRpcService初始化RpcServer
在集群运行时中创建了共用的AkkaRpcService服务相当于创建了Akka系统中的ActorSystem接下来就是使用AkkaRpcService启动各个RpcEndpoint中的RpcServer实例。AkkaRpcService服务作为共用的rpc服务启动其他各个组件的RpcServer实例 这里先看通过AkkaRpcService初始化RpcEndpoint对应的RpcServer的逻辑。如下在org.apache.flink.runtime.rpc.RpcEndpoint的构造器中执行了RpcServer的初始化
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {this.rpcService checkNotNull(rpcService, rpcService);this.endpointId checkNotNull(endpointId, endpointId);// 初始化RpcEndpoint中的RpcServerthis.rpcServer rpcService.startServer(this);this.mainThreadExecutor new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}具体看下rpcService.startServer(this) 启动rpcServer的逻辑 ActorSystem创建相应Actor的ActorRef引用类。创建完毕后会将RpcEndpoint和ActorRef信息存储在Actor键值对集合中。启动RpcEndpoint对应的RPC服务首先获取当前RpcEndpoint实现的RpcGateways接口。 RpcGateway接口最终通过RpcUtils.extractImplementedRpcGateways()方法从类定义抽取出来例如JobMaster组件会抽取JobMasterGateway接口定义。创建InvocationHandler代理类根据InvocationHandler代理类提供的invoke()方法实现被代理类的具体方法。根据RpcEndpoint是否为FencedRpcEndpointInvocationHandler分为FencedAkkaInvocationHandler和AkkaInvocationHandler两种类型。 FencedMainThreadExecutable代理的接口主要有FencedMainThreadExecutable和FencedRpcGateway两种。 AkkaInvocationHandler主要代理实现AkkaBasedEndpoint、RpcGateway、StartStoppable、MainThreadExecutable、RpcServer等接口。 创建好InvocationHandler代理类后通过反射的方式Proxy.newProxyInstance()创建代理类。创建的代理类会被转换为RpcServer实例再返回给RpcEndpoint使用。 在RpcServer创建的过程中可以看出实际上包含了创建RpcEndpoint中的Actor引用类ActorRef和AkkaInvocationHandler动态代理类。最后将动态代理类转换为RpcServer接口返回给RpcEndpoint实现类此时实现的组件就能够获取到RpcServer服务且通过RpcServer代理了所有的RpcGateways接口提供了本地方法调用和远程方法调用两种模式。
Override
public C extends RpcEndpoint RpcGateway RpcServer startServer(C rpcEndpoint) { checkNotNull(rpcEndpoint, rpc endpoint); final SupervisorActor.ActorRegistration actorRegistration registerAkkaRpcActor(rpcEndpoint); final ActorRef actorRef actorRegistration.getActorRef(); final CompletableFutureVoid actorTerminationFuture actorRegistration.getTerminationFuture(); //启动RpcEndpoint对应的RPC服务LOG.info( Starting RPC endpoint for {} at {} ., rpcEndpoint.getClass().getName(), actorRef.path()); final String akkaAddress AkkaUtils.getAkkaURL(actorSystem, actorRef); final String hostname; OptionString host actorRef.path().address().host(); if (host.isEmpty()) { hostname localhost; } else { hostname host.get(); } //解析EpcEndpoint实现的所有RpcGateway接口SetClass? implementedRpcGateways new HashSet(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass())); //额外添加RpcServer和AkkaBasedEndpoint类implementedRpcGateways.add(RpcServer.class); implementedRpcGateways.add(AkkaBasedEndpoint.class); final InvocationHandler akkaInvocationHandler; //根据是否是FencedRpcEndpoint创建不同的动态代理对象if (rpcEndpoint instanceof FencedRpcEndpoint) { // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler akkaInvocationHandler new FencedAkkaInvocationHandler( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), actorTerminationFuture, ((FencedRpcEndpoint?) rpcEndpoint)::getFencingToken, captureAskCallstacks); implementedRpcGateways.add(FencedMainThreadExecutable.class); } else { akkaInvocationHandler new AkkaInvocationHandler( akkaAddress, hostname, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), actorTerminationFuture, captureAskCallstacks); } // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader ClassLoader classLoader getClass().getClassLoader(); SuppressWarnings(unchecked) RpcServer server (RpcServer) Proxy.newProxyInstance( classLoader, implementedRpcGateways.toArray( new Class?[implementedRpcGateways.size()]), akkaInvocationHandler); return server;
}3. ResourceManager中RPC服务的启动
RpcServer在RpcEndpoint的构造器中完成初始化后接下来就是启动RpcEndpoint和RpcServer这里以ResourceManager为例进行说明。
在启动集群时看下如何启动ResourceManager的RPC服务的。如下调用链
ClusterEntrypoint.startCluster-runCluster
-dispatcherResourceManagerComponentFactory.create
-resourceManager.start();public final void start() { rpcServer.start();
}继续探索RPC服务是如何启动的
首先在DefaultDispatcherResourceManagerComponentFactory中调用ResourceManager.start()方法启动ResourceManager实例此时在ResourceManager.start()方法中会同步调用RpcServer.start()方法启动ResourceManager所在RpcEndpoint中的RpcServer如下。 调用ResourceManager.start()方法此时会调用RpcEndpoint.start()父方法启动ResourceManager组件的RpcServer。通过动态代理AkkaInvocationHandler.invoke()方法执行流程发现调用的是StartStoppable.start()方法此时会直接调用AkkaInvocationHandler.start()本地方法。在AkkaInvocationHandler.start()方法中实际上会调用rpcEndpoint.tell(ControlMessages.START,ActorRef.noSender())方法向ResourceManager对应的Actor发送控制消息表明当前Actor实例可以正常启动并接收来自远端的RPC请求。AkkaRpcActor调用handleControlMessage()方法处理ControlMessages.START控制消息。将AkkaRpcActor中的状态更新为StartedState此时ResourceManager的RpcServer启动完成ResourceManager组件能够接收来自其他组件的RPC请求。 在flink1.12中省略了AkkaInvocationHandler的干预。
经过以上步骤指定组件的RpcEndpoint节点就正常启动此时RpcServer会作为独立的线程运行在JobManager或TaskManager进程中处理本地和远程提交的RPC请求。 4. 实现相互通讯能力 当AkkaRpcService启动RpcEndpoint中的RpcServer后RpcEndpoint组件仅能对外提供处理RPC请求的能力RpcEndpoint组件需要在启动后向其他组件注册自己的RpcEndpoint信息并完成组件之间的RpcConnection注册才能相互访问和通信。而创建RPC连接需要调用RpcService.connect()方法。 如代码所示在AkkaRpcService.connect()方法中完成了RpcConnection对象的创建。
Override
public C extends RpcGateway CompletableFutureC connect( final String address, final ClassC clazz) { return connectInternal( address, clazz, (ActorRef actorRef) - { Tuple2String, String addressHostname extractAddressHostname(actorRef); return new AkkaInvocationHandler( addressHostname.f0, addressHostname.f1, actorRef, configuration.getTimeout(), configuration.getMaximumFramesize(), null, captureAskCallstacks); });
}具体看AkkaRpcService.connectInternal()方法逻辑。 获取ActorRef引用对象。调用Patterns.ask()方法向actorRef对应的RpcEndpoint节点发送RemoteHandshakeMessage消息确保连接的RpcEndpoint节点正常如果成功则RpcEndpoint会返回HandshakeSuccessMessage消息。调用invocationHandlerFactory创建invocationHandler动态代理类此时可以看到传递的接口列表为new Class?[]{clazz}也就是当前RpcEndpoint需要访问的RpcGateway接口。例如JobMaster访问ResourceManager时这里就是ResourceManagerGateway接口。 private C extends RpcGateway CompletableFutureC connectInternal( final String address, final ClassC clazz, FunctionActorRef, InvocationHandler invocationHandlerFactory) { checkState(!stopped, RpcService is stopped); LOG.debug( Try to connect to remote RPC endpoint with address {}. Returning a {} gateway., address, clazz.getName()); //获取actorRef实例 final CompletableFutureActorRef actorRefFuture resolveActorAddress(address); //进行handshake操作确保需要连接的RpcEndpoint节点正常 final CompletableFutureHandshakeSuccessMessage handshakeFuture actorRefFuture.thenCompose( (ActorRef actorRef) - FutureUtils.toJava( //调用Patterns.ask()方法向actorRef对应的//RpcEndpoint节点发送RemoteHandshakeMessage消息//确保连接的RpcEndpoint节点正常如果成功则//RpcEndpoint会返回HandshakeSuccessMessage消息。 Patterns.ask( actorRef, new RemoteHandshakeMessage( clazz, getVersion()), configuration.getTimeout().toMilliseconds()) .HandshakeSuccessMessagemapTo( ClassTag$.MODULE$ .HandshakeSuccessMessageapply( HandshakeSuccessMessage .class)))); //创建RPC动态代理类 return actorRefFuture.thenCombineAsync( handshakeFuture, (ActorRef actorRef, HandshakeSuccessMessage ignored) - { InvocationHandler invocationHandler invocationHandlerFactory.apply(actorRef); // Rather than using the System ClassLoader directly, we derive the ClassLoader // from this class . That works better in cases where Flink runs embedded and // all Flink // code is loaded dynamically (for example from an OSGI bundle) through a custom // ClassLoader ClassLoader classLoader getClass().getClassLoader(); SuppressWarnings(unchecked) C proxy (C) Proxy.newProxyInstance( classLoader, new Class?[] {clazz}, invocationHandler); return proxy; }, actorSystem.dispatcher());
}
经过以上步骤实现了创建RpcEndpoint组件之间的RPC连接此时集群RPC组件之间可以进行相互访问例如JobMaster可以向ResourceManager发送Slot资源请求。 RPC 服务启动的 Akka actor 能接收来自RpcGateway RPC 调用。 参考《Flink设计与实现核心原理与源码解析》–张利兵
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/930285.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!