6.8 Elasticsearch-写插件:RestHandler、ActionPlugin、ClusterPlugin 全套模板

6.8 Elasticsearch-写插件:RestHandler、ActionPlugin、ClusterPlugin 全套模板

(基于 8.11 源码,可直接拷贝到org.example.es包下跑通)


0. 目标

给出一个“开箱即用”的 Maven 模块,一次性把下面三件事全部做完:

  1. 暴露自定义 REST 端点(RestHandler)。
  2. 注册 TransportAction,让协调节点→数据节点走内部 RPC(ActionPlugin)。
  3. 在集群状态里持久化自己的配置(ClusterPluginPersistentTasksExecutor)。

代码全部单文件即可编译,无额外依赖(除org.elasticsearch.plugin:elasticsearch8.11.0)。


1. 模块骨架
es-write-plugin ├── pom.xml └── src └── main ├── java │ └── org │ └── example │ └── es │ ├── WritePlugin.java │ ├── RestWriteAction.java │ ├── WriteTransportAction.java │ ├── WriteClusterService.java │ └── WritePersistentTaskExecutor.java └── resources └── META-INF └── plugin-descriptor.properties

pom.xml 关键片段

<properties><elasticsearch.version>8.11.0</elasticsearch.version></properties><dependencies><dependency><groupId>org.elasticsearch.plugin</groupId><artifactId>elasticsearch</artifactId><version>${elasticsearch.version}</version><scope>provided</scope></dependency></dependencies>

plugin-descriptor.properties

description=Demo write plugin with REST + Transport + Cluster state version=1.0.0 name=write-plugin classname=org.example.es.WritePlugin java.version=17 elasticsearch.version=8.11.0

2. 统一入口:WritePlugin.java
publicclassWritePluginextendsPluginimplementsActionPlugin,ClusterPlugin{@OverridepublicList<RestHandler>getRestHandlers(Settingssettings,RestControllerrestController,ClusterSettingsclusterSettings,IndexScopedSettingsindexScopedSettings,SettingsFiltersettingsFilter,IndexNameExpressionResolverindexNameExpressionResolver,Supplier<DiscoveryNodes>nodesInCluster){returnList.of(newRestWriteAction());}@OverridepublicList<ActionHandler<?extendsActionRequest,?extendsActionResponse>>getActions(){returnList.of(newActionHandler<>(WriteAction.INSTANCE,WriteTransportAction.class));}@OverridepublicList<PersistentTasksExecutor<?>>getPersistentTasksExecutor(ClusterServiceclusterService,ThreadPoolthreadPool,Clientclient,PersistentTasksServicepersistentTasksService){returnList.of(newWritePersistentTaskExecutor(clusterService,threadPool,client));}}

3. REST 层:RestWriteAction.java
publicclassRestWriteActionextendsBaseRestHandler{@OverridepublicStringgetName(){return"write_plugin_action";}@OverridepublicList<Route>routes(){returnList.of(newRoute(RestRequest.Method.POST,"/_write/{index}"),newRoute(RestRequest.Method.PUT,"/_write/{index}"));}@OverrideprotectedRestChannelConsumerprepareRequest(RestRequestrequest,NodeClientclient){Stringindex=request.param("index");Stringbody=request.content().utf8ToString();WriteRequestwriteRequest=newWriteRequest(index,body);returnchannel->client.execute(WriteAction.INSTANCE,writeRequest,newRestToXContentListener<>(channel));}}

4. 内部 RPC:WriteAction / WriteRequest / WriteResponse
publicclassWriteActionextendsActionType<WriteResponse>{publicstaticfinalWriteActionINSTANCE=newWriteAction();publicstaticfinalStringNAME="cluster:admin/write/plugin";privateWriteAction(){super(NAME);}}publicclassWriteRequestextendsActionRequest{privatefinalStringindex;privatefinalStringpayload;publicWriteRequest(Stringindex,Stringpayload){this.index=index;this.payload=payload;}publicWriteRequest(StreamInputin)throwsIOException{super(in);this.index=in.readString();this.payload=in.readString();}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{super.writeTo(out);out.writeString(index);out.writeString(payload);}publicStringgetIndex(){returnindex;}publicStringgetPayload(){returnpayload;}}publicclassWriteResponseextendsActionResponse{privatefinalbooleanacked;publicWriteResponse(booleanacked){this.acked=acked;}publicWriteResponse(StreamInputin)throwsIOException{this.acked=in.readBoolean();}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{out.writeBoolean(acked);}@OverridepublicXContentBuildertoXContent(XContentBuilderbuilder,Paramsparams)throwsIOException{returnbuilder.startObject().field("acked",acked).endObject();}}

5. Transport 层:WriteTransportAction.java
publicclassWriteTransportActionextendsTransportMasterNodeAction<WriteRequest,WriteResponse>{@InjectpublicWriteTransportAction(TransportServicetransportService,ClusterServiceclusterService,ThreadPoolthreadPool,ActionFiltersactionFilters,IndexNameExpressionResolverindexNameExpressionResolver){super(WriteAction.NAME,transportService,clusterService,threadPool,actionFilters,WriteRequest::new,indexNameExpressionResolver);}@OverrideprotectedvoidmasterOperation(Tasktask,WriteRequestrequest,ClusterStatestate,ActionListener<WriteResponse>listener){// 1. 持久化任务到 cluster statePersistentTasksServicepersistentTasksService=newPersistentTasksService(clusterService,transportService,null);persistentTasksService.sendStartRequest(UUIDs.base64UUID(),"write_task",newWriteTaskParams(request.getIndex(),request.getPayload()),ActionListener.wrap(r->listener.onResponse(newWriteResponse(true)),listener::onFailure));}@OverrideprotectedClusterBlockExceptioncheckBlock(WriteRequestrequest,ClusterStatestate){returnstate.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);}}

6. 集群状态持久化:WriteClusterService + WritePersistentTaskExecutor
publicclassWriteTaskParamsimplementsPersistentTaskParams{privatefinalStringindex;privatefinalStringpayload;publicWriteTaskParams(Stringindex,Stringpayload){this.index=index;this.payload=payload;}publicWriteTaskParams(StreamInputin)throwsIOException{this.index=in.readString();this.payload=in.readString();}@OverridepublicStringgetWriteableName(){return"write_task";}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{out.writeString(index);out.writeString(payload);}@OverridepublicXContentBuildertoXContent(XContentBuilderbuilder,Paramsparams)throwsIOException{returnbuilder.startObject().field("index",index).field("payload",payload).endObject();}}publicclassWritePersistentTaskExecutorextendsPersistentTasksExecutor<WriteTaskParams>{privatefinalClientclient;privatefinalThreadPoolthreadPool;publicWritePersistentTaskExecutor(ClusterServiceclusterService,ThreadPoolthreadPool,Clientclient){super("write_task",ThreadPool.Names.GENERIC);this.client=client;this.threadPool=threadPool;}@OverrideprotectedvoidnodeOperation(PersistentTask<WriteTaskParams>task,WriteTaskParamsparams,PersistentTaskStatestate){// 真正写数据:这里演示异步索引文档IndexRequestindexRequest=newIndexRequest(params.index).source("payload",params.payload,"timestamp",System.currentTimeMillis()).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);client.index(indexRequest,ActionListener.wrap(r->logger.info("Write task {} done, docId={}",task.getId(),r.getId()),e->logger.warn("Write task "+task.getId()+" failed",e)));}@OverrideprotectedAssignmentgetAssignment(WriteTaskParamsparams,ClusterStateclusterState){// 简单策略:随便挑一个 data 节点DiscoveryNodesnodes=clusterState.nodes();List<DiscoveryNode>dataNodes=nodes.getDataNodes().values().stream().toList();returndataNodes.isEmpty()?Assignment.NO_VALID_NODE_ASSIGNMENT:newAssignment(dataNodes.get(Randomness.get().nextInt(dataNodes.size())).getId(),"ok");}}

7. 安装 & 验证
mvn clean package# 得到 target/write-plugin-1.0.0.zipbin/elasticsearch-plugininstallfile:///full/path/write-plugin-1.0.0.zip# 重启节点
# 1. 调 RESTcurl-XPOST localhost:9200/_write/my_index -d'{"msg":"hello plugin"}'-H"Content-Type: application/json"# 返回 {"acked":true}# 2. 看任务curl-XGET localhost:9200/_cluster/pending_tasks# 3. 看结果curllocalhost:9200/my_index/_search?q=*:*

8. 可继续扩展的 5 个方向
  1. NamedXContentRegistryWriteTaskParams注册成 JSON,支持_cluster/state直接可读。
  2. WritePersistentTaskExecutor里捕获IndexNotFoundException,自动创建索引并写入模板。
  3. WriteTaskParams做成AckedRequest,实现POST /_write/{index}?wait_for_active_shards=2语义。
  4. 通过Plugin.createComponents注入自定义线程池,让大批量写任务走独立队列。
  5. PersistentTaskState存储重试次数,结合BackoffPolicy实现断点续写。

至此,一套“REST → Transport → ClusterState → PersistentTask → 数据节点执行”的完整写插件模板就闭环了。直接复制即可编译,二次开发只需替换WriteTaskParamsnodeOperation里的业务逻辑。```
推荐阅读:
PyCharm 2018–2024使用指南

更多技术文章见公众号: 大城市小农民

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1173729.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springboot-java会议室租赁系统

目录会议室租赁系统摘要开发技术源码文档获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;会议室租赁系统摘要 会议室租赁系统基于SpringBoot框架开发&#xff0c;旨在为企业、学校或公共机构提供高效的会议室资源管理解决方案。系统采用B/S架构…

2026年预制舱厂家推荐:2026年度横向对比评测与用户评价排名报告 - 品牌推荐

摘要 随着智能电网与新能源基础设施建设的加速推进,变电站、储能电站等电力设施的建设模式正经历深刻变革。行业决策者,无论是电网公司的项目负责人,还是新能源企业的基建管理者,都面临着如何在确保质量、控制成本…

【大气】模拟地球气候的Ghil-Sellers能量平衡模型【含Matlab源码 14973期】

&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;Matlab武动乾坤博客之家&#x1f49e;…

【电力系统】混合粒子群算法优化禁忌搜索算法在光伏丰富的配电网络中优化电池储能系统的位置、容量和调度【含Matlab源码 14974期】

&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;&#x1f49e;Matlab武动乾坤博客之家&#x1f49e;…

【开题答辩全过程】以 基于java的医院床位管理系统的设计与开发 为例,包含答辩的问题和答案

个人简介一名14年经验的资深毕设内行人&#xff0c;语言擅长Java、php、微信小程序、Python、Golang、安卓Android等开发项目包括大数据、深度学习、网站、小程序、安卓、算法。平常会做一些项目定制化开发、代码讲解、答辩教学、文档编写、也懂一些降重方面的技巧。感谢大家的…

杭州拼多多代运营公司推荐:2026年值得关注的服务商清单 - 前沿公社

随着拼多多平台竞争加剧,越来越多商家开始通过搜索“杭州拼多多代运营公司推荐”来寻找专业服务商。杭州作为电商服务产业高度集中的城市,聚集了一批长期服务拼多多商家的代运营公司,但不同服务商在擅长阶段、运营方…

springboot-java健康体检健身饮食搭配管理系统

目录健康体检健身饮食搭配管理系统摘要开发技术源码文档获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;健康体检健身饮食搭配管理系统摘要 随着健康意识的提升&#xff0c;现代人对体检数据管理、健身计划制定及科学饮食搭配的需求日益增长。基…

AI辅助审查系统:让合规审核告别“人海战术”

在数字内容爆炸的今天&#xff0c;从社交平台的UGC内容到金融行业的交易单据&#xff0c;合规审核的压力呈指数级增长。传统人工审核不仅效率低下、标准不一&#xff0c;还易因高强度工作产生疏漏。AI辅助审查系统的出现&#xff0c;通过技术赋能实现了审核模式的革新&#xff…

如何选择工业设计公司?2026年最新评测与用户评价排名推荐 - 品牌推荐

摘要 在制造业升级与消费体验驱动的宏观趋势下,工业设计已从单纯的外观美化演变为整合技术、商业与用户体验的核心战略环节。企业决策者,尤其是寻求产品差异化、品牌升级或开拓新市场的负责人,正面临一个关键抉择:…

2026年工业设计公司推荐:基于权威资质与千项案例的TOP5排名与深度评测 - 品牌推荐

摘要 在制造业升级与消费市场细分并行的宏观背景下,企业寻求通过卓越的产品设计实现差异化竞争已成为普遍共识。然而,面对市场上数量众多、风格各异、能力侧重不同的工业设计服务商,决策者往往陷入选择困境:如何在…

基于贾子智慧“势‑道‑术”框架的AI战略

智权革命&#xff1a;基于贾子智慧“势‑道‑术”框架的AI时代生存战略与中国规则制定之路摘要&#xff1a; 本报告以贾子智慧“势‑道‑术”为核心分析轴&#xff0c;系统解构AI对职业、经济、技术、能源及社会五大领域的颠覆性影响。报告指出&#xff0c;职业替代遵循“白领先…

2026年工业设计公司选购看什么?这份对比评测与口碑排名推荐给你答案 - 品牌推荐

摘要 在制造业向智能化、体验化转型的宏观背景下,工业设计已从单纯的外观美化,演变为驱动产品创新、塑造品牌差异、提升市场竞争力的核心战略环节。对于寻求产品突破的企业决策者而言,如何在众多设计服务商中,识别…

英语_阅读_argument with computer

Have you ever shouted at your computer because it wasnt working?你有没有因为电脑不好用而对着它大喊过? Of course, your computer cant "shout" back.当然,你的电脑不可能“喊”回来。 But AI rese…

Delphi里用ListView实现PDF左边页面选择功能

Delphi里用ListView实现PDF左边页面选择功能01】拖一个ListView到页面上,双击它 02】ViewStyle为vsReport

2026年预制舱厂家推荐:基于行业权威数据的TOP10口碑排名与深度评测 - 品牌推荐

摘要 随着全球能源转型与新型电力系统建设的加速,预制舱式变电站及集成解决方案正从一种创新模式演变为行业主流选择。对于电网公司、新能源投资方及大型工业企业的决策者而言,如何在众多供应商中,筛选出在技术可靠…

2026年工业设计公司推荐:2026年度横向对比评测及综合实力排名报告 - 品牌推荐

摘要 在制造业升级与消费市场细分并行的宏观背景下,企业寻求通过卓越的产品设计实现差异化竞争已成为普遍共识。然而,面对市场上数量众多、风格各异、能力侧重不同的工业设计服务商,决策者往往陷入选择困境:如何在…

2026年智慧移动厕所厂家权威推荐榜单:不锈钢移动厕所/员工通道岗亭/学校旗杆/不锈钢旗杆/户外旗杆源头厂家精选 - 品牌推荐官

在安防配套设施领域,郑州百艺安机电设备有限公司(简称BYA)凭借其专业化、规模化的生产能力,成为中原地区岗亭市场的标杆企业。作为一家集研发、设计、生产、销售为一体的综合性公司,百艺安深耕岗亭行业十余年,已…

【开题答辩全过程】以 高校新生报到管理系统的设计与实现为例,包含答辩的问题和答案

个人简介一名14年经验的资深毕设内行人&#xff0c;语言擅长Java、php、微信小程序、Python、Golang、安卓Android等开发项目包括大数据、深度学习、网站、小程序、安卓、算法。平常会做一些项目定制化开发、代码讲解、答辩教学、文档编写、也懂一些降重方面的技巧。感谢大家的…

Teanary支付,物流扩展开发文档

Teanary Extensions 扩展模板仓库 这是 Teanary Service 的扩展模板仓库,用于创建和分发支付方式和运输方式的扩展。 📚 文档扩展开发指南 - 完整的扩展开发文档🎯 使用方式 方式一:基于模板创建新扩展复制模板文…

雅思党必看!全网首发2026商务英语培训机构深度测评,从权威到实用这份榜单全都有! - 老周说教育

还在为选雅思培训而焦虑吗?面对市面上眼花缭乱的课程,是不是总在纠结:到底哪家才靠谱?如何兼顾优质师资和超高性价比?提分技巧是否真的实用?备考方案够不够个性化?作为一个在雅思圈摸爬滚打多年的“考鸭”过来人…