ElasticSearch 7.x 版本使用 BulkProcessor 实现批量添加数据

ElasticSearch

1、ElasticSearch学习随笔之基础介绍
2、ElasticSearch学习随笔之简单操作
3、ElasticSearch学习随笔之java api 操作
4、ElasticSearch学习随笔之SpringBoot Starter 操作
5、ElasticSearch学习随笔之嵌套操作
6、ElasticSearch学习随笔之分词算法
7、ElasticSearch学习随笔之高级检索
8、ELK技术栈介绍
9、Logstash部署与使用
10、ElasticSearch 7.x 版本使用 BulkProcessor 实现批量添加数据
11、ElasticSearch 8.x 弃用了 High Level REST Client,移除了 Java Transport Client,推荐使用 Elasticsearch Java API

ElasticSearch,创始人 Shay Banon(谢巴农)


文章目录

  • ElasticSearch
  • 前言
  • 一:引入 pom
  • 二:创建 ES Client
  • 三:创建 BulkProcessor
  • 四:批量推数据


前言

本文主要应用 Rest High Level Client 来进行对 ElasticSearch 进行操作,虽说官方已经不推荐,但是 ES 升级带来的代价也是相当大的,所以,此处略去一万字。

  • 那什么是 BulkProcessor 呢?
    BulkProcessorElasticSearch 客户端中的一个功能,用于批量执行索引、更新或删除操作,BulkProcessor 运行将多个操作打包成一个请求进行发送,以提高效率和性能。

批量操作索引的好处:

  • 性能优势:将多个操作打包成一个请求,这样可以减少网络开销,提高数据传输效率,从而可以加快数据写入索引速度。
  • 减少开销:较少的网络开销和较少的服务器的交互,减少服务器开销,尤其是大规模写入数据时。
  • 原子性:批量操作可以保证一组操作要么全部成功,要么全部失败,报错数据的一致性。
  • 减少开发成本:批量操作,可以简化客户端代码,减少请求和管理连接的操作。

当然,批量操作也是有缺点的:

  • 内存消耗:在执行批量操作时,首先会将数据写入内存,这样会消耗更多的内存。
  • 错误处理复杂性:单条数据上传,如果出错可以重试或者进行记录操作等,但是批量操作中的某个请求失败,需要额外来处理,比单条操作复杂。
  • 延迟响应:批量操作可能导致请求排队等待,会产生一些延迟。

多余的不说,来上代码。

一:引入 pom

首先引入客户端依赖,我的测试 ES 服务是 8.7.0 版本的,这里对应 High Level REST Client 客户端 7.3.2 版本的。

<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.3.2</version>
</dependency>

之所以不用更高版本,是因为版本高了会报如下错误:

java.io.IOException: Unable to parse response body for Response{requestLine=POST /devintcompany@1562219164186/_doc?timeout=1m HTTP/1.1, host=http://192.168.*。*:9200, response=HTTP/1.1 201 Created}at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1473)at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1424)at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1394)at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:836)at com.example.es.EsTest.addIndex(EsTest.java:97)at com.example.es.EsTest.main(EsTest.java:36)
Caused by: java.lang.NullPointerExceptionat java.util.Objects.requireNonNull(Objects.java:203)at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:127)at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:50)at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:39)at org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:103)at org.elasticsearch.action.index.IndexResponse.fromXContent(IndexResponse.java:85)at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1727)at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$8(RestHighLevelClient.java:1395)at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1471)... 5 more

亲自测试过的,应该还是版本不兼容的缘故,但是数据已经插入到 Index 了,就很奇怪。

二:创建 ES Client

这里初始化客户端,需要用户名密码进行认证的。

private static RestHighLevelClient createClient(){String hostname = "192.168.*.*";int port = 9200;final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("your username", "your password"));RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(hostname, port)).setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));return new RestHighLevelClient(restClientBuilder);
}

三:创建 BulkProcessor

这里创建 BulkProcessor 批量操作对象,通过 High Level REST Client 来绑定,加入监听器 BulkProcessor.Listener,如果批量操作失败或发生异常,在 afterBulk() 方法中处理。
批量处理需要设置的参数代码中已有注释,一般就设置这些参数就可以了,可根据自己的使用场景进行调节。

public static BulkProcessor getBulkProcessor(RestHighLevelClient client) {BulkProcessor.Listener listener = new BulkProcessor.Listener() {@Overridepublic void beforeBulk(long executionId, BulkRequest request) {System.out.println("开始执行批量操作,ID: " + executionId);}@Overridepublic void afterBulk(long executionId, BulkRequest request, BulkResponse response) {if (response.hasFailures()) {System.out.println("批量操作完成,ID: " + executionId);}}@Overridepublic void afterBulk(long executionId, BulkRequest request, Throwable failure) {System.out.println("批量操作失败,ID: " + executionId);failure.printStackTrace();}};BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);bulkRequest.timeout(TimeValue.timeValueSeconds(100));client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);}), listener);// 当达到1000个操作时触发批量请求builder.setBulkActions(1000);// 当达到5MB大小时触发批量请求builder.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB));// 每5秒触发一次批量请求,无论大小和操作数如何builder.setFlushInterval(TimeValue.timeValueSeconds(5));// 设置退避策略,以防服务器过载或拒绝请求builder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3));// 设置并发请求的数量为1,即同时只有一个批量请求在执行builder.setConcurrentRequests(1);return builder.build();
}

四:批量推数据

我们在 main 方法中进行测试,代码如下:

public static void main(String[] args) throws IOException {RestHighLevelClient client = createClient();BulkProcessor bulkProcessor = getBulkProcessor(client);for (int i = 0; i < 10; i++) {String source = "{\"ApplianceType\":[{\"ApplianceTypeCn\":\"国产\",\"ApplianceTypeEn\":\"Domestic\",\"ApplianceTypeId\":\"1\"}],\"ApplicationCount\":0,\"ClassICount\":17,\"ClassIICount\":1,\"ClassIIICount\":0,\"Classification\":[{\"Cn\":\"2002版分类\",\"En\":\"2002 reg. category of relevant app.\",\"Id\":\"Class2002\",\"Items\":[{\"Cn\":\"Ⅰ类\",\"En\":\"Class Ⅰ\",\"Id\":\"1\",\"Id2\":\"I\",\"Items\":[{\"Cn\":\"进口第一类医疗器械(含第一类体外诊断试剂)备案信息\",\"En\":\"Information on imported ClassⅠmedical devices (including ClassⅠ IVD reagents)\",\"Id\":\"100\"}]},{\"Cn\":\"Ⅱ类\",\"En\":\"Class Ⅱ\",\"Id\":\"2\",\"Id2\":\"II\",\"Items\":[{\"Cn\":\"妇产科、辅助生殖和避孕器械\",\"En\":\"Obstetrics and gynecology, assisted reproductive and contraceptive devices\",\"Id\":\"201818\"}]}]},{\"Class1Code\":[{\"Id\":\"02\"}],\"Class2Code\":[{\"Id\":\"03\"}],\"DataType\":[{\"Id\":\"1\"},{\"Id\":\"3\"}],\"ProductClassificationCode\":[{\"Id\":\"09\"}],\"ProductClassificationNameCode\":[]}],\"Company\":{\"Cn\":\"海南创鑫医药科技发展有限公司\",\"En\":\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\",\"Id\":\"1000002388\"},\"CompanyAliasCn\":[\"海南创鑫医药科技发展有限公司\"],\"CompanyAliasEn\":[\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\"],\"CompanyCn\":\"海南创鑫医药科技发展有限公司\",\"CompanyCnSearch\":\"海南创鑫医药科技发展有限公司\",\"CompanyEn\":\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\",\"CompanyEnSearch\":\"Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\",\"CompanyId\":\"1000002388\",\"CompanyType\":{\"Cn\":\"国内公司\",\"En\":\"Domestic company\",\"Id\":\"Domestic company\"},\"CompanyTypeCn\":\"国内公司\",\"CompanyTypeEn\":\"Domestic company\",\"CompanyTypeId\":\"Domestic company\",\"DomesticCount\":18,\"EffectiveRegistrationCount\":18,\"FirstApplicationYear\":null,\"FirstRegistrationYear\":\"2017\",\"IVD\":\"0\",\"ImportCount\":0,\"LatestApplicationYear\":null,\"LatestRegistrationYear\":\"2020\",\"Listing\":{\"Cn\":null,\"En\":null,\"Id\":null},\"ListingCn\":null,\"ListingEn\":null,\"ListingId\":null,\"TotalCount\":18,\"company_registration_relation\":{\"name\":\"company\"},\"website_url\":\"\"}";bulkProcessor.add(new IndexRequest("devintcompany@1562219164186").source(source, XContentType.JSON));System.out.println("添加第 " + i + "条数据!");}try {bulkProcessor.awaitClose(10, TimeUnit.MINUTES);client.close();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("添加完成!");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/617581.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用numpy处理图片——缩放图片

缩放图片是让图片丢失部分像素&#xff0c;从而导致图片失真。一种比较简单的方法就是抽取法。比如如果我们要将照片在宽度上缩小50%&#xff0c;则可以在第二维度上每隔2个像素取一个像素来保存&#xff1b;类似的&#xff0c;如果我们希望在高度上缩小50%&#xff0c;则可以在…

AUTO SEG-LOSS SEARCHING METRIC SURROGATES FOR SEMANTIC SEGMENTATION

AUTO SEG-LOSS: 搜索度量替代语义分割 论文链接&#xff1a;https://arxiv.org/abs/2010.07930 项目链接&#xff1a;https://github.com/fundamentalvision/Auto-Seg-Loss ABSTRACT 设计合适的损失函数是训练深度网络的关键。特别是在语义分割领域&#xff0c;针对不同的场…

腾讯云服务器建站教程——2024更新部署网站教程

使用腾讯云服务器搭建网站全流程&#xff0c;包括轻量应用服务器和云服务器CVM建站教程&#xff0c;轻量可以使用应用镜像一键建站&#xff0c;云服务器CVM可以通过安装宝塔面板的方式来搭建网站&#xff0c;腾讯云服务器网txyfwq.com分享使用腾讯云服务器建站教程&#xff0c;…

【复现】Spider-Flow RCE漏洞(CVE-2024-0195)_16

目录 一.概述 二 .漏洞影响 三.漏洞复现 1. 漏洞一&#xff1a; 四.修复建议&#xff1a; 五. 搜索语法&#xff1a; 六.免责声明 一.概述 Spider Flow 是一个高度灵活可配置的爬虫平台&#xff0c;用户无需编写代码&#xff0c;以流程图的方式&#xff0c;即可实现爬虫…

基于SSM的电脑测评系统(有报告)。Javaee项目。ssm项目。

演示视频&#xff1a; 基于SSM的电脑测评系统&#xff08;有报告&#xff09;。Javaee项目。ssm项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spring Spri…

Codeforces Round 911 (Div. 2) C. Anji‘s Binary Tree (DFS + 树)

题目 思路: dfs树的每一条到叶子的路径, 并计算路径中需要修改的个数, 在这些个数中取最小值 注意: 本题中的树是以每个结点的左右孩子是什么的形式给出的, 所以可以不用建树, 只需保存每个结点的左右孩子是什么即可。 代码&#xff1a; #include<bits/stdc.h> using…

Docker登录MySQL,密码正确却提示密码错误

当我输入了正确的MySQL密码的时候确提示我密码错误&#xff1a; ERROR 1045 (28000): Access denied for user rootlocalhost (using password: YES) docker run --name mysql_master \ -e MYSQL_ROOT_PASSWORD123123 \ -v /root/mysql_master/data:/var/lib/mysql \ -v /root…

Postgresql常见(花式)操作完全示例

案例说明 将Excel数据导入Postgresql&#xff0c;并实现常见统计&#xff08;数据示例如下&#xff09; 导入Excel数据到数据库 使用Navicat工具连接数据库&#xff0c;使用导入功能可直接导入&#xff0c;此处不做过多介绍&#xff0c;详细操作请看下图&#xff1a; 点击“下…

点云从入门到精通技术详解100篇-基于多传感器融合的智能汽车 环境感知(下)

目录 基于激光雷达点云的目标检测 4.1 点云神经网络检测模型 4.2 点云预处理

SCI一区级 | Matlab实现RIME-CNN-BiLSTM-Mutilhead-Attention多变量多步时序预测

SCI一区级 | Matlab实现RIME-CNN-BiLSTM-Mutilhead-Attention多变量多步时序预测 目录 SCI一区级 | Matlab实现RIME-CNN-BiLSTM-Mutilhead-Attention多变量多步时序预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab实现RIME-CNN-BiLSTM-Mutilhead-Attention多…

npx和npm有什么区别,包管理器yarn的使用方法,node的版本管理工具nvm使用方法

文章目录 一、npx介绍及使用1、npx 是什么2、npx 会把远端的包下载到本地吗?3、npx 执行完成之后&#xff0c; 下载的包是否会被删除&#xff1f;4、npx和npm的区别 二、yarn介绍及使用1、Yarn是什么&#xff1f;2、Yarn的常见场景&#xff1a;3、Yarn常用命令 三、nvm介绍及使…

vue3基础:单文件组件介绍

介绍 Vue 的单文件组件 (即 *.vue 文件&#xff0c;简称 SFC&#xff0c;全称是single file component) 是一种特殊的文件格式&#xff0c;使我们能够将一个 Vue 组件的模板、逻辑与样式封装在单个文件中。下面是一个单文件组件的示例&#xff1a; <script> export def…

Ubuntu root 远程登录失败

背景&#xff1a;设置了两个系统用户&#xff1a;root、test&#xff1b;test可以登录&#xff0c;可以使用su 命令切换root用户登录成功&#xff1b; 但是直接用root登录&#xff0c;会报错。 查看登录日志的方法&#xff1a; 需要两个远程窗口&#xff0c;在第一个远程窗口…

日志审计系统Agent项目创建——初始化数据库和日志文件(Linux版本)

1、定义和初始化函数&#xff1a; bool Init(std::string ip); 1.1、获取ip地址&#xff0c;这里的ip地址是通过makefile文件直接设定的。 bool XAgent::Init(string ip) {if (ip.empty()){cerr << "Agent::init failed! ip is empty" << endl;return…

力扣|2023华为秋招冲刺

文章目录 第一关&#xff1a;2023 年 7 月面试题挑战第二关&#xff1a;2023 年 6 月面试题挑战第三关&#xff1a;2023 年 5 月面试题挑战 第一关&#xff1a;2023 年 7 月面试题挑战 class Solution { public:void reverseWord(vector<char>& s,int l,int r){for(i…

单片机学习记录(一)

简答题 第1章 1.微处理器、微计算机、CPU、单片机、嵌入式处理器他们之间有何区别&#xff1f; 答&#xff1a;微处理器、CPU都是中央处理器的不同称谓&#xff0c;微处理器芯片本身不是计算机&#xff1b; 单片机、微计算机都是一个完整的计算机系统&#xff0c;单片机是集…

关于Python里xlwings库对Excel表格的操作(三十二)

这篇小笔记主要记录如何【如何使用“Chart类”、“Api类"和“Axes函数”设置坐标轴标题文本内容】。 前面的小笔记已整理成目录&#xff0c;可点链接去目录寻找所需更方便。 【目录部分内容如下】【点击此处可进入目录】 &#xff08;1&#xff09;如何安装导入xlwings库…

建筑钢筋表面为什么加上螺纹呢?

问题描述&#xff1a; 建筑钢筋表面为什么加上螺纹呢&#xff1f; 问题解答&#xff1a; 增加黏附力&#xff1a; 螺纹结构提供更多摩擦力&#xff0c;加强钢筋与混凝土之间的粘附&#xff0c;防止滑动或剥离&#xff0c;提高结构的稳定性和强度。 提高抗拉强度&#xff1a;…

监督学习 - 多层感知机回归(Multilayer Perceptron Regression,MLP Regression)

什么是机器学习 多层感知机回归&#xff08;Multilayer Perceptron Regression&#xff0c;MLP Regression&#xff09;是一种人工神经网络&#xff08;Artificial Neural Network&#xff0c;ANN&#xff09;的形式&#xff0c;用于解决回归问题。多层感知机是一种包含多个层…

《安富莱嵌入式周报》第330期:开源ECU模组,开源USB PD供电SMD回流焊,嵌入式系统开发C代码参考指南,旨在提升C语言编写的源码质量

周报汇总地址&#xff1a;嵌入式周报 - uCOS & uCGUI & emWin & embOS & TouchGFX & ThreadX - 硬汉嵌入式论坛 - Powered by Discuz! 更新一期视频教程 BSP视频教程第29期&#xff1a;J1939协议栈CAN总线专题&#xff0c;源码框架&#xff0c;执行流程和…