Netty基础—6.Netty实现RPC服务三

大纲

1.RPC的相关概念

2.RPC服务调用端动态代理实现

3.Netty客户端之RPC远程调用过程分析

4.RPC网络通信中的编码解码器

5.Netty服务端之RPC服务提供端的处理

6.RPC服务调用端实现超时功能

5.Netty服务端之RPC服务提供端的处理

(1)RPC服务提供端NettyServer

(2)基于反射调用请求对象的目标方法

(1)RPC服务提供端NettyRpcServer

public class ServiceConfig {private String serviceName;//调用方的服务名称private Class serviceInterfaceClass;//服务接口类型private Class serviceClass;...
}public class NettyRpcServer {private static final Logger logger = LogManager.getLogger(NettyRpcServer.class);private static final int DEFAULT_PORT = 8998;private List<ServiceConfig> serviceConfigs = new CopyOnWriteArrayList<ServiceConfig>();private int port;public NettyRpcServer(int port) {this.port = port;}public void start() {logger.info("Netty RPC Server Starting...");EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RpcDecoder(RpcRequest.class)).addLast(new RpcEncoder(RpcResponse.class)).addLast(new NettyRpcServerHandler(serviceConfigs));}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);//到这一步为止,server启动了而且监听指定的端口号了ChannelFuture channelFuture = serverBootstrap.bind(port).sync();logger.info("Netty RPC Server started successfully, listened[" + port + "]");//进入一个阻塞的状态,同步一直等待到你的server端要关闭掉channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {logger.error("Netty RPC Server failed to start, listened[" + port + "]");} finally {bossEventLoopGroup.shutdownGracefully();workerEventLoopGroup.shutdownGracefully();}}//可以代理多个服务public void addServiceConfig(ServiceConfig serviceConfig) {this.serviceConfigs.add(serviceConfig);}public static void main(String[] args) {ServiceConfig serviceConfig = new ServiceConfig( "TestService", TestService.class, TestServiceImpl.class);NettyRpcServer nettyRpcServer = new NettyRpcServer(DEFAULT_PORT);nettyRpcServer.addServiceConfig(serviceConfig);nettyRpcServer.start();}
}

(2)基于反射调用请求对象的目标方法

//RpcRequest类需要修改字段调整为如下所示
public class RpcRequest implements Serializable {private String requestId;private String className;private String methodName;private Class[] parameterTypes;//参数类型private Object[] args;//参数值private String invokerApplicationName;//调用方的服务名称private String invokerIp;//调用方的IP地址...
}public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcServerHandler.class);private ConcurrentHashMap<String, ServiceConfig> serviceConfigMap = new ConcurrentHashMap<String, ServiceConfig>();public NettyRpcServerHandler(List<ServiceConfig> serviceConfigs) {for (ServiceConfig serviceConfig : serviceConfigs) {String serviceInterfaceClass = serviceConfig.getServiceInterfaceClass().getName();serviceConfigMap.put(serviceInterfaceClass, serviceConfig);}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcRequest rpcRequest = (RpcRequest)msg;logger.info("Netty RPC Server receives the request: " + rpcRequest);RpcResponse rpcResponse = new RpcResponse();rpcResponse.setRequestId(rpcRequest.getRequestId());try {//此时我们要实现什么呢?//我们需要根据RpcRequest指定的class,获取到这个class//然后通过反射构建这个class对象实例//接着通过反射获取到这个RpcRequest指定方法和入参类型的method//最后通过反射调用,传入方法,拿到返回值//根据接口名字拿到接口实现类的名字后再获取类ServiceConfig serviceConfig = serviceConfigMap.get(rpcRequest.getServiceInterfaceClass());Class clazz = serviceConfig.getServiceClass();Object instance = clazz.newInstance();Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());Object result = method.invoke(instance, rpcRequest.getArgs());//把rpc调用结果封装到响应里去rpcResponse.setResult(result);rpcResponse.setSuccess(RpcResponse.SUCCESS);} catch(Exception e) {logger.error("Netty RPC Server failed to response the request.", e);rpcResponse.setSuccess(RpcResponse.FAILURE);rpcResponse.setException(e);}ctx.write(rpcResponse);ctx.flush();logger.info("send RPC response to client: " + rpcResponse);}
}

6.RPC服务调用端实现超时功能

public class ReferenceConfig {private static final long DEFAULT_TIMEOUT = 5000;private static final String DEFAULT_SERVICE_HOST = "127.0.0.1";private static final int DEFAULT_SERVICE_PORT = 8998;private Class serviceInterfaceClass;private String serviceHost;private int servicePort;private long timeout;...
}public class NettyRpcClient {private static final Logger logger = LogManager.getLogger(NettyRpcClient.class);private ReferenceConfig referenceConfig;private ChannelFuture channelFuture;private NettyRpcClientHandler nettyRpcClientHandler;public NettyRpcClient(ReferenceConfig referenceConfig) {this.referenceConfig = referenceConfig;this.nettyRpcClientHandler = new NettyRpcClientHandler(referenceConfig.getTimeout());}public void connect() {logger.info("connecting to Netty RPC server: " + referenceConfig.getServiceHost() + ":" + referenceConfig.getServicePort());EventLoopGroup eventLoopGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)//长时间没有通信就发送一个检测包.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new RpcEncoder(RpcRequest.class)).addLast(new RpcDecoder(RpcResponse.class)).addLast(new NettyRpcReadTimeoutHandler(referenceConfig.getTimeout())).addLast(nettyRpcClientHandler);}});       try {if (referenceConfig.getServiceHost() != null && !referenceConfig.getServiceHost().equals("")) {channelFuture = bootstrap.connect(referenceConfig.getServiceHost(), referenceConfig.getServicePort()).sync();logger.info("successfully connected.");}} catch(Exception e) {throw new RuntimeException(e);}}public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {//标记一下请求发起的时间NettyRpcRequestTimeHolder.put(rpcRequest.getRequestId(), new Date().getTime());channelFuture.channel().writeAndFlush(rpcRequest).sync();RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse(rpcRequest.getRequestId());logger.info("receives response from netty rpc server.");if (rpcResponse.isSuccess()) {return rpcResponse;}throw rpcResponse.getException();}
}public class NettyRpcReadTimeoutHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcReadTimeoutHandler.class);private long timeout;public NettyRpcReadTimeoutHandler(long timeout) {this.timeout = timeout;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse)msg;long requestTime = NettyRpcRequestTimeHolder.get(rpcResponse.getRequestId());long now = new Date().getTime();if (now - requestTime >= timeout) {rpcResponse.setTimeout(true);logger.error("Netty RPC response is marked as timeout status: " + rpcResponse);}//移除发起请求时间的标记NettyRpcRequestTimeHolder.remove(rpcResponse.getRequestId());ctx.fireChannelRead(rpcResponse);}
}public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LogManager.getLogger(NettyRpcClientHandler.class);private static final long GET_RPC_RESPONSE_SLEEP_INTERVAL = 5;private ConcurrentHashMap<String, RpcResponse> rpcResponses = new ConcurrentHashMap<String, RpcResponse>();private long timeout;public NettyRpcClientHandler(long timeout) {this.timeout = timeout;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse) msg;if (rpcResponse.getTimeout()) {logger.error("Netty RPC client receives the response timeout: " + rpcResponse);} else {rpcResponses.put(rpcResponse.getRequestId(), rpcResponse);logger.info("Netty RPC client receives the response: " + rpcResponse);}}public RpcResponse getRpcResponse(String requestId) throws NettyRpcReadTimeoutException {long waitStartTime = new Date().getTime();while (rpcResponses.get(requestId) == null) {try {long now = new Date().getTime();if (now - waitStartTime >= timeout) {break;}Thread.sleep(GET_RPC_RESPONSE_SLEEP_INTERVAL);} catch (InterruptedException e) {logger.error("wait for response interrupted", e);}}RpcResponse rpcResponse = rpcResponses.get(requestId);if (rpcResponse == null) {logger.error("Get RPC response timeout.");throw new NettyRpcReadTimeoutException("Get RPC response timeout.");} else {rpcResponses.remove(requestId);}return rpcResponse;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/73545.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

路由器与防火墙配置命令

路由器与防火墙配置命令 小明啊&#xff0c;你不是学计算机的嘛&#xff0c;叔叔家的路由器坏了&#xff0c;可以过来帮叔叔看看吗 命令可以用缩写&#xff0c;造就一堆容易造成歧义的缩写&#xff0c;比如add是address的缩写&#xff0c;sh是shutdown的缩写。 默认为Cisco路…

Go语言进化之旅:从1.18到1.24的语法变革

文章目录 里程碑变革&#xff1a;泛型支持Go 1.18&#xff1a;泛型的引入Go 1.19-1.21&#xff1a;泛型的完善Go 1.24&#xff1a;泛型类型别名全面支持 循环与迭代的进化Go 1.22&#xff1a;循环变量作用域变化与整数遍历Go 1.23&#xff1a;迭代器函数的支持Go 1.24&#xff…

发现一个GoVCL的问题

之前用govcl写了一个服务端的界面程序&#xff0c;用来控制服务的开启和关闭。 由于这个服务程序运行的时间比较长&#xff0c;经常是挂着在服务器上24小时不间断运行。 后来经过调试发现&#xff0c;govcl的界面按钮控件&#xff0c;在程序长时间运行后&#xff0c;会出现无法…

34个适合机械工程及自动化专业【论文选题】

论文选题具有极其重要的意义&#xff0c;它直接关系到论文的质量、价值以及研究的可行性和顺利程度。选题明确了研究的具体领域和核心问题&#xff0c;就像给研究旅程设定了方向和目的地。例如&#xff0c;选择 “人工智能在医疗影像诊断中的应用” 这一选题&#xff0c;就确定…

电脑实用小工具--VMware常用功能简介

一、创建、编辑虚拟机 1.1 创建新的虚拟机 详见文章新创建虚拟机流程 1.2 编辑虚拟机 创建完成后&#xff0c;点击编辑虚拟机设置&#xff0c;可对虚拟机内存、处理器、硬盘等各再次进行编辑设置。 二、虚拟机开关机 2.1 打开虚拟机 虚拟机创建成功后&#xff0c;点击…

双指针算法专题之——有效三角形的个数

文章目录 题目介绍思路分析AC代码 题目介绍 链接: 611. 有效三角形的个数 思路分析 如果判断三个数能否构成一个三角形&#xff0c;相信大家都知道&#xff1a; 只要任意两边之和大于第三边即可。 比如三条边长度为a&#xff0c;b&#xff0c;c 那只要满足 ab>c ac>b b…

Linux内核实时机制27 - RT调度器10 - RT throttling 带宽控制下

文章目录 1、初始化带宽 init_rt_bandwidth1.1、init_rt_bandwidth2、定时器处理2.1、sched_rt_period_timer2.2、do_sched_rt_period_timer3、总结1、初始化带宽 init_rt_bandwidth rt_runtime : 一个时间周期内的运行时间,超过则限流,默认值为0.95ms 1、init_rt_bandwidth…

1.5[hardware][day5]

Link类跳转指令可以拆分为两个部分&#xff0c;一个是跳转&#xff0c;即下一个PC的生成&#xff0c;如果将分支条件的比较放到译码级来进行&#xff0c;则这部分只涉及取值级和译码级流水&#xff1b;另一个是Link操作&#xff0c;简单来说就是写寄存器&#xff0c;这部则主要…

Tomcat 与 Java 环境变量配置简明教程

Tomcat 与 Java 环境变量配置简明教程 一、Tomcat 环境变量配置 1. 确认安装路径 假设 Tomcat 安装在&#xff1a;D:\Tomcat\apache-tomcat-9.0.70 2. 设置 CATALINA_HOME 步骤&#xff1a; 右键点击「此电脑」→「属性」点击「高级系统设置」→「环境变量」在「系统变量…

3.16学习总结

学习了Java的知识点 基本数据类型 byte占1字节&#xff0c;储存范围-128~127 short占2字节&#xff0c;储存范围-32768~32767 int占4字节&#xff0c;储存范围-2147483648~2147483647 long占8字节&#xff0c;储存范围是-9223372036854775808~9223372036854775807 float占…

Android手机中各类安全相关知识总结

更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 1. Android 安全威胁2. Android 安全防护措施3. Android 安全建议和最佳实践4. Android 安全工具推荐5. Android 安全常见问题5.1 如何检测设备是否感染恶意软件?5.2 如何防止应用滥用权限?5.3 如何保护设备免受网络攻…

【Ratis】项目总览

Apache Ratis 项目源码分析与运行原理 Apache Ratis 是一个高性能、可扩展的分布式一致性协议实现,是对Raft协议的Java版本的很好的工程实现。它提供了灵活的 API 和多种传输层支持(如 gRPC 和 Netty),适用于构建分布式系统中的核心组件,例如分布式存储、配置管理和服务发…

以太网 MAC 帧格式

文章目录 以太网 MAC 帧格式以太网帧间隔参考 本文为笔者学习以太网对网上资料归纳整理所做的笔记&#xff0c;文末均附有参考链接&#xff0c;如侵权&#xff0c;请联系删除。 以太网 MAC 帧格式 以太网技术的正式标准是 IEEE 802.3&#xff0c;它规定了以太网传输数据的帧结…

pycharm配置镜像源【pycharm最新版(23.2.5及以上)方法】

经常遇到pycharm中无法安装或者安装慢的问题&#xff0c;纠结了好久&#xff0c;终于找到这个解决办法了。 为什么要配置镜像源&#xff1a; 因为Python的包管理工具pip一般从PyPI&#xff08;Python Package Index&#xff09;下载安装包&#xff0c;但是PyPI位于国外&#x…

驾驭 DeepSeek 科技之翼,翱翔现代学习新天际

在当今这个信息爆炸的时代&#xff0c;学习的方式和途径正在经历着前所未有的变革。人工智能技术的飞速发展&#xff0c;为我们的学习带来了全新的机遇和挑战。DeepSeek 作为一款强大的大语言模型&#xff0c;凭借其卓越的性能和丰富的功能&#xff0c;为现代学习注入了新的活力…

科普:WOE编码与One-Hot编码

WOE编码是业务逻辑与统计建模的结合&#xff0c;适合强业务导向的场景&#xff1b; One-Hot编码是数据驱动的特征工程&#xff0c;适合追求模型性能的场景。 编码方式核心价值典型案例WOE编码保留变量预测能力&#xff0c;适配线性模型银行违约预测逻辑回归One-Hot编码释放特征…

Python----数据分析(Pandas一:pandas库介绍,pandas操作文件读取和保存)

一、Pandas库 1.1、概念 Pandas是一个开源的、用于数据处理和分析的Python库&#xff0c;特别适合处理表格类数 据。它建立在NumPy数组之上&#xff0c;提供了高效的数据结构和数据分析工具&#xff0c;使得数据操作变得更加简单、便捷和高效。 Pandas 的目标是成为 Python 数据…

Type-C:智能家居的电力革命与空间美学重构

在万物互联的时代浪潮中&#xff0c;家居空间正经历着从功能容器到智慧终端的蜕变。当意大利设计师安东尼奥奇特里奥提出"消失的设计"理念二十年后&#xff0c;Type-C充电技术正以润物无声的方式重塑着现代家居的形态与内核&#xff0c;开启了一场静默的居住革命。 【…

C++ 左值(lvalue)和右值(rvalue)

在 C 中&#xff0c;左值&#xff08;lvalue&#xff09;和右值&#xff08;rvalue&#xff09;是指对象的不同类别&#xff0c;区分它们对于理解 C 中的表达式求值和资源管理非常重要&#xff0c;尤其在现代 C 中涉及到移动语义&#xff08;Move Semantics&#xff09;和完美转…

【含文档+PPT+源码】基于SpringBoot和Vue的编程学习系统

项目介绍 本课程演示的是一款 基于SpringBoot和Vue的编程学习系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Java 学习者。 1.包含&#xff1a;项目源码、项目文档、数据库脚本、软件工具等所有资料 2.带你从零开始部署运行本套系统 3.该项…