seatunnel配置mysql2hive

SeaTunnel安装教程

# ====执行流程
# 下载,解压
# https://mirrors.aliyun.com/apache/seatunnel/2.3.8/?spm=a2c6h.25603864.0.0.2e2d3f665eBj1E
# https://blog.csdn.net/taogumo/article/details/143608532
tar -zxvf apache-seatunnel-2.3.8-bin.tar.gz -C /opt/module/ 
# 改名
mv apache-seatunnel-2.3.8 seatunnel
# 导入连接器 /seatunnel/connectors/
# 链接: https://pan.baidu.com/s/1Q4lTMtiBWlP5-3epmCC6jw?pwd=ejkx 提取码: ejkx 
mysql hive hdoop
# 测试,可以正常执行,说明安装成功
cd /opt/module/seatunnel/ 
./bin/seatunnel.sh 
--config ./config/v2.batch.config.template 
-m local

模拟数据到hive-fake2hive

编辑测试脚本fake2hive.config ,source为模拟数据,sink配置hive

env {parallelism = 1job.mode = "BATCH"job.name = "HiveSinkExample"
}
source {FakeSource {  # 示例数据源schema = {fields {id = intname = stringscore = double}}rows = [{ kind = INSERT, fields = [1, "Alice", 90.5] },{ kind = INSERT, fields = [2, "Bob", 85.0] },{ kind = INSERT, fields = [3, "Charlie", 92.0] }]}
}
sink {Hive {table_name = "default.test_hive_sink"metastore_uri = "thrift://hadoop1:9083"hdfs_site_path = "/opt/module/hadoop/etc/hadoop/hdfs-site.xml"hive_site_path = "/opt/module/hive/conf/hive-site.xml"save_mode = "append"file_format = "text"                  # 必须与Hive表存储格式一致}
}

配置hive连接,并启动同步脚本

# 上传对应连接器
connector-hive-2.3.8.jar
connector-file-hadoop-2.3.8.jar
# 将hive和hadoop的相关依赖包复制到seatunnel的lib下(本地集群hive为3.1.3版本,hadoop为3.3.4,spark为3.3.1)
cp /opt/module/hive/lib/hive-metastore-3.1.3.jar /opt/module/seatunnel/lib/
cp /opt/module/hive/lib/hive-exec-3.1.3.jar /opt/module/seatunnel/lib/
cp /opt/module/hive/lib/libfb303-0.9.3.jar /opt/module/seatunnel/lib/
cp $HADOOP_HOME/share/hadoop/common/*.jar /opt/module/seatunnel/lib/
cp $HADOOP_HOME/share/hadoop/hdfs/*.jar /opt/module/seatunnel/lib/
# 先启动metastore服务,前后台启动命令
hive --service metastore
nohup hive --service metastore > metastore.log 2>&1 &
# 在hive cli中执行建表语句,创建测试表,配置中设置了自动建表但没生效
CREATE TABLE IF NOT EXISTS default.test_hive_sink (id INT,name STRING,score DOUBLE
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','  
STORED AS TEXTFILE;  
# 执行数据同步命令
cd /opt/module/seatunnel/ 
./bin/seatunnel.sh 
--config ./config/fake2hive.config 
-m local #如果去掉,需要单独配置spark或flink分布式引擎
# 验证数据
hive --database default -e "SELECT * FROM test_hive_sink;"

mysql2console

创建表、导入数据,dbeaver可以直接从数据库1导入数据库2。也可以不用创建表,直接将表及数据从数据库1导入数据库2.

创建配置文件,主要是source的设置

# Defining the runtime environment
env {parallelism = 4job.mode = "BATCH"job.name = "MysqlExample"
}
source{Jdbc {url = "jdbc:mysql://hadoop1:3306/finance?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "root"password = "xx"query = "select * from index_def limit 16"}
}
sink {Console {}
}

执行

# 导入mysql引擎到seatunnel的plugin文件下
# /opt/module/seatunnel/plugins
mysql-connector-j-8.0.31.jar
# 启动,配置的source的前面要用Jdbc,MYSQL报错
cd /opt/module/seatunnel/ 
./bin/seatunnel.sh 
--config ./config/mysql2console.config
-m local

mysql2hive

在hive中创建要同步的表

先创建数据库,CREATE DATABASE IF NOT EXISTS finance;

编辑配置脚本mysql2hive

env {parallelism = 1job.mode = "BATCH"job.name = "HiveSinkExample"
}
source{Jdbc {url = "jdbc:mysql://hadoop1:3306/finance?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "root"password = "xx"query = "select * from index_def"}
}
sink {Hive {table_name = "finace.index_def"metastore_uri = "thrift://hadoop1:9083"hdfs_site_path = "/opt/module/hadoop/etc/hadoop/hdfs-site.xml"hive_site_path = "/opt/module/hive/conf/hive-site.xml"save_mode = "append"file_format = "text"                  # 必须与Hive表存储格式一致}
}

启动

cd /opt/module/seatunnel/ 
./bin/seatunnel.sh 
--config ./config/mysql2hive.config
-m local

同步多张表

env {parallelism = 1job.mode = "BATCH"job.name = "HiveSinkExample"
}
source{Jdbc {name = "source1"url = "jdbc:mysql://hadoop1:3306/finance?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "root"password = "xx"query = "select * from index_def1"result_table_name = "index_def1_result"}Jdbc {name = "source2"url = "jdbc:mysql://hadoop1:3306/finance?serverTimezone=GMT%2b8&useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "root"password = "xx"query = "select * from index_def2"result_table_name = "index_def2_result"}    
}
sink {Hive {name = "sink1"table_name = "finace.index_def1"metastore_uri = "thrift://hadoop1:9083"hdfs_site_path = "/opt/module/hadoop/etc/hadoop/hdfs-site.xml"hive_site_path = "/opt/module/hive/conf/hive-site.xml"save_mode = "append"file_format = "text"                 source_table_name = "index_def1_result" }Hive {name = "sink2"table_name = "finace.index_def2"metastore_uri = "thrift://hadoop1:9083"hdfs_site_path = "/opt/module/hadoop/etc/hadoop/hdfs-site.xml"hive_site_path = "/opt/module/hive/conf/hive-site.xml"save_mode = "append"file_format = "text"        source_table_name = "index_def2_result" 
}
}

启动

cd /opt/module/seatunnel/ 
./bin/seatunnel.sh 
--config ./config/n2hive.config
-m local

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/74758.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SSH项目负载均衡中的Session一致性解决方案‌

SSH项目负载均衡中的Session一致性解决方案‌ 1. 粘性会话(Session Sticky)‌2. Session复制(集群同步)‌3. 集中式Session存储‌4. 客户端存储(Cookie加密)‌方案选型建议‌注意事项‌ 1. 粘性会话&#x…

MySQL 表连接(内连接与外连接)

🏝️专栏:Mysql_猫咪-9527的博客-CSDN博客 🌅主页:猫咪-9527-CSDN博客 “欲穷千里目,更上一层楼。会当凌绝顶,一览众山小。” 目录 1、表连接的核心概念 1.1 为什么需要表连接? 2、内连接&a…

解锁Spring Boot异步编程:让你的应用“飞“起来!

引言:从点餐说起 🍔 想象你在快餐店点餐: 同步模式:排队等餐,队伍越来越长(就像卡死的服务器)异步模式:拿号后去旁边坐着等(服务员喊号通知你) 今天我们就…

做一个有天有地的css及html画的旋转阴阳鱼

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>天地阴阳</title><style>/* 重置默认样…

ngx_http_core_main_conf_t

定义在 src\http\ngx_http_core_module.h typedef struct {ngx_array_t servers; /* ngx_http_core_srv_conf_t */ngx_http_phase_engine_t phase_engine;ngx_hash_t headers_in_hash;ngx_hash_t variables_hash;…

计算机二级(C语言)考试高频考点总汇(二)—— 控制流、函数、数组和指针

目录 六、控制流 七、函数 八、数组和指针 六、控制流 76. if 语句可以&#xff08;嵌套&#xff09;&#xff0c; if 语句可以嵌套在另一个 if 语句内部&#xff0c;形成&#xff08;嵌套的条件判断结构&#xff09;&#xff0c;用于处理更复杂的条件判断逻辑。 77. els…

WebRTC协议全面教程:原理、应用与优化指南

一、WebRTC协议概述 **WebRTC&#xff08;Web Real-Time Communication&#xff09;**是一种开源的实时通信协议&#xff0c;支持浏览器和移动应用直接进行音频、视频及数据传输&#xff0c;无需插件或第三方软件。其核心特性包括&#xff1a; P2P传输&#xff1a;点对点直连…

使用 WSL + Ubuntu + Go + GoLand(VSCode) 开发环境配置指南

1. 安装和配置 WSL 与 Ubuntu 启用 WSL 功能(以管理员身份运行 PowerShell): wsl --install 或手动启用: dism.exe /online /enable-feature /featurename:Microsoft-Windows-Subsystem-Linux /all /norestart dism.exe /online /enable-feature /featurename:VirtualMachi…

element-plus中,Tour 漫游式引导组件的使用

目录 一.Tour 漫游式引导组件的简单介绍 1.作用 2.基本使用 3.展示效果 二.实战1&#xff1a;介绍患者病历表单 1.要求 2.实现步骤 3.展示效果 结语 一.Tour 漫游式引导组件的简单介绍 1.作用 快速了解一个功能/产品。 2.基本使用 从官网复制如下代码&#xff1a; &…

39-Ajax工作原理

1. 简明定义开场 “AJAX(Asynchronous JavaScript and XML)是一种允许网页在不重新加载整个页面的情况下&#xff0c;与服务器交换数据并更新部分网页内容的技术。它通过JavaScript的XMLHttpRequest对象或现代的Fetch API实现异步通信。” 2. 核心工作原理 "AJAX的工作…

Python 爬虫案例

以下是一些常见的 Python 爬虫案例&#xff0c;涵盖了不同的应用场景和技术点&#xff1a; 1. 简单网页内容爬取 案例&#xff1a;爬取网页标题和简介 import requests from bs4 import BeautifulSoup url "https://www.runoob.com/" response requests.get(url) …

【C++进阶】函数:深度解析 C++ 函数的 12 大进化特性

目录 一、函数基础 1.1 函数定义与声明 1.2 函数调用 1.3 引用参数 二、函数重载&#xff1a;同名函数的「多态魔法」&#xff08;C 特有&#xff09; 2.1 基础实现 2.2 重载决议流程图 2.3 与 C 语言的本质区别 2.4 实战陷阱 三、默认参数&#xff1a;接口的「弹性设…

Redis的基础,经典,高级问题解答篇

目录 一&#xff0c;基础 二&#xff0c;经典 缓存雪崩&#xff1a; 1. Redis事务的原子性 2. 与MySQL事务的区别 1. 主从复制原理 2. 哨兵模式故障转移流程 3. 客户端感知故障转移 三&#xff0c;高级 一&#xff0c;基础 Redis的5种基础数据类型及使用场景&#xf…

【蓝桥杯】好数

好数 问题描述代码解释代码 好数 问题描述 一个整数如果按从低位到高位的顺序&#xff0c;奇数位 (个位、百位、万位 ⋯ ) 上的数字是奇数&#xff0c;偶数位 (十位、千位、十万位 ⋯ ) 上的数字是偶数&#xff0c;我们就称之为 “好数”。 给定一个正整数 N&#xff0c;请计算…

利用 Patroni + etcd + HAProxy 搭建高可用 PostgreSQL 集群

在生产环境中&#xff0c;数据库的高可用性是系统稳定运行的关键。本文将详细讲解如何利用 Docker 部署一个由 etcd、Patroni 和 HAProxy 组成的 PostgreSQL 高可用集群&#xff0c;实现自动故障转移和负载均衡。 架构概述 本架构主要包括三部分&#xff1a; etcd 集群 etcd …

bash 和 pip 是两种完全不同用途的命令,分别用于[系统终端操作]和[Python 包管理]

bash 和 pip 是两种完全不同用途的命令&#xff0c;分别用于 系统终端操作 和 Python 包管理。以下是它们的核心区别、用法及常见场景对比&#xff1a; 1. 本质区别 特性bashpip类型Shell 命令解释器&#xff08;一种脚本语言&#xff09;Python 包管理工具作用执行系统命令、…

分布式系统的CAP理论、事务和锁实现

分布式系统核心概念 1. CAP理论 CAP理论指出&#xff0c;分布式系统最多同时满足以下三项中的两项&#xff1a; 一致性&#xff08;CC&#xff09;&#xff1a;所有节点访问同一份最新数据。可用性&#xff08;AA&#xff09;&#xff1a;每个请求都能在合理时间内获得非错误…

鸿蒙UI开发

鸿蒙UI开发 本文旨在分享一些鸿蒙UI布局开发上的一些建议&#xff0c;特别是对屏幕宽高比发生变化时的应对思路和好的实践。 折叠屏适配 一般情况&#xff08;自适应布局/响应式布局&#xff09; 1.自适应布局 1.1自适应拉伸 左右组件定宽 TypeScript //左右定宽 Row() { …

FreeRTOS 五种内存管理算法深度对比分析

FreeRTOS 提供了五种动态内存管理算法&#xff08;heap_1 至 heap_5&#xff09;&#xff0c;针对不同应用场景在实时性、内存效率、碎片控制等方面进行了差异化设计。以下从实现原理、性能指标及适用场景进行全面对比&#xff1a; 一、Heap_1&#xff1a;静态分配优先 ​核心…

基于EFISH-SBC-RK3576的无人机智能飞控与数据存储方案

一、方案背景 民用无人机在电力巡检、农业植保、应急救援等领域快速普及&#xff0c;但传统方案面临‌多协议设备兼容性差‌、‌野外环境数据易丢失‌、‌复杂电磁干扰‌三大痛点。 电鱼智能推出‌EFISH-SBC-RK3576‌&#xff0c;可集成双冗余总线接口与工业级加固存储&#x…