怎么促成客户做网站wordpress 视频管理 主题
怎么促成客户做网站,wordpress 视频管理 主题,带紫色箭头做网站软件,长沙毕业设计代做网站价格ElasticSearch
1、ElasticSearch学习随笔之基础介绍 2、ElasticSearch学习随笔之简单操作 3、ElasticSearch学习随笔之java api 操作 4、ElasticSearch学习随笔之SpringBoot Starter 操作 5、ElasticSearch学习随笔之嵌套操作 6、ElasticSearch学习随笔之分词算法 7、ElasticS…ElasticSearch
1、ElasticSearch学习随笔之基础介绍 2、ElasticSearch学习随笔之简单操作 3、ElasticSearch学习随笔之java api 操作 4、ElasticSearch学习随笔之SpringBoot Starter 操作 5、ElasticSearch学习随笔之嵌套操作 6、ElasticSearch学习随笔之分词算法 7、ElasticSearch学习随笔之高级检索 8、ELK技术栈介绍 9、Logstash部署与使用 10、ElasticSearch 7.x 版本使用 BulkProcessor 实现批量添加数据 11、ElasticSearch 8.x 弃用了 High Level REST Client移除了 Java Transport Client推荐使用 Elasticsearch Java API
ElasticSearch创始人 Shay Banon谢巴农 文章目录 ElasticSearch前言一引入 pom二创建 ES Client三创建 BulkProcessor四批量推数据 前言
本文主要应用 Rest High Level Client 来进行对 ElasticSearch 进行操作虽说官方已经不推荐但是 ES 升级带来的代价也是相当大的所以此处略去一万字。
那什么是 BulkProcessor 呢 BulkProcessor 是 ElasticSearch 客户端中的一个功能用于批量执行索引、更新或删除操作BulkProcessor 运行将多个操作打包成一个请求进行发送以提高效率和性能。
批量操作索引的好处:
性能优势将多个操作打包成一个请求这样可以减少网络开销提高数据传输效率从而可以加快数据写入索引速度。减少开销较少的网络开销和较少的服务器的交互减少服务器开销尤其是大规模写入数据时。原子性批量操作可以保证一组操作要么全部成功要么全部失败报错数据的一致性。减少开发成本批量操作可以简化客户端代码减少请求和管理连接的操作。
当然批量操作也是有缺点的
内存消耗在执行批量操作时首先会将数据写入内存这样会消耗更多的内存。错误处理复杂性单条数据上传如果出错可以重试或者进行记录操作等但是批量操作中的某个请求失败需要额外来处理比单条操作复杂。延迟响应批量操作可能导致请求排队等待会产生一些延迟。
多余的不说来上代码。
一引入 pom
首先引入客户端依赖我的测试 ES 服务是 8.7.0 版本的这里对应 High Level REST Client 客户端 7.3.2 版本的。
dependencygroupIdorg.elasticsearch.client/groupIdartifactIdelasticsearch-rest-high-level-client/artifactIdversion7.3.2/version
/dependency之所以不用更高版本是因为版本高了会报如下错误
java.io.IOException: Unable to parse response body for Response{requestLinePOST /devintcompany1562219164186/_doc?timeout1m HTTP/1.1, hosthttp://192.168.*。*:9200, responseHTTP/1.1 201 Created}at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1473)at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1424)at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1394)at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:836)at com.example.es.EsTest.addIndex(EsTest.java:97)at com.example.es.EsTest.main(EsTest.java:36)
Caused by: java.lang.NullPointerExceptionat java.util.Objects.requireNonNull(Objects.java:203)at org.elasticsearch.action.DocWriteResponse.init(DocWriteResponse.java:127)at org.elasticsearch.action.index.IndexResponse.init(IndexResponse.java:50)at org.elasticsearch.action.index.IndexResponse.init(IndexResponse.java:39)at org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:103)at org.elasticsearch.action.index.IndexResponse.fromXContent(IndexResponse.java:85)at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1727)at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$8(RestHighLevelClient.java:1395)at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1471)... 5 more亲自测试过的应该还是版本不兼容的缘故但是数据已经插入到 Index 了就很奇怪。
二创建 ES Client
这里初始化客户端需要用户名密码进行认证的。
private static RestHighLevelClient createClient(){String hostname 192.168.*.*;int port 9200;final CredentialsProvider credentialsProvider new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(your username, your password));RestClientBuilder restClientBuilder RestClient.builder(new HttpHost(hostname, port)).setHttpClientConfigCallback(httpAsyncClientBuilder - httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));return new RestHighLevelClient(restClientBuilder);
}三创建 BulkProcessor
这里创建 BulkProcessor 批量操作对象通过 High Level REST Client 来绑定加入监听器 BulkProcessor.Listener如果批量操作失败或发生异常在 afterBulk() 方法中处理。 批量处理需要设置的参数代码中已有注释一般就设置这些参数就可以了可根据自己的使用场景进行调节。
public static BulkProcessor getBulkProcessor(RestHighLevelClient client) {BulkProcessor.Listener listener new BulkProcessor.Listener() {Overridepublic void beforeBulk(long executionId, BulkRequest request) {System.out.println(开始执行批量操作ID: executionId);}Overridepublic void afterBulk(long executionId, BulkRequest request, BulkResponse response) {if (response.hasFailures()) {System.out.println(批量操作完成ID: executionId);}}Overridepublic void afterBulk(long executionId, BulkRequest request, Throwable failure) {System.out.println(批量操作失败ID: executionId);failure.printStackTrace();}};BulkProcessor.Builder builder BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) - {bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);bulkRequest.timeout(TimeValue.timeValueSeconds(100));client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);}), listener);// 当达到1000个操作时触发批量请求builder.setBulkActions(1000);// 当达到5MB大小时触发批量请求builder.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB));// 每5秒触发一次批量请求无论大小和操作数如何builder.setFlushInterval(TimeValue.timeValueSeconds(5));// 设置退避策略以防服务器过载或拒绝请求builder.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3));// 设置并发请求的数量为1即同时只有一个批量请求在执行builder.setConcurrentRequests(1);return builder.build();
}四批量推数据
我们在 main 方法中进行测试代码如下
public static void main(String[] args) throws IOException {RestHighLevelClient client createClient();BulkProcessor bulkProcessor getBulkProcessor(client);for (int i 0; i 10; i) {String source {\ApplianceType\:[{\ApplianceTypeCn\:\国产\,\ApplianceTypeEn\:\Domestic\,\ApplianceTypeId\:\1\}],\ApplicationCount\:0,\ClassICount\:17,\ClassIICount\:1,\ClassIIICount\:0,\Classification\:[{\Cn\:\2002版分类\,\En\:\2002 reg. category of relevant app.\,\Id\:\Class2002\,\Items\:[{\Cn\:\Ⅰ类\,\En\:\Class Ⅰ\,\Id\:\1\,\Id2\:\I\,\Items\:[{\Cn\:\进口第一类医疗器械含第一类体外诊断试剂备案信息\,\En\:\Information on imported ClassⅠmedical devices (including ClassⅠ IVD reagents)\,\Id\:\100\}]},{\Cn\:\Ⅱ类\,\En\:\Class Ⅱ\,\Id\:\2\,\Id2\:\II\,\Items\:[{\Cn\:\妇产科、辅助生殖和避孕器械\,\En\:\Obstetrics and gynecology, assisted reproductive and contraceptive devices\,\Id\:\201818\}]}]},{\Class1Code\:[{\Id\:\02\}],\Class2Code\:[{\Id\:\03\}],\DataType\:[{\Id\:\1\},{\Id\:\3\}],\ProductClassificationCode\:[{\Id\:\09\}],\ProductClassificationNameCode\:[]}],\Company\:{\Cn\:\海南创鑫医药科技发展有限公司\,\En\:\Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\,\Id\:\1000002388\},\CompanyAliasCn\:[\海南创鑫医药科技发展有限公司\],\CompanyAliasEn\:[\Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\],\CompanyCn\:\海南创鑫医药科技发展有限公司\,\CompanyCnSearch\:\海南创鑫医药科技发展有限公司\,\CompanyEn\:\Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\,\CompanyEnSearch\:\Hainan Chuangxin Pharmaceutical Technology Development Co. Ltd.\,\CompanyId\:\1000002388\,\CompanyType\:{\Cn\:\国内公司\,\En\:\Domestic company\,\Id\:\Domestic company\},\CompanyTypeCn\:\国内公司\,\CompanyTypeEn\:\Domestic company\,\CompanyTypeId\:\Domestic company\,\DomesticCount\:18,\EffectiveRegistrationCount\:18,\FirstApplicationYear\:null,\FirstRegistrationYear\:\2017\,\IVD\:\0\,\ImportCount\:0,\LatestApplicationYear\:null,\LatestRegistrationYear\:\2020\,\Listing\:{\Cn\:null,\En\:null,\Id\:null},\ListingCn\:null,\ListingEn\:null,\ListingId\:null,\TotalCount\:18,\company_registration_relation\:{\name\:\company\},\website_url\:\\};bulkProcessor.add(new IndexRequest(devintcompany1562219164186).source(source, XContentType.JSON));System.out.println(添加第 i 条数据);}try {bulkProcessor.awaitClose(10, TimeUnit.MINUTES);client.close();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(添加完成);
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/88099.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!