RocketMQ(1.NameServer源码)

NameServer功能简述

主要功能如下

  1. 服务注册与发现:Nameserver扮演了RocketMQ集群中服务注册中心的角色。当RocketMQ中的Broker、Producer和Consumer启动时,它们会向Nameserver注册自己的网络地址和角色信息。Nameserver维护着集群中所有活跃实例的信息,并提供查询接口供其他组件发现这些实例。这样,Producer和Consumer可以根据Nameserver提供的信息找到Broker,实现消息的发布和消费。
  2. Topic路由信息管理:RocketMQ中的消息按照Topic进行分类。Nameserver维护着Topic到Broker之间的路由关系,包括哪些Topic被哪些Broker负责。这样,当Producer发送消息时,它会根据Topic在Nameserver查询路由信息,得知应该将消息发送到哪些Broker上。
  3. 负载均衡:Nameserver通过周期性地检查Broker的状态信息,包括负载、存活状态等,来维护一个Broker的列表,供Producer和Consumer使用。这样做的目的是为了在消息生产和消费过程中实现负载均衡,避免某个Broker过载,影响整个系统性能。
  4. 权限控制:Nameserver支持对连接到集群的客户端进行权限控制。通过配置,管理员可以限制哪些客户端可以连接到Broker,增加系统的安全性。
  5. 集群管理:Nameserver允许管理员对Broker和Topic进行动态管理,包括增加、删除Broker节点,创建和删除Topic等操作。这使得RocketMQ集群能够根据实际业务需求进行扩展和调整。

综上所述,RockerMQ Nameserver是RocketMQ集群中起着至关重要、不可或缺的作用。其服务注册与发现、Topic路由信息管理、负载均衡、权限控制和集群管理等功能,Nameserver确保整个RocketMQ系统的稳定、高效和安全运行。

核心源码

main0

public static void main(String[] args) {// 本地启动需设置环境变量System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, "/Users/config/rocketmq");main0(args);controllerManagerMain();}

1.parseCommandlineAndConfigFile(解析命令行和配置文件,初始化namesrvConfig、nettyServerConfig、nettyClientConfig等配置对象)

public static void parseCommandlineAndConfigFile(String[] args) throws Exception {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));Options options = ServerUtil.buildCommandlineOptions(new Options());CommandLine commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new DefaultParser());if (null == commandLine) {System.exit(-1);return;}namesrvConfig = new NamesrvConfig();nettyServerConfig = new NettyServerConfig();nettyClientConfig = new NettyClientConfig();nettyServerConfig.setListenPort(9876);if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {InputStream in = new BufferedInputStream(Files.newInputStream(Paths.get(file)));properties = new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);if (namesrvConfig.isEnableControllerInNamesrv()) {controllerConfig = new ControllerConfig();MixAll.properties2Object(properties, controllerConfig);}namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);if (commandLine.hasOption('p')) {MixAll.printObjectProperties(logConsole, namesrvConfig);MixAll.printObjectProperties(logConsole, nettyServerConfig);MixAll.printObjectProperties(logConsole, nettyClientConfig);if (namesrvConfig.isEnableControllerInNamesrv()) {MixAll.printObjectProperties(logConsole, controllerConfig);}System.exit(0);}if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);}

2.createAndStartNamesrvController(创建并运行NamesrvController)

public static NamesrvController start(final NamesrvController controller) throws Exception {if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {controller.shutdown();return null;}));controller.start();return controller;}
一、controller.initialize()初始化
  1. 加载配置
private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =new HashMap<>();
public void load() {String content = null;try {content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());} catch (IOException e) {log.warn("Load KV config table exception", e);}if (content != null) {KVConfigSerializeWrapper kvConfigSerializeWrapper =KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);if (null != kvConfigSerializeWrapper) {this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());log.info("load KV config table OK");}}}
  1. 初始化网络组件
private void initiateNetworkComponents() {// remotingServer是一个NettyRemotingServer对象,用于接收和处理来自其他服务的请求或响应// BrokerHouseKeepingService对象用于处理Broker的连接和断开事件this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);// remotingClient是一个NettyRemotingClient对象,它用于向其他服务发送请求或响应this.remotingClient = new NettyRemotingClient(this.nettyClientConfig);}
  1. 初始化线程执行器
private void initiateThreadExecutors() {this.defaultThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getDefaultThreadPoolQueueCapacity());// 用于处理默认的远程请求this.defaultExecutor = new ThreadPoolExecutor(this.namesrvConfig.getDefaultThreadPoolNums(), this.namesrvConfig.getDefaultThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.defaultThreadPoolQueue, new ThreadFactoryImpl("RemotingExecutorThread_")) {@Overrideprotected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {return new FutureTaskExt<>(runnable, value);}};this.clientRequestThreadPoolQueue = new LinkedBlockingQueue<>(this.namesrvConfig.getClientRequestThreadPoolQueueCapacity());// 处理客户端的路由信息请求this.clientRequestExecutor = new ThreadPoolExecutor(this.namesrvConfig.getClientRequestThreadPoolNums(), this.namesrvConfig.getClientRequestThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientRequestThreadPoolQueue, new ThreadFactoryImpl("ClientRequestExecutorThread_")) {@Overrideprotected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {return new FutureTaskExt<>(runnable, value);}};}
  1. 注册remoting server处理器
private void registerProcessor() {if (namesrvConfig.isClusterTest()) {// ClusterTestRequestProcessor是一个用于集群测试的处理器,它会在请求前后添加一些环境信息,比如产品环境名称、请求时间等this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.defaultExecutor);} else {// Support get route info only temporarily// 在 namesrvConfig.isClusterTest() = false 时如果收到请求的 requestCode 等于 RequestCode.GET_ROUTEINFO_BY_TOPIC 则会使用ClientRequestProcessor来处理;当收到其他请求时,会使用DefaultRequestProcessor来处理。ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor);// DefaultRequestProcessor是一个用于正常运行的处理器,它会根据请求的类型,调用不同的方法来处理,比如注册Broker、获取路由信息、更新配置等。this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor);}}
  1. 启动定时任务
private void startScheduleService() {// 首次延迟5毫秒执行,后续执行间隔5秒执行(分两种情况)// 后续执行:1.当执行任务时间小于间隔时间时,延迟(间隔时间-任务执行时间)执行//         2.当任务执行时间大于间隔时间,则任务结束立即执行下一次任务// 间隔扫描不活跃的Brokerthis.scanExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker,5, this.namesrvConfig.getScanNotActiveBrokerInterval(), TimeUnit.MILLISECONDS);//每隔10分钟打印所有KV配置this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically,1, 10, TimeUnit.MINUTES);// 每隔1分钟打印WaterMarkthis.scheduledExecutorService.scheduleAtFixedRate(() -> {try {NamesrvController.this.printWaterMark();} catch (Throwable e) {LOGGER.error("printWaterMark error.", e);}}, 10, 1, TimeUnit.SECONDS);}
  1. 初始化ssl上下文(配置remotingServer使用TLS协议进行安全通信)
  2. 注册RPC钩子
private void initiateRpcHooks() {// 注册RPC钩子 在remotingServer处理请求之前或之后执行一些自定义的逻辑this.remotingServer.registerRPCHook(new ZoneRouteRPCHook());}
二、注册JVM钩子
  1. controllerManager.shutdown()
// 注册一个ShutdownHookThread对象,JVM钩子,在程序终止时调用controllerManager.shutdown(),释放资源
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {controllerManager.shutdown();return null;}));
public void shutdown() {this.heartbeatManager.shutdown();this.controllerRequestExecutor.shutdown();this.notifyService.shutdown();this.controller.shutdown();this.remotingClient.shutdown();}
三、controller.start()运行
  1. 运行NettyRemotingServer,启动一个NettyRemotingServer,用于接收和处理客户端的请求

  2. 运行remotingClient,启动一个NettyRemotingClient,用于向其他服务发送请求

  3. 运行FileWatch,调用它的start方法,启动一个文件监视服务,用于动态加载证书文件

用来跟踪SSL

  1. 运行RouteInfoManager,启动一个路由信息管理器,用于维护Broker和Topic的路由关系

unRegisterService.start() 提供了一种以批处理方式注销Broker的机制。扫描离线的Broker BlockingQueue,take()获取数据,while循环获取不活跃的broker。take()是阻塞的,drainTo()方法获取队列中的全部数据。

registerBroker() 接收Broker每隔30秒上报Broker信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/7872.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

openfeign调用文件服务的文件上传接口报错:Current request is not a multipart request

解决办法&#xff1a; Api 接口 Api(tags "文件上接口") RestController public class FileController {Autowiredprivate FileFeignService fileFeignService;ApiOperation("上传文件")PostMapping(value "/uploadFile")public ResData<…

python常用数据类型区别

1.set集合和dict字典的区别 set没有对应的value值&#xff0c;两者都是可变类型&#xff0c;即不可哈希;两者的内部元素是不可变类型&#xff0c;即可哈希&#xff0c;都无索引&#xff0c;不可进行切片和根据索引进行的操作。 2.set集合和list列表的区别 相同点 都是可变类…

zookeeper学习(一) Standalone模式(单机模式)安装

安装准备 centos7环境jdk1.8环境zookeeper安装包 安装jdk 上传jdk安装包解压安装包到目录中 tar -zxvf jdk-8u361-linux-x64.tar.gz如果需要指定目录可以在后面加上 -C&#xff0c;如 tar -zxvf jdk-8u361-linux-x64.tar.gz -C 目录配置jdk环境变量 vim /etc/profile打开…

Dijkstra 算法——求解最短路径问题

迪杰斯特拉算法&#xff08;Dijkstra’s algorithm&#xff09;是一种用于解决单源最短路径问题的贪心算法。它可以找到从一个起始顶点到其他所有顶点的最短路径&#xff0c;并且适用于边的权重非负的图。 算法步骤如下&#xff1a; 创建一个数组 dist&#xff0c;用于保存起…

react当我们有两个完全不相关的组件想要通信时,就可以利用这种模式,其中一个组件负责订阅某个消息,而另一个元素则负责发送这个消息。使用Context配合

在nextjs项目中&#xff0c;发现两个组件没啥关系&#xff0c;例如一个是一直存在的头部组件&#xff0c;另一个是页面中的组件&#xff0c;当我点击头部组件中的特定按钮时&#xff0c;把数据传递到页面组件中&#xff0c;页面组件接受到canshu数据后在做其他操作&#xff0c;…

入门前端监控

背景 前端监控是指通过一系列手段对Web页面或应用程序进行实时监控和数据采集&#xff0c;以了解页面或应用程序的性能状况、用户行为等等&#xff0c;并及时发现和解决潜在的问题。一个完整的前端监控平台可以包括&#xff1a;数据收集与上报、数据整理与存储、数据展示这里仅…

redis---持久化和数据类型的基本操作

目录 1.redis持久化 2.redis数据类型 1.redis持久化 【1】RDB 启用rdb&#xff0c;查看是否有对应文件生成 1.进入配置文件&#xff0c;修改配置 [rootclient ~]# vim /etc/redis.conf save 60 5 # 自动出发机制&#xff08;60秒内进行5次操…

Java连锁门诊医院HIS信息管理系统源码

Java连锁门诊医院HIS信息管理系统源码&#xff1a;SaaS运维平台多医院多机构多门诊入驻强大的电子病历完整开发文档 一、系统概述 ❉采用主流成熟技术&#xff0c;软件结构简洁、代码规范易阅读&#xff0c;SaaS应用&#xff0c;全浏览器访问前后端分离&#xff0c;多服务协同…

通过两种实现方式理解CANoe TC8 demo是如何判断接收的以太网报文里的字段的

假设有一个测试用例,需求是:编写一个测试用例,发送一条icmpv4 echo request报文给DUT,identifier字段设置为10。判断DUT能够回复icmpv4 echo reply报文,且identifier字段值为10。 实现:在canoe的simulation setup界面插入一个test节点,ip地址为:192.168.0.1,mac地址为…

具身智能,是机器人的“冷饭热炒”吗?

大模型正如火如荼&#xff0c;下一个AI风口就来了。 如果你关注2023世界人工智能大会等行业峰会&#xff0c;以及英伟达、微软、谷歌、特斯拉和国內科技大厂的最新发布会&#xff0c;除了“大模型”&#xff0c;应该会听到另一个高频词——具身智能。 所谓具身智能Embodied AI …

IRIS搭建docker

之前把web实现了docker&#xff0c;开发或测试环境可能需要开发自己搭数据库&#xff0c;为了方便使用&#xff0c;把数据库也做一个docker。 由于原生的CentOS我还有改yum仓库&#xff0c;所以这次从之前lis搞的改好yum的镜像开始&#xff08;从改好yum的lisnew的镜像创建lis…

【Linux】Ubuntu基本使用与配置, 以及常见问题汇总(一)

前言 大学期间&#xff0c;感觉很多时候学习课外知识都是被推着往前走&#xff0c;很多内容并没有深入去学习&#xff0c;知识的记录受限于所学比较片面&#xff0c;如今渐渐意识到似乎并没有建立起相关知识的体系架构&#xff0c;缺乏一个系统学习并整理的过程。本文将以Ubunt…

【LangChain】检索器之MultiQueryRetriever

MultiQueryRetriever 概要内容总结 概要 基于距离的向量数据库检索在高维空间中嵌入查询&#xff0c;并根据“距离”查找相似的嵌入文档。 但是&#xff0c;如果查询措辞发生细微变化&#xff0c;或者嵌入不能很好地捕获数据的语义&#xff0c;检索可能会产生不同的结果。有时…

grid map学习笔记1之Ubuntu18.04+ROS-melodic编译安装grid_map栅格地图及示例运行

文章目录 0 引言1 安装依赖和编译1.1 安装依赖1.2 下载编译 2 运行示例2.1 simple_demo2.2 tutorial_demo2.3 iterators_demo2.4 image_to_gridmap_demo2.5 grid_map_to_image_demo2.6 opencv_demo2.7 resolution_change_demo2.8 filters_demo2.9 interpolation_demo 0 引言 苏…

labview 多线程同步

所谓通讯的同步是指多个线程同时进行或严格按照顺序执行&#xff0c;数据的严格性是指发送多少数据接收多少数据&#xff0c;不能出现数据丢失或重复接收的现象。 labview的同步机制有事件发生、集合点、通知器、信号量。 可以这么来记忆&#xff1a;事急&#xff08;集&…

JavaScript |(一)JavaScript简介及基本语法 | 尚硅谷JavaScript基础实战

学习来源&#xff1a;尚硅谷JavaScript基础&实战丨JS入门到精通全套完整版 文章目录 &#x1f4da;JavaScript简介&#x1f407; 实现&#x1f407;JavaScript的特点 &#x1f4da;基本知识&#x1f407;编写位置&#x1f525;方式一&#xff1a;在标签中写&#xff08;不推…

独立游戏《黑夜狩猎者》

做独立游戏已经4个月了&#xff0c;在下作品黑夜狩猎者已经上线TapTap平台。目前正加入薪火计划&#xff0c; 如果你也想加入&#xff0c;下方链接 TapTap 薪火计划

【超全面】Linux嵌入式干货学习系列教程

文章目录 一、前言二、Linux基础篇三、数据结构与算法基础三、Linux应用篇四、Linux网络篇五、ARM篇六、Linux系统移植篇七、Linux驱动篇八、Linux特别篇九、Linux项目篇 一、前言 博主学习Linux也有几个月了&#xff0c;在这里为广大朋友整理出嵌入式linux的学习知识&#xff…

zookeeper-3.7.1集群

1.下载&解压安装包apache-zookeeper-3.7.1-bin.tar.gz 解压到/app/ &改名zookeeper-3.7.1 [rootnode1 app]# tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /app/ [rootnode1 app]# mv apache-zookeeper-3.7.1-bin zookeeper-3.7.1 ---- 删除docs [rootnode1…

算法工程师-机器学习面试题总结(2)

线性回归 线性回归的基本思想是&#xff1f; 线性回归是一种用于建立和预测变量之间线性关系的统计模型。其基本思想是假设自变量&#xff08;输入&#xff09;和因变量&#xff08;输出&#xff09;之间存在线性关系&#xff0c;通过建立一个线性方程来拟合观测数据&#xff…