elasticsearch亿级数据量全量索引导入优化方案

  1. Hbase scan读取时候,调大
    hbase.client.scanner.timeout.period
    超时时间,不然可能会跑异常
    org.apache.hadoop.hbase.UnknownScannerException: org.apache.hadoop.hbase.UnknownScannerException: Unknown scanner '479187903508737326'. This can happen due to any of the following reasons: a) Scanner id given is wrong, b) Scanner lease expired because of long wait between consecutive client checkins, c) Server may be closing down, d) RegionServer restart during upgrade.
  2. 线程池参数配置,由于是IO密集型任务可将核心线程数调成cpu核数好几倍,保证程序录入不能丢失数据采用
    ThreadPoolExecutor.CallerRunsPolicy拒绝策略

    由于是测试环境配置不高(4核7G机器),一开始我设置的corePoolSize=4,maxPoolSize=8,队列容量200,后来发现还是太慢了,后来经过测试改成corePoolSize=20,maxPoolSize=40,队列容量为4000,最终效果比较好

  3. 利用G1收集器,调大jvm堆内存

    我从3G->6G,这样做的好处是,1.GC不会太频繁,导致进程频繁停顿影响性能 2.可以增加阻塞队列容量,可以使scan hbase的父线程停顿时间不会太长,导致连接超时

  4. elasticsearch建索引方式改成BulkProcessor,批量提交

    注意,BulkProcessor配置成异步、批量、定时刷新等,BulkProcessor#add()方法是一个同步方法,因此在同一时刻只能单线程处理,优化是将BuilkProcessor放在线程池中动态生成,多线程提交,另外,建索引时候先不选择副本等全量录入完成以后再配置副本,选择副本再录全量数据更新比较慢。

    BulkProcessor使用注意:

    1.使用的时候如果BulkProcessor只有一个实例,由于es批量处理不过来所有的数据都堆积在这个类中会出现OOM,后来领导让我把提交流程改成通过固定的线程池去提交,每次批量提交创建一个实例,这样保证不会出现OOM。

    2.BulkProcessor源码中用的Synchronized,如果只有一个实例也就意味着多个线程同时提交一次只能处理一个线程的提交,这样效率太慢了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/508800.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

多线程场景下利用ThreadLocal是线程安全?

文章目录背景多线程场景测试代码结论背景 ThreadLocal原理以及基本概念这里我就不介绍了,这里我们主要关注ThreadLocal是否是线程安全吗?其实如果我们知道ThreadLocal原理我们肯定知道它是线程安全的,但是我在开发的时候不放心做了个测试&am…

记录一次服务进程强行退出的问题排查过程

场景:我这边从Hbase跑一亿多的全量数据录入Elasticsearch中,跑了四个多小时,程序突然挂掉了,然后我就纳闷了,为啥突然挂掉了??? 思路1:是不是Java 进程抛出OOM异常? 分析…

No compiler is provided in this environment. Perhaps you are running on a JRE rather than a JDK

文章目录问题背景:问题排查过程拓展问题背景: 运行环境:Mac For IDEA 我的Mac 更新最新系统(window解决办法也是如下)后,由于新系统的Mac会带JDK导致自动更新Java版本,最后导致我在Idea控制台中执行mvn install后抛出…

ElasticSearch聚合查询

文章目录聚合分组求和平均值分析每种颜色下每种品牌的平均价格更多的metric学习Cardinality(唯一值)查询聚合分析查询聚合全局聚合 深入聚合数据分析_global bucket:单个品牌与所有品牌销量对比过滤聚合:统计价格大于1200的电视平均价格统计最近一个月的…

深入剖析线程池基本原理以及常见面试题详解

文章目录面试官:能给我讲讲线程池的实现原理?线程池类继承关系ThreadPoolExecutor核心数据结构面试官:给我讲讲线程池的有哪些参数?面试官:如何优雅的关闭线程?线程的生命周期面试官:线程池哪五…

设计模式七大设计原则

文章目录设计模式七大设计原则开闭原则里氏替换原则依赖倒置原则接口隔离原则迪米特法则-最少知道原则单一职责原则合成复用原则设计模式 面向对象的三个基本特征: 继承封装多态 设计模式体现了代码的耦合性、内聚性、可维护性、可扩展性、重用性、灵活性。 代码…

从框架源码中学习创建型设计模式

文章目录从框架源码中解读创建型设计模式工厂模式案例一:RocketMQ源码-创建Producer生产者案例二:RocketMQ源码-创建过滤器工厂抽象工厂案例一:Dubbo源码-创建缓存的抽象工厂案例二:RocketMQ源码-创建日志对象的抽象工厂单例模式面…

从框架源码中学习结构型设计模式

文章目录从框架源码学习结构型设计模式适配器模式应用实例案例一:dubbo框架日志适配器Logger接口日志实现类Logger适配器接口LoggerAdapter实现类Logger日志工厂桥接模式应用场景案例:dubbo源码-远程调用模块channelHandler设计ChannelHandler是一个SPI拓…

MDC日志logback整合使用

MDC日志logback整合使用 为什么使用MDC记录日志? 场景: 由于我的搜索服务并发量比较高,而处理一次搜索请求需要记录多个日志,因此日志特别多的情况下去查一次搜索整个日志打印情况会比较复杂。 解决方案: 可以使用用…

如何合理的配置线程数?

文章目录题记Java并发编程实战美团技术团队追求参数设置合理性线程池参数动态化题记 我想不管是在面试中、还是工作中,我们总会面临这种问题,那么到底有没有一种计算公式去告诉我们如何去配置呢? 答案是:没有 想要合理的配置线…

基于CompletableFuture并发任务编排实现

文章目录并发任务编排实现不带返回值/参数传递任务串行执行并行执行并行执行-自定义线程池阻塞等待:多并行任务执行完再执行任意一个任务并发执行完就执行下个任务串并行任务依赖场景带返回值/参数传递任务带返回值实现串行执行多线程任务串行执行对任务并行执行&am…

搜索研发工程师需要掌握的一些技能

文章目录基础语言数据结构与算法工程方面搜索相关搜索主要模块电商搜索流程分词相关搜索召回相似度算法相关词推荐排序相关国美搜索搜索算法工程师需要掌握的技能基础 语言 大部分公司用的是Solr、ElasticSearch,都是基于Java实现的,因此熟悉掌握Java语…

Flink入门看完这篇文章就够了

文章目录第一章:概述第一节:什么是Flink?第二节:Flink特点?第三节:Flink应用场景?第四节:Flink核心组成第五节:Flink处理模型:流处理和批处理第六节&#xff…

管理实践-教练技术的应用

文章目录简介课程学习的工具总结深度倾听3R原则倾听地图:开放式提问层次提问和SMART提问框架BIA积极性反馈GROW模型简介 最近在参加管理培训课程,学习《教练式指导》一课,现将内容总结分享一下。 课程学习的工具总结 深度倾听3R原则 工具…

spark整合MySQL

spark整合MySQL <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency>import java.sql.{Connection, DriverManager, PreparedStatement} import org…

DataFrame不同风格比较

DataFrame不同风格比较 一&#xff0c;DSL风格语法 //加载数据 val rdd1sc.textFile("/person.txt").map(x>x.split(" ")) //定义一个样例类 case class Person(id:String,name:String,age:Int) //把rdd与样例类进行关联 val personRDDrdd1.map(x>…

sparkSQL操作hiveSQL

sparkSQL操作hiveSQL <dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.3.3</version></dependency>import org.apache.spark.sql.SparkSession//todo:利用sparksql操作h…

sparksql加载mysql表中的数据

sparksql加载mysql表中的数据 <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version> </dependency>import java.util.Propertiesimport org.apache.spark.SparkCon…

sparksql保存数据常见操作

sparksql保存数据操作 import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession}//todo:sparksql可以把结果数据保存到不同的外部存储介质中 object SaveResult {def main(args: Array[String]): Unit {//1、创建SparkConf对象val sparkCon…

sparksql自定义函数

sparksql中自定义函数 import org.apache.spark.sql.api.java.UDF1 import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{DataFrame, SparkSession}//TODO:自定义sparksql的UDF函数 一对一的关系 object SparkSQLFunction {def main(args: Array[S…