Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口

基于Dubbo 3.1,详细介绍了Dubbo Consumer服务调用源码。

此前我们学习了Dubbo服务的导出和引入的源码,现在我们来学习Dubbo服务调用的源码。

此前的文章中我们讲过了最上层代理的调用逻辑(服务引用bean的获取以及懒加载原理):业务引入的接口代理对象(ReferenceBean内部的lazyProxy对象)-> 代理目标对象(ReferenceConfig内部的接口代理对象ref),代理目标对象的请求都会被统一转发到内部的InvokerInvocationHandler#invoke方法中去。

所以,我们学习Dubbo服务调用源码入口就是InvokerInvocationHandler#invoke方法。

Dubbo 3.x服务调用源码:

  1. Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口
  2. Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用

Dubbo 3.x服务引用源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
  3. Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
  4. Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
  5. Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
  6. Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
  7. Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
  8. Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
  9. Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
  10. Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
  11. Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url

Dubbo 3.x服务发布源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
  3. Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
  4. Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
  5. Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
  6. Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
  7. Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
  8. Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布

文章目录

  • InvokerInvocationHandler#invoke调用入口
  • InvocationUtil#invoke发起调用
    • getUrl获取consumerUrl
  • MigrationInvoker#invoke决策调用Invoker
  • MockClusterInvoker#invoker本地mock调用
  • AbstractCluster#invoke继续调用
  • CallbackRegistrationInvoker#invoke回调注册
  • CopyOfFilterChainNode#invoke过滤器链式调用
  • ConsumerContextFilter封装信息
    • FutureFilter执行回调方法
    • MonitorFilter收集监控数据
    • RouterSnapshotFilter路由快照zhichi
  • AbstractClusterInvoker#invoke集群容错rpc调用
  • FailoverClusterInvoker#doInvoke失败重试调用
    • AbstractClusterInvoker#select选择invoker
    • AbstractClusterInvoker#doSelect继续选择invoker
    • FailoverClusterInvoker#invokeWithContext调用服务
  • 总结

InvokerInvocationHandler#invoke调用入口

InvokerInvocationHandler#invoke可以看作是消费者发起调用的入口方法。

该方法中,将会构建一个RpcInvocation作为接口方法调用抽象,包含方法名字,接口名,方法参数等信息,然后通过InvocationUtil#invoke方法来发起真正的远程or本地调用。

/*** InvokerInvocationHandler的方法* <p>* 执行代理对象的方法的转发** @param proxy  在其上调用方法的代理实例* @param method 在代理实例上调用的接口方法* @param args   一个对象数组,包含在代理实例的方法调用中传递的参数值,如果接口方法不接受参数,则为null* @return 调用代理对象方法执行结果*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//对于object类的方法,直接反射调用if (method.getDeclaringClass() == Object.class) {return method.invoke(invoker, args);}//获取方法名和方法参数类型数组String methodName = method.getName();Class<?>[] parameterTypes = method.getParameterTypes();//特殊方法的调用,直接调用invoker对象的同名方法if (parameterTypes.length == 0) {if ("toString".equals(methodName)) {return invoker.toString();} else if ("$destroy".equals(methodName)) {invoker.destroy();return null;} else if ("hashCode".equals(methodName)) {return invoker.hashCode();}} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {return invoker.equals(args[0]);}//构建一个RpcInvocation作为接口方法调用抽象,包含方法名字,接口名,方法参数等信息RpcInvocation rpcInvocation = new RpcInvocation(serviceModel, method.getName(), invoker.getInterface().getName(), protocolServiceKey, method.getParameterTypes(), args);if (serviceModel instanceof ConsumerModel) {rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel);rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method));}//通过InvocationUtil.invoke方法来发起真正的远程or本地调用return InvocationUtil.invoke(invoker, rpcInvocation);
}

InvocationUtil#invoke发起调用

这里的Invoker是经过层层封装之后的Invoker(注意MockClusterInvoker上面还有一个MigrationInvoker没画出来),我们将按照这个顺序从上向下讲解。

在这里插入图片描述

public class InvocationUtil {private static final Logger logger = LoggerFactory.getLogger(InvokerInvocationHandler.class);/*** 消费者调用** @param invoker       可执行器* @param rpcInvocation 接口方法调用抽* @return 执行结果*/public static Object invoke(Invoker<?> invoker, RpcInvocation rpcInvocation) throws Throwable {//消费者url,consumer://10.253.45.126/org.apache.dubbo.demo.GreetingService?application=demo-consumer&background=false&check=false&dubbo=2.0.2&group=greeting&init=false&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=72363&register.ip=10.253.45.126&release=&revision=1.0.0&side=consumer&sticky=false&timestamp=1668394930680&unloadClusterRelated=false&version=1.0.0URL url = invoker.getUrl();//服务key,{group}/{serviceInterface}:{version}String serviceKey = url.getServiceKey();//设置目标服务的唯一名称rpcInvocation.setTargetServiceUniqueName(serviceKey);//设置消费者url到rpc调用上下文中吗格式一个内部的线程本地变量// invoker.getUrl() returns consumer url.RpcServiceContext.getServiceContext().setConsumerUrl(url);//调用性能解析器if (ProfilerSwitch.isEnableSimpleProfiler()) {//线程本地变量ProfilerEntry parentProfiler = Profiler.getBizProfiler();ProfilerEntry bizProfiler;if (parentProfiler != null) {//接收请求。客户端调用开始。bizProfiler = Profiler.enter(parentProfiler,"Receive request. Client invoke begin. ServiceKey: " + serviceKey + " MethodName:" + rpcInvocation.getMethodName());} else {//接收请求。客户端调用开始。bizProfiler = Profiler.start("Receive request. Client invoke begin. ServiceKey: " + serviceKey + " " + "MethodName:" + rpcInvocation.getMethodName());}rpcInvocation.put(Profiler.PROFILER_KEY, bizProfiler);try {/** 执行invoker#invoker方法实现调用*/return invoker.invoke(rpcInvocation).recreate();} finally {//释放Profiler.release(bizProfiler);int timeout;//timeout附加信息,远程服务调用超时时间(毫秒),默认1000Object timeoutKey = rpcInvocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);if (timeoutKey instanceof Integer) {timeout = (Integer) timeoutKey;} else {timeout = url.getMethodPositiveParameter(rpcInvocation.getMethodName(),TIMEOUT_KEY,DEFAULT_TIMEOUT);}//调用执行时间long usage = bizProfiler.getEndTime() - bizProfiler.getStartTime();//是否超时if ((usage / (1000_000L * ProfilerSwitch.getWarnPercent())) > timeout) {StringBuilder attachment = new StringBuilder();rpcInvocation.foreachAttachment((entry) -> {attachment.append(entry.getKey()).append("=").append(entry.getValue()).append(";\n");});//超时打印日志logger.warn(String.format("[Dubbo-Consumer] execute service %s#%s cost %d.%06d ms, this invocation almost (maybe already) timeout. Timeout: %dms\n" + "invocation context:\n%s" + "thread info: \n%s",rpcInvocation.getProtocolServiceKey(),rpcInvocation.getMethodName(),usage / 1000_000,usage % 1000_000,timeout,attachment,Profiler.buildDetail(bizProfiler)));}}}/** 抛出异常后重新执行invoker#invoker方法实现调用*/return invoker.invoke(rpcInvocation).recreate();}
}

getUrl获取consumerUrl

该方法获取消费者url。例如:consumer://10.253.45.126/org.apache.dubbo.demo.GreetingService?application=demo-consumer&background=false&check=false&dubbo=2.0.2&group=greeting&init=false&interface=org.apache.dubbo.demo.GreetingService&methods=hello&pid=72363&register.ip=10.253.45.126&release=&revision=1.0.0&side=consumer&sticky=false&timestamp=1668394930680&unloadClusterRelated=false&version=1.0.0

/*** MigrationInvoker的方法*/
@Override
public URL getUrl() {if (currentAvailableInvoker != null) {//当前激活的Invokerreturn currentAvailableInvoker.getUrl();} else if (invoker != null) {//接口级别invokerreturn invoker.getUrl();} else if (serviceDiscoveryInvoker != null) {//应用级别invokerreturn serviceDiscoveryInvoker.getUrl();}//消费者urlreturn consumerUrl;
}

MigrationInvoker#invoke决策调用Invoker

MigrationInvoker是可迁移Invoker,也是最上层Invoker,它可用于选择不同的Invoker执行调用,用于应用级服务发现和接口级服务发现之间的迁移,以及实现灰度调用。

该方法决策使用应用级Invoker还是接口级Invoker执行调用,如果设置了激活的invoker并且类型为应用级优先,那么还会进入灰度策略,如果随机决策值(0-100)大于灰度值,那么走接口级订阅模式,否则走应用级订阅模式。灰度比例功能仅在应用级优先状态下生效。

/*** MigrationInvoker的方法* * 决策使用应用级Invoker还是接口级Invoker执行调用** @param invocation RpcInvocation* @return 调用结果*/
@Override
public Result invoke(Invocation invocation) throws RpcException {//如果设置了当前激活的invokerif (currentAvailableInvoker != null) {//如果类型为应用级优先if (step == APPLICATION_FIRST) {//如果随机决策值(0-100)大于灰度值,那么走接口模式//灰度比例功能仅在应用级优先状态下生效if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {//退回到接口模式调用return invoker.invoke(invocation);}//每次检查调用程序是否可用,应用级invoker优先,然后继续使用invoker调用return decideInvoker().invoke(invocation);}//其他类型return currentAvailableInvoker.invoke(invocation);}//如果没有当前激活的invokerswitch (step) {case APPLICATION_FIRST://每次检查调用程序是否可用,应用级invoker优先currentAvailableInvoker = decideInvoker();break;case FORCE_APPLICATION://应用级invokercurrentAvailableInvoker = serviceDiscoveryInvoker;break;case FORCE_INTERFACE:default://接口级invokercurrentAvailableInvoker = invoker;}//调用return currentAvailableInvoker.invoke(invocation);
}

MockClusterInvoker#invoker本地mock调用

dubbo除了限流措施之外,还支持mock本地伪装。本地伪装常被用于服务降级和熔断。比如某验权服务,当服务提供方全部挂掉后,假如此时服务消费方发起了一次远程调用,那么本次调用将会失败并抛出一个 RpcException 异常。

为了避免出现这种直接抛出异常的情况出现,那么客户端就可以利用本地伪装来提供 Mock 数据返回授权失败。mock的大概配置如下:

  1. false、0、null、N/A:如果没有设置mock属性,或者设置了这些值,表示不启用mock,直接发起远程调用,这是默认策略。
  2. force:强制服务降级,如果mock属性值以“force”开头则使用该策略。服务调用方在调用该接口服务时候会直接执行客户端本地的mock逻辑,不会远程调用服务提供者。比如使用mock=force:return+null表示消费方对该服务的方法调用都直接返回null值,不发起远程调用。
  3. fail策略:服务失败降级,如果mock属性值不符合上面所有的类型,则使用该策略。表示服务调用彻底失败并抛出RPCException之后执行本地mock逻辑,不一定会抛出异常。服务彻底失败具体和设置的集群容错方式有关。如果设置了retries,则是在全部重试次数使用完毕并且抛出RPCException异常之后才会执行mock的逻辑。
  4. true/default:如果mock属性值为true或者default,那么在最终失败并抛出RPCException之后,会在接口同路径下查找interfaceName+Mock名字的类,随后通过无参构造器实例化,随后调用该Mock类对象的对应方法,该方法中提供对应的逻辑。

详细介绍参见官方文档:https://dubbo.apache.org/zh/docs3-v2/java-sdk/advanced-features-and-usage/service/local-mock/#%E5%BC%80%E5%90%AF-mock-%E9%85%8D%E7%BD%AE。

/*** MockClusterInvoker的方法* <p>* mock调用** @param invocation RpcInvocation* @return 调用结果*/
@Override
public Result invoke(Invocation invocation) throws RpcException {Result result;//消费者url获取mock参数String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();//false、0、null、N/A,表示不需要mockif (ConfigUtils.isEmpty(value)) {//no mock/** 继续调用下层Invoker#invoke*/result = this.invoker.invoke(invocation);}//以force开头,表示强制服务降级else if (value.startsWith(FORCE_KEY)) {if (logger.isWarnEnabled()) {logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());}//force:direct mock//服务调用方在调用该接口服务时候会直接执行客户端本地的mock逻辑,不会远程调用服务提供者。//比如使用mock=force:return+null表示消费方对该服务的方法调用都直接返回null值,不发起远程调用。result = doMockInvoke(invocation, null);}//其他值,先先发起远程调用,当远程服务调用失败时,才会根据配置降级执行mock功能else {//fail-mocktry {/** 继续调用下层Invoker#invoke*/result = this.invoker.invoke(invocation);//fix:#4585//最终失败并抛出RPCExceptionif (result.getException() != null && result.getException() instanceof RpcException) {RpcException rpcException = (RpcException) result.getException();if (rpcException.isBiz()) {throw rpcException;} else {/** 执行本地mock调用*/result = doMockInvoke(invocation, rpcException);}}} catch (RpcException e) {if (e.isBiz()) {throw e;}if (logger.isWarnEnabled()) {logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);}/** 执行本地mock调用*/result = doMockInvoke(invocation, e);}}return result;
}

AbstractCluster#invoke继续调用

继续向下到这里,首先执行inoker的过滤操作。

/*** AbstractCluster的方法*/
@Override
public Result invoke(Invocation invocation) throws RpcException {return filterInvoker.invoke(invocation);
}

CallbackRegistrationInvoker#invoke回调注册

filterInvoker实际类型为FilterChainBuilder的内部类CallbackRegistrationInvoker,它的invoke方法用于添加一个回调,它可以在RPC调用完成时触发,在这回调函数将会倒序执行filters中的过滤器。

/*** FilterChainBuilder的内部类CallbackRegistrationInvoker的方法*/
@Override
public Result invoke(Invocation invocation) throws RpcException {/** 继续调用下层invoker#invoke*/Result asyncResult = filterInvoker.invoke(invocation);//添加一个回调,它可以在RPC调用完成时触发。asyncResult.whenCompleteWithContext((r, t) -> {RuntimeException filterRuntimeException = null;//过滤器倒序执行for (int i = filters.size() - 1; i >= 0; i--) {FILTER filter = filters.get(i);try {InvocationProfilerUtils.releaseDetailProfiler(invocation);if (filter instanceof ListenableFilter) {//执行过滤器ListenableFilter listenableFilter = ((ListenableFilter) filter);Filter.Listener listener = listenableFilter.listener(invocation);try {if (listener != null) {if (t == null) {listener.onResponse(r, filterInvoker, invocation);} else {listener.onError(t, filterInvoker, invocation);}}} finally {listenableFilter.removeListener(invocation);}} else if (filter instanceof FILTER.Listener) {FILTER.Listener listener = (FILTER.Listener) filter;if (t == null) {listener.onResponse(r, filterInvoker, invocation);} else {listener.onError(t, filterInvoker, invocation);}}} catch (RuntimeException runtimeException) {LOGGER.error(String.format("Exception occurred while executing the %s filter named %s.", i, filter.getClass().getSimpleName()));if (LOGGER.isDebugEnabled()) {LOGGER.debug(String.format("Whole filter list is: %s", filters.stream().map(tmpFilter -> tmpFilter.getClass().getSimpleName()).collect(Collectors.toList())));}filterRuntimeException = runtimeException;t = runtimeException;}}if (filterRuntimeException != null) {throw filterRuntimeException;}});return asyncResult;
}

CopyOfFilterChainNode#invoke过滤器链式调用

FilterChainBuilder的内部类,一个CopyOfFilterChainNode实例表示一个过滤器节点,将会首先执行全部的过滤器,执行完毕之后,才会继续后面真正的rpc调用。

这个版本默认4个filter,分别是:ConsumerContextFilter、FutureFilter、MonitorFilter、RouterSnapshotFilter。

ConsumerContextFilter封装信息

ConsumerContextFilter为consumer invoker序设置当前RpcContext,包括invoker,invocation, local host, remote host and port。它这样做是为了让执行线程的RpcContext包含这些必要的信息,方便后续直接获取。

/*** ConsumerContextFilter的方法** 始终在实现中调用invoker.invoke()将请求转交给下一个筛选器节点。** @param invoker 下一个invoker节点*/
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {RpcContext.RestoreServiceContext originServiceContext = RpcContext.storeServiceContext();try {//设置相关属性RpcContext.getServiceContext()//下一个invoker.setInvoker(invoker)//调用抽象.setInvocation(invocation)//本地地址.setLocalAddress(NetUtils.getLocalHost(), 0);RpcContext context = RpcContext.getClientAttachment();context.setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getApplication());if (invocation instanceof RpcInvocation) {//下一个invoker((RpcInvocation) invocation).setInvoker(invoker);}if (CollectionUtils.isNotEmpty(supportedSelectors)) {for (PenetrateAttachmentSelector supportedSelector : supportedSelectors) {Map<String, Object> selected = supportedSelector.select();if (CollectionUtils.isNotEmptyMap(selected)) {((RpcInvocation) invocation).addObjectAttachments(selected);}}} else {((RpcInvocation) invocation).addObjectAttachments(RpcContext.getServerAttachment().getObjectAttachments());}Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();if (CollectionUtils.isNotEmptyMap(contextAttachments)) {/*** invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered* by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is* a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).*/((RpcInvocation) invocation).addObjectAttachments(contextAttachments);}// pass default timeout set by end user (ReferenceConfig)Object countDown = context.getObjectAttachment(TIME_COUNTDOWN_KEY);if (countDown != null) {TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;if (timeoutCountDown.isExpired()) {return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,"No time left for making the following call: " + invocation.getServiceName() + "."+ invocation.getMethodName() + ", terminate directly."), invocation);}}RpcContext.removeServerContext();//调用下一个invoker节点return invoker.invoke(invocation);} finally {RpcContext.restoreServiceContext(originServiceContext);}
}

FutureFilter执行回调方法

这个过滤器主要是获取异步方法调用信息AsyncMethodInfo,AsyncMethodInfo内部包含在执行Invoker的各个阶段的回调方法和实例,这里将会执行invoker调用时(调用之前)的回调方法。

/*** FutureFilter的方法** 始终在实现中调用invoker.invoke()将请求转交给下一个筛选器节点。** @param invoker 下一个invoker节点*/
@Override
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {//获取Invoker回调方法,并且执行fireInvokeCallback(invoker, invocation);// need to configure if there's return value before the invocation in order to help invoker to judge if it's// necessary to return future.//调用下一个invoker节点return invoker.invoke(invocation);
}private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {//获取异步方法调用信息AsyncMethodInfo,并且执行invoker调用时的回调方法final AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);//如果为null直接返回if (asyncMethodInfo == null) {return;}//调用异步调用时的回调方法final Method onInvokeMethod = asyncMethodInfo.getOninvokeMethod();//调用异步调用时的回调实例final Object onInvokeInst = asyncMethodInfo.getOninvokeInstance();if (onInvokeMethod == null && onInvokeInst == null) {return;}if (onInvokeMethod == null || onInvokeInst == null) {throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a oninvoke callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());}if (!onInvokeMethod.isAccessible()) {onInvokeMethod.setAccessible(true);}Object[] params = invocation.getArguments();try {//反射执行invoker调用时的回调方法onInvokeMethod.invoke(onInvokeInst, params);} catch (InvocationTargetException e) {fireThrowCallback(invoker, invocation, e.getTargetException());} catch (Throwable e) {fireThrowCallback(invoker, invocation, e);}
}

MonitorFilter收集监控数据

监控拦截器,它将收集关于此调用的调用数据并将其发送到监控中心。

/*** MonitorFilter的方法** 始终在实现中调用invoker.invoke()将请求转交给下一个筛选器节点。** @param invoker 下一个invoker节点*/
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {//如果存在monitor,则设置监控数据if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());invocation.put(MONITOR_REMOTE_HOST_STORE, RpcContext.getServiceContext().getRemoteHost());// count upgetConcurrent(invoker, invocation).incrementAndGet();}//调用下一个invoker节点return invoker.invoke(invocation);
}

RouterSnapshotFilter路由快照zhichi

匹配路由器快照切换器RouterSnapshotSwitcher,用于打印日志以及路由快照。

/*** RouterSnapshotFilter的方法** 始终在实现中调用invoker.invoke()将请求转交给下一个筛选器节点。** @param invoker 下一个invoker节点*/
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {//默认不支持if (!switcher.isEnable()) {//调用下一个invoker节点//如果后面没有其他filter,那么该节点就是真正的invoker节点return invoker.invoke(invocation);}if (!logger.isInfoEnabled()) {return invoker.invoke(invocation);}if (!switcher.isEnable(invocation.getServiceModel().getServiceKey())) {return invoker.invoke(invocation);}RpcContext.getServiceContext().setNeedPrintRouterSnapshot(true);return invoker.invoke(invocation);
}

如果后面没有其他filter,那么下一个节点就是真正的invoker节点,即具有集群容错策略的ClusterInvoker,默认FailoverClusterInvoker。


AbstractClusterInvoker#invoke集群容错rpc调用

该方法是执行rpc调用的骨干方法。大概步骤为:

  1. 调用list方法,从服务目录Directory中根据路由规则或滤出满足规则的服务提供者invoker列表。最后会调用每个路由器的route方法进行服务路由,如果没有路由器则返回全部invoker列表。具体源码后面再说。
  2. 调用initLoadBalance方法,获取负载均衡策略实例,默认RandomLoadBalance。具体源码后面再说。
  3. 调用doinvoke方法,继续向下执行rpc调用的逻辑。
/*** AbstractClusterInvoker的方法* <p>* rpc调用的模版方法实现*/@Overridepublic Result invoke(final Invocation invocation) throws RpcException {//销毁检测checkWhetherDestroyed();// binding attachments into invocation.
//        Map<String, Object> contextAttachments = RpcContext.getClientAttachment().getObjectAttachments();
//        if (contextAttachments != null && contextAttachments.size() != 0) {
//            ((RpcInvocation) invocation).addObjectAttachmentsIfAbsent(contextAttachments);
//        }InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Router route.");/** 1 从服务目录中根据路由规则或滤出满足规则的服务提供者invoker列表*/List<Invoker<T>> invokers = list(invocation);InvocationProfilerUtils.releaseDetailProfiler(invocation);/** 2 获取负载均衡策略实现*/LoadBalance loadbalance = initLoadBalance(invokers, invocation);RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Cluster " + this.getClass().getName() + " invoke.");try {/** 3 继续执行rpc调用*/return doInvoke(invocation, invokers, loadbalance);} finally {InvocationProfilerUtils.releaseDetailProfiler(invocation);}}

FailoverClusterInvoker#doInvoke失败重试调用

此前我们通过服务路由获取了符合规则的服务提供者invoker列表,以及获取了服务负载均衡实例,下面将会进行带有容错机制的rpc调用。

FailoverClusterInvoker采用失败自动切换机制,Dubbo默认的容错策略。服务消费方调用失败后自动切换到其他服务提供者的服务器进行重试。客户端等待服务端的处理时间超过了设定的超时时间时,也算做失败,将会重试。可通过 retries属性来设置重试次数(不含第一次),默认重试两次。

通常用于读操作或者具有幂等的写操作,需要注意的是重试会带来更长延迟。大概步骤为:

  1. 计算最大调用次数,默认3,包括两次失败重试,最小1次。在一个循环中执行调用。
  2. 每次重试前,重新调用list方法从服务目录中根据路由规则或滤出满足规则的服务提供者invoker列表。可想而知,如果此时服务有变更,那么invoked列表将失去准确性。
  3. 基于负载均衡策略选择一个服务提供者invoker。
  4. 通过服务提供者invoker执行rpc调用获取结果。
/*** FailoverClusterInvoker的方法* * 失败重试的容错策略** @param invocation  方法调用抽象* @param invokers    服务提供者invoker列表,已经经过了路由过滤* @param loadbalance 负载均衡* @return 执行结果*/
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {List<Invoker<T>> copyInvokers = invokers;//可用invoker列表不能为空,否则直接抛出异常checkInvokers(copyInvokers, invocation);//方法名String methodName = RpcUtils.getMethodName(invocation);/** 计算最大调用次数,默认3,包括两次失败重试,最小1次* retries属性*/int len = calculateInvokeTimes(methodName);// retry loop.RpcException le = null; // last exception.//已调用过的invoker列表List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.//已调用过的provider列表Set<String> providers = new HashSet<String>(len);for (int i = 0; i < len; i++) {//重试前重新从服务目录中根据路由规则或滤出满足规则的服务提供者invoker列表//如果此时服务有变更,那么invoked列表将失去准确性if (i > 0) {checkWhetherDestroyed();//重新服务路由copyInvokers = list(invocation);// check againcheckInvokers(copyInvokers, invocation);}/** 基于负载均衡策略选择一个invoker*/Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);//添加到已调用列表invoked.add(invoker);RpcContext.getServiceContext().setInvokers((List) invoked);boolean success = false;try {/** 使用服务提供者invoker执行rpc服务调用*/Result result = invokeWithContext(invoker, invocation);if (le != null && logger.isWarnEnabled()) {logger.warn("Although retry the method " + methodName+ " in the service " + getInterface().getName()+ " was successful by the provider " + invoker.getUrl().getAddress()+ ", but there have been failed providers " + providers+ " (" + providers.size() + "/" + copyInvokers.size()+ ") from the registry " + directory.getUrl().getAddress()+ " on the consumer " + NetUtils.getLocalHost()+ " using the dubbo version " + Version.getVersion() + ". Last error is: "+ le.getMessage(), le);}success = true;return result;} catch (RpcException e) {if (e.isBiz()) { // biz exception.throw e;}le = e;} catch (Throwable e) {le = new RpcException(e.getMessage(), e);} finally {//如果未成功if (!success) {//加入已调用过的provider列表providers.add(invoker.getUrl().getAddress());}}}throw new RpcException(le.getCode(), "Failed to invoke the method "+ methodName + " in the service " + getInterface().getName()+ ". Tried " + len + " times of the providers " + providers+ " (" + providers.size() + "/" + copyInvokers.size()+ ") from the registry " + directory.getUrl().getAddress()+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "+ Version.getVersion() + ". Last error is: "+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}

AbstractClusterInvoker#select选择invoker

基于负载均衡策略选择一个invoker。

/*** AbstractClusterInvoker的方法* * 基于负载均衡策略选择一个invoker** @param loadbalance 负载均衡策略* @param invocation  调用方法抽象* @param invokers    全部服务提供者invoker列表,已经经过了路由过滤* @param selected    已被调用过的服务提供者invoker列表* @return* @throws RpcException*/
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {if (CollectionUtils.isEmpty(invokers)) {return null;}//调用方法名String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();//默认false,设置true 该接口上的所有方法使用同一个provider.如果需要更复杂的规则,请使用路由boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);//ignore overloaded method 忽略重载方法if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {stickyInvoker = null;}//ignore concurrency problem 忽略并发问题if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {if (availableCheck && stickyInvoker.isAvailable()) {return stickyInvoker;}}/** 继续选择invoker*/Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);if (sticky) {stickyInvoker = invoker;}return invoker;
}

AbstractClusterInvoker#doSelect继续选择invoker

  1. 如果只有一个服务提供者invoker,那么返回。
  2. 调用loadbalance#select方法基于负载均衡策略选择一个invoker。这是核心方法,Dubbo提供有很多的负载均衡实现,具体的源码我们后面单独分析。
  3. 如果invoker在已被调用过的服务提供者invoker列表中,那么调用reselect方法重新选择一个。
  4. 重新选择仍然失败,采用兜底策略。检查当前选中的invoker的索引,如果不是最后一个,则选择索引+1的invoker,否则返回第一个。
/*** AbstractClusterInvoker的方法* <p>* 基于负载均衡策略选择一个invoker** @param loadbalance 负载均衡策略* @param invocation  调用方法抽象* @param invokers    全部服务提供者invoker列表,已经经过了路由过滤* @param selected    已被调用过的服务提供者invoker列表*/
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {if (CollectionUtils.isEmpty(invokers)) {return null;}//如果只有一个服务提供者invoker,那么返回if (invokers.size() == 1) {Invoker<T> tInvoker = invokers.get(0);checkShouldInvalidateInvoker(tInvoker);return tInvoker;}/** 基于负载均衡策略选择一个invoker* 核心方法*/Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);//If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.//如果invoker在selected中,则重新选择。boolean isSelected = selected != null && selected.contains(invoker);//如果availableCheck=true,并且invoker不可用,则重新选择。boolean isUnavailable = availableCheck && !invoker.isAvailable() && getUrl() != null;if (isUnavailable) {invalidateInvoker(invoker);}//重新选择if (isSelected || isUnavailable) {try {/** 重新基于负载均衡策略选择一个invoker*/Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availableCheck);if (rInvoker != null) {invoker = rInvoker;} //重新选择仍然失败,兜底策略else {//Check the index of current selected invoker, if it's not the last one, choose the one at index+1.//检查当前选中的invoker的索引,如果不是最后一个,则选择索引+1的invokerint index = invokers.indexOf(invoker);try {//Avoid collisioninvoker = invokers.get((index + 1) % invokers.size());} catch (Exception e) {logger.warn("2-5", "select invokers exception", "", e.getMessage() + " may because invokers list dynamic change, ignore.", e);}}} catch (Throwable t) {logger.error("2-5", "failed to reselect invokers", "", "cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);}}return invoker;
}

FailoverClusterInvoker#invokeWithContext调用服务

将当前invoker设置到服务调用上下文RpcContext中,RpcContext是一个线程本地变量。随后使用服务提供者invoker#invoke执行rpc服务调用。

/*** AbstractClusterInvoker* <p>* 使用服务提供者invoker执行rpc服务调用** @param invoker    服务提供者invoker* @param invocation 方法调用抽象* @return 调用结果*/
protected Result invokeWithContext(Invoker<T> invoker, Invocation invocation) {//将当前invoker设置到服务调用上下文RpcContext中,RpcContext是一个线程本地变量setContext(invoker);Result result;try {if (ProfilerSwitch.isEnableSimpleProfiler()) {InvocationProfilerUtils.enterProfiler(invocation, "Invoker invoke. Target Address: " + invoker.getUrl().getAddress());}/** 使用服务提供者invoker执行rpc服务调用*/result = invoker.invoke(invocation);} finally {//清除上下文中的invoker属性clearContext(invoker);InvocationProfilerUtils.releaseSimpleProfiler(invocation);}return result;
}

总结

本次我们学习了,Dubbo 发起服务调用的上半部分源码,实际上就是按照Invoker一层层的调用,每一层的Invoker又不同的功能,通过Invoker层层包装实现一些类似于AOP的能力。

调用过程中也会执行一个Filter链表,用于执行一些额外的逻辑,最终默认会执行到FailoverClusterInvoker,这是一个失败重试的Invoker,是Dubbo默认的容错策略,会给予负载均衡策略选择一个真实的服务提供者Invoker发起远程RPC调用。

调用过程中,Invoker全程参与,这里我们也能明白为什么Invoker被称为可执行体,因为其内部封装了Dubbo远程RPC调用的各种逻辑:服务路由、负载均衡、失败容错等等,也能明白Invoker作为Dubbo核心模型的重要性了。

整体看下来,Dubbo服务调用的源码是不是比此前服务注册和服务发现的源码简单得多了呢?下文我们学习Dubbo 发起服务调用的下半部分源码,也就是真正的RPC调用的源码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/69542.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java-初识List

List&#xff1a; List 是一个接口&#xff0c;属于 java.util 包&#xff0c;用于表示有序的元素集合。List 允许存储重复元素&#xff0c;并且可以通过索引访问元素。它是 Java 集合框架&#xff08;Java Collections Framework&#xff09;的一部分 特点&#xff1a; 有序…

01-SDRAM控制器的设计——案例总概述

本教程重点▷▷▷ 存储器简介。 介绍 SDRAM 的工作原理。 详细讲解SDRAM 控制的Verilog 实现方法。 PLL IP和FIFO IP 的调用&#xff0c;计数器设计&#xff0c;按键边沿捕获&#xff0c;数码管控制。 完成SDRAM控制器应用的完整案例。 Signal Tap 调试方法。 准备工作▷…

idea 找不到或者无法加载主类

idea项目&#xff0c;之前一直是正常运行的&#xff0c;放假了之后再回来就遇到启动不了的问题。 WebApplication这个类右键运行的时候&#xff0c;也提示找不到主类。 对于这种之前运行没有问题&#xff0c;突然出问题的项目。 我的点是没有改动代码和数据的情况下项目就跑不起…

鸿蒙harmony 手势密码

1.效果图 2.设置手势页面代码 /*** 手势密码设置页面*/ Entry Component struct SettingGesturePage {/*** PatternLock组件控制器*/private patternLockController: PatternLockController new PatternLockController()/*** 用来保存提示文本信息*/State message: string …

Python中3个与众不同的运算符 :=海象 ->箭头 //地板除法运算符

在python中&#xff0c;有一些和其他编程语言不太一样的运算符&#xff0c;今天就给大家介绍几种Python中的3个与众不同的运算符 :海象 -&#xff1e;箭头 // 地板除法运算符。 1. 海象运算符 : 这个运算符用来分配值并同时返回变量&#xff0c;英文 walrus (a : 5) print(a…

2025.1.8(qt图形化界面之消息框)

笔记&#xff08;后期复习补充&#xff09; 作业 1> 手动将登录项目实现&#xff0c;不要使用拖拽编程 并且&#xff0c;当点击登录按钮时&#xff0c;后台会判断账号和密码是否相等&#xff0c;如果相等给出登录成功的提示&#xff0c;并且关闭当前界面&#xff0c;发射一…

实践深度学习:构建一个简单的图像分类器

引言 深度学习在图像识别领域取得了巨大的成功。本文将指导你如何使用深度学习框架来构建一个简单的图像分类器&#xff0c;我们将以Python和TensorFlow为例&#xff0c;展示从数据准备到模型训练的完整流程。 环境准备 在开始之前&#xff0c;请确保你的环境中安装了以下工…

json转excel,在excel内导入json, json-to-excel插件

简介 JSON 转 Excel 是一款 Microsoft Excel 插件&#xff0c;可将 JSON 数据转换为 Excel 表格。 要求 此插件适用于以下环境&#xff1a;Excel 2013 Service Pack 1 或更高版本、Excel 2016 for Mac、Excel 2016 或更高版本、Excel Online。 快速开始 本快速开始指南适用…

腾讯云AI代码助手评测:如何智能高效完成Go语言Web项目开发

腾讯云AI代码助手评测&#xff1a;如何智能高效完成Go语言Web项目开发 ?? 文章目录 腾讯云AI代码助手评测&#xff1a;如何智能高效完成Go语言Web项目开发 ?? 背景引言开发环境介绍腾讯云AI代码助手使用实例 1. 代码补全2. 技术对话3. 代码优化4. 规范代码5. Bug处理 获得…

STM32 CUBE Can调试

STM32 CUBE Can调试 1、CAN配置2、时钟配置3、手动添加4、回调函数5、启动函数和发送函数6、使用方法(采用消息队列来做缓存)7、数据不多在发送函数中获取空邮箱发送&#xff0c;否则循环等待空邮箱 1、CAN配置 2、时钟配置 3、手动添加 需要注意的是STM32CUBE配置的代码需要再…

清除el-table选中状态 clearSelection

如何在Vue应用中使用Element UI的el-table组件&#xff0c;通过this.$refs.multipleTable.clearSelection()方法来清除所有选中行的状态。适合前端开发者了解表格组件的交互操作。 // el-table绑定ref<el-table selection-change"selsChange" ref"multipl…

小结:NAT

在华为设备中&#xff0c;NAT&#xff08;网络地址转换&#xff09;有多种类型&#xff0c;通常用于实现私有网络与公网之间的地址转换&#xff0c;或是实现内部网络的地址隔离。以下是华为路由器和交换机中常见的 NAT 类型及其配置。 1. NAT 类型 (1) 静态 NAT&#xff08;S…

【大数据技术】搭建完全分布式高可用大数据集群(Kafka)

搭建完全分布式高可用大数据集群(Kafka) kafka_2.13-3.9.0.tgz注:请在阅读本篇文章前,将以上资源下载下来。 写在前面 本文主要介绍搭建完全分布式高可用集群 Kafka 的详细步骤。 注意: 统一约定将软件安装包存放于虚拟机的/software目录下,软件安装至/opt目录下。 安…

如何使用deepseek创建钉钉收费AI助理

1、打开deepseek&#xff0c;简单描述下自己的问题&#xff0c;勾选深度思考 2、deepseek给我生成了一大段提示词模板 3、打开钉钉-应用中心-找AI助理 4、点击立即创作 5、创作 默认创建的是免费上网&#xff0c;需要创建收费的话需要先申请&#xff0c;填一个表单内容和价格&…

DeepSeek RAGFlow构建本地知识库系统

学习目标 DeepSeek RAGFlow 构建本地知识库系统 学习内容 下载安装Docker 1.1 Docker 是什么 1.2 下载Docker 1.3 安装Docker配置DockerRAGFlow 配置 3.1 下载RAGFlow 3.2 RAGFlow配置 3.3 启动RAGFlow Docker新建知识库 4.1 查看本机IP 4.2 OLLAMA_HOST 变量配置 4.3 添加模…

unity学习31:Video Player 视频播放相关基础

目录 1 新增Video Player的 component 2 导入视频到Asset里 3 拖入到 video player的 video clip里去即可 4 渲染模式 4.1 多种渲染模式 4.2 如果选择 Render Texture模式 4.3 然后把Render Texture 拖到游戏里的 gameObject上面 5 在UI上显示 5.1 创建UI 5.2 在UI上…

机器学习 - 需要了解的条件概率、高斯分布、似然函数

似然函数是连接数据与参数的桥梁&#xff0c;通过“数据反推参数”的逆向思维&#xff0c;成为统计推断的核心工具。理解它的关键在于区分“参数固定时数据的概率”与“数据固定时参数的合理性”&#xff0c;这种视角转换是掌握现代统计学和机器学习的基础。 一、在学习似然函…

[LUA ERROR] bad light userdata pointer

Cocos2d项目&#xff0c;targetSdkVersion30&#xff0c;在 android 13 设备运行报错: [LUA ERROR] bad light userdata pointer &#xff0c;导致黑屏。 参考 cocos2dx 适配64位 arm64-v8a 30 lua 提示 bad light userdata pointer 黑屏-CSDN博客的方法 下载最新的Cocos2dx …

使用Docker + Ollama在Ubuntu中部署deepseek

1、安装docker 这里建议用docker来部署&#xff0c;方便简单 安装教程需要自己找详细的&#xff0c;会用到跳过 如果你没有安装 Docker&#xff0c;可以按照以下步骤安装&#xff1a; sudo apt update sudo apt install apt-transport-https ca-certificates curl software-p…

导航守卫router.beforeEach

router.beforeEach 是一个全局前置守卫&#xff0c;在每次路由跳转之前都会触发。 //index.jsrouter.beforeEach((to, from, next) > {// 打印即将要进入的目标路由信息console.log(即将要进入的目标路由信息:, to)// 打印当前正要离开的路由信息console.log(当前正要离开的…