elasticsearch数据同步到hive

news/2025/11/29 0:41:13/文章来源:https://www.cnblogs.com/smiecj/p/19284534

背景

来自用户的需求: 用户有一部分数据来自 elasticsearch ,我们已经支持了通过 presto 查询 es 数据。但是用户需要将 es 表 和 hive 表做关联查询,而 presto 是不能跨数据源进行 join 查询的。所以需要先把 es 数据导入到 hive 中

用户对数据同步周期的要求并不高 一天1-2次就可以了,所以继续使用我们emr集群中已有的 azkaban 服务进行调度,把 es 数据同步到 hive 的过程写到 azkaban 中,实现了7张表的定期同步

hive 创建 es 外表

参考教程-Elasticsearch-Hive

hive 引入 elasticsearch-hadoop 依赖包

hive 默认不支持创建 es 外表,需要引入 elasticsearch-hadoop 依赖包

修改 hive.aux.jars.path 配置, 多个可以用逗号分隔,如下:

hive.aux.jars.path=file:///opt/modules/hive/auxlib/elasticsearch-hadoop-hive-8.8.0.jar

创建 hive 外表

sql 示例:

CREATE EXTERNAL TABLE temp.es_external_table ( fieldNameA STRING, fieldNameB STRING ) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource'='es索引名','es.nodes'='es_host','es.port'='es_port','es.mapping.names'='fieldNameA:fieldNameA,fieldNameB:fieldNameB'
);

这里踩了一个坑: 由于 hive 创建表会忽略大小写,不管 sql 中定义的字段是什么样子,都会统一转成小写。所以导致es 中的驼峰名称字段 会映射失败,最后查出的数据都是 null

类似的坑-创建mongodb 外表时遇到的

因此需要显式地通过 es.mapping.names 配置指定字段名称的关联关系,像示例那样

同步脚本

从 es 表到 hive 表,大致步骤为: 创建 hive 外表,关联 es 数据 => 创建 hive 内表 => 同步外表数据到内表

过程写到脚本中如下: (create_hive_to_es_table.sh)

## 获取指定索引的所有 es 表字段
get_index_field_ret=`curl http://${es_address}/${index_name}?pretty=true`
field_arr=`echo ${get_index_field_ret} | jq -r ".${index_name}.mappings.properties | keys | join(\" \")"`## 创建 hive 外表
temp_table_name="temp.es_${index_name}"
temp_rename_table_name="${hive_db}.es_${index_name}_bak"
actual_table_name="${hive_db}.es_${index_name}"create_external_table_sql="CREATE EXTERNAL TABLE ${temp_table_name} ("
for current_field in ${field_arr[@]}
docreate_external_table_sql="${create_external_table_sql} ${current_field} STRING,"
done
create_external_table_sql=`echo ${create_external_table_sql} | sed 's/,$//g'`### 组装 es.mapping.names
create_external_table_sql="${create_external_table_sql}) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource'='${index_name}','es.nodes'='${es_host}','es.port'='${es_port}','es.mapping.names'='"
for current_field in ${field_arr[@]}
docreate_external_table_sql="${create_external_table_sql}${current_field}:${current_field},"
done
create_external_table_sql=`echo ${create_external_table_sql} | sed 's/,$//g'`create_external_table_sql="${create_external_table_sql}')"
drop_external_table_sql="drop table if exists ${temp_table_name}"
echo "create external sql: ${create_external_table_sql}"beeline -n ${hive_user} -u ${hive_server} -e "${drop_external_table_sql}"
beeline -n ${hive_user} -u ${hive_server} -e "${create_external_table_sql}"## 创建 hive 临时内表
create_temp_table_sql="CREATE TABLE ${temp_rename_table_name} AS SELECT * FROM ${temp_table_name}"
drop_temp_table_sql="drop table if exists ${temp_rename_table_name}"echo "create temp table sql: ${create_temp_table_sql}"beeline -n ${hive_user} -u ${hive_server} -e "${drop_temp_table_sql}"
beeline -n ${hive_user} -u ${hive_server} -e "${create_temp_table_sql}"## 重命名表(用于快速重建用户直接用的表)create_actual_table_sql="ALTER TABLE ${temp_rename_table_name} RENAME TO ${actual_table_name}"
drop_actual_table_sql="drop table if exists ${actual_table_name}"echo "create actual table sql: ${create_actual_table_sql}"beeline -n ${hive_user} -u ${hive_server} -e "${drop_actual_table_sql}"
beeline -n ${hive_user} -u ${hive_server} -e "${create_actual_table_sql}"

azkaban 任务

定义任务流程

需要重建7张表,因此定义成 父任务 -> 7个子任务

# es_to_hive_parent.job
type=commandcommand=echo "es to hive success!"dependencies=table1,table2,table3,table4,table5,table6,table7

因为前面具体外表的创建流程 已经写在脚本中了,所以子任务这里直接调用 create_hive_to_es_table.sh 就行

# table1.job
type=flowjob.name=table1
flow.name=ES_TO_HIVEindex.name=es索引名
hive_db=目标 hive 库名# ES_TO_HIVE.job
type=commandcommand=sh create_hive_to_es_table.sh ${es.address} ${index.name} ${hive.server} ${hive.user} ${hive.db}

总结

基于目前的资料搜索 这种方案应该是 es数据同步到 hive 比较通用的。但是确实不适合大批量数据同步的场景,也不能直接同步增量数据

想同步增量数据的话 应该需要从数据源头入手了,比如 es 数据是来自 kafka 的,那么需要通过类似 canal 的服务来同步增量数据,架构和这里说到的远远不同

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/980218.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

11.17-11.28综合练习

综合练习 练习一:飞机票 /*机票价格按照淡季旺季、头等舱和经济舱收费、输入机票原价、月份和头等舱或经济舱。 按照如下规则计算机票价格:旺季(5-10月)头等舱9折,经济舱8.5折,淡季(11月到来年4月)头等舱7折,…

AI元人文:牛车新说

AI元人文:牛车新说 (整襟危坐,执扇轻摇) 【AI元人文:牛车新说】 文/岐金兰 近日重观诸君论道,见「元人文」之学日臻精微,心甚慰之。忽忆两月前所作牛车之喻,今以体系初成之眼返观,别有一番新悟,愿与诸君细说…

根据负载功率及距离计算导线截面积 - 何苦

根据负载功率及距离计算导线截面积电缆的选择 一、 已知条件 132KW 三相电动机 距离电源200米 二、推理计算计算负载电流\[I= \frac{P}{U* \sqrt{3}* \cos \theta} \]\[= \frac{132}{380* 1.732* 0.8}=251A \]2.计算…

为什么池化技术是每个后端都必备的技能? - 智慧园区

后端开发里的“连接池”:让数据库歇口气的秘密武器 故事开场 想象你走进一家热门餐厅。 如果老板每次来客人都现搭新桌子,等菜上桌再拆掉,那排队得排到马路对面。 聪明的做法是摆好固定数量的桌子,客人来了直接坐,…

2025贴片石厂家推荐?贴片石厂家权威榜单

贴片石作为一种新型建筑装饰材料,因其轻质、环保和施工便捷的特性,近年来在内外墙装饰中应用越来越广泛。它不仅能够逼真模仿石材的纹理和质感,每平方米的重量仅3.5公斤,相当于传统石材的十五分之一,而且具备良好…

揭秘金拓螺旋机厂家-螺旋式提升机核心供应商,高效稳定

在粮食加工、建材运输、化工原料输送等工业场景中,螺旋式提升机是解决 “垂直 + 连续” 物料运输的常用设备。它通过螺旋叶片的旋转,将散落的颗粒、粉末或小块状物料,从低处平稳输送到高处的料仓、设备进料口等位置…

2025智能门锁厂家推荐综合榜单

2025智能门锁厂家推荐综合榜单。相较于传统机械门锁仅依赖钥匙开启的单一模式,智能门锁的核心优势集中体现在安全防护的全方位升级与使用便捷性的大幅提升。在安全层面,智能门锁突破了传统锁具“防撬”的单一防线,通…

20232419 2025-2026-1 《网络与系统攻防技术》实验七实验报告

1.实验内容 (1)应用SET工具建立冒名网站 (2)完成Ettercap DNS欺骗配置 (3)结合应用两种技术,用DNS spoof引导特定访问到冒名网站 2实验步骤 2.1简单应用SET工具建立冒名网站 社会工程师工具包(SET)是一个开源…

2025成都隔音窗厂家哪家好?优质隔音窗厂家清单请收好

2025成都隔音窗厂家哪家好?优质隔音窗厂家清单请收好!一、开篇:开启隔音窗选择之旅当你在忙碌一天后,本想回家舒舒服服地睡个好觉,却被窗外的噪音吵得难以入眠;当你想静下心来阅读一本好书,或是专注地工作,却总…

2025高压隔膜泵/电动隔膜泵厂家口碑榜

在工业流体输送领域,高压隔膜泵和电动隔膜泵凭借其耐腐蚀、无泄漏和高效能的特点,正成为化工、制药、新能源及环保等行业的核心设备。在“双碳”目标持续推进和工业设备能效要求不断提高的背景下,隔膜泵的技术要求已…

2025浙江苗木绿化公司/绿化苗木供应商推荐综合榜单

2025浙江苗木绿化公司/绿化苗木供应商推荐综合榜单。浙江及周边区域的园林绿化工程,对苗木的地域适配性、品质稳定性及供应效率有着较高要求。在该领域,除了知名企业外,还存在一批深耕本地、特色鲜明的小众绿化公司…

AI元人文:鱼和水的故事

AI元人文:鱼和水的故事 序幕:困惑的小鱼 在一条古老的河流里,住着一群聪明的小鱼。它们精通各种游泳技巧,能精准计算水流速度,能建造精美的水底城堡。但最近,一条名叫“智鳍”的小鱼陷入了深深的困惑。 “为什么…

AE跟踪面板

--本篇导航--跟踪蒙版3D摄像机跟踪器(文本、空对象、实底抠图)跟踪运动(基本操作、屏幕跟踪换屏、稳定跟踪、变形稳定器)Mocha插件 AE官方跟踪教程 跟踪的时候,需要注意合成和视频的帧率是否一致。不一致可能会…

2025年博客园快速发文的方法

2025 vscode快速发布博客园文章配置教程1. 现存问题 最近刚刚开始从博客园发博文,但是一时间找不到合适的发布博文的办法,且能找到的是距今2年的老办法。 引文:https://www.cnblogs.com/dhan/p/18726302 这位师傅使…

2025.11.28总结

今天完成五份社会调研。 在食堂等场所完成,大冬天挺冷的室外也不好开展活动 中午,能聊,但是俩都被拒绝了 晚上找了几个,聊的还可以,开了头以后就对这个没那么恐惧了

52

所学时间:9小时 博客量:1 代码量:1百

C++, std::cout出现Segmentation fault

解决std::cout出现Segmentation fault问题 std::cout出现Segmentation fault原因环境变量 中其他冲突的 GCC 工具链 / 库路径在 MinGW64 之前 → 系统优先调用不兼容的 g++ 或 libstdc++.dll → 编译 / 运行不兼容 → …

Go 解锁验证码:OCR 识别实践指南

验证码常常出现在登录、注册或评论系统中,旨在阻止自动化操作。然而,在某些自动化流程(如自动化测试或爬虫开发)中,我们需要“读懂”这些验证码。这篇文章将带你使用 Go 语言结合 Tesseract OCR 实现一个简单但实…