Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门

1.概览

多源数据目录(Multi-Catalog)功能,旨在能够更方便对接外部数据目录,以增强Doris的数据湖分析和联邦数据查询能力。

在之前的 Doris 版本中,用户数据只有两个层级:Database 和 Table。当我们需要连接一个外部数据目录时,我们只能在Database 或 Table 层级进行对接。比如通过 create external table 的方式创建一个外部数据目录中的表的映射,或通过 create external database 的方式映射一个外部数据目录中的 Database。如果外部数据目录中的 Database 或 Table 非常多,则需要用户手动进行一一映射,使用体验不佳。

而新的 Multi-Catalog 功能在原有的元数据层级上,新增一层Catalog,构成 Catalog -> Database -> Table 的三层元数据层级。其中,Catalog 可以直接对应到外部数据目录。目前支持的外部数据目录包括:

  1. Apache Hive
  2. Apache Iceberg
  3. Apache Hudi
  4. Elasticsearch
  5. JDBC: 对接数据库访问的标准接口(JDBC)来访问各式数据库的数据。
  6. Apache Paimon(Incubating)

该功能将作为之前外表连接方式(External Table)的补充和增强,帮助用户进行快速的多数据目录联邦查询。

这篇教程将展示如何使用 Flink + paimon + Doris 构建实时湖仓一体的联邦查询分析,Doris 2.0.3 版本提供了 的支持,本文主要展示 Doris 和 paimon 怎么使用,同时本教程整个环境是都基于伪分布式环境搭建,大家按照步骤可以一步步完成。完整体验整个搭建操作的过程。

2. 环境

本教程的演示环境如下:

  1. Apache doris 2.0.2
  2. Hadoop 3.3.3
  3. hive 3.1.3
  4. Fink 1.17.1
  5. Apache paimon 0.5.0
  6. JDK 1.8.0_311

3. 安装

  1. 下载 Flink 1.17.1
    wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
    ## 解压安装
    tar zxf flink-1.17.1-bin-scala_2.12.tgz
  2. 下载相关的依赖到 Flink/lib 目录
cp /Users/zhangfeng/hadoop/hadoop-3.3.6/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar ./lib/
wget https://repo1.maven.org/maven2/org/apache/paimon/paimon-flink-1.17/0.5.0-incubating/paimon-flink-1.17-0.5.0-incubating.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.17.1/flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
  1. 配置并启动 Flink

配置环境变量,修改flink-conf.yaml配置文件

env.java.opts.all: "-Dfile.encoding=UTF-8"
classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 3
execution.checkpointing.interval: 10s
state.backend: rocksdb
state.checkpoints.dir: hdfs://zhangfeng:9000/flink/myckp
state.savepoints.dir: hdfs://zhangfeng:9000/flink/savepoints
state.backend.incremental: true

启动 Flink

bin/start-cluster.sh
bin/sql-client.sh embedded 
set 'sql-client.execution.result-mode' = 'tableau';

Catalog

Paimon Catalog可以持久化元数据,当前支持两种类型的metastore

  • 文件系统(默认):将元数据和表文件存储在文件系统中。
  • hive:在hive metastore存储元数据,用户可以直接从hive访问表。

文件系统

下面的 Flink SQL 注册并使用一个名为 paimon_catalog 的catalog。元数据和表文件存放在hdfs://localhost:9000/paimon/data下

CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://localhost:9000/paimon/data'
);show catalogs;

Hive Catalog

我们也可以直接使用 hive metastore 来存储 paimon 元数据。

下面是创建语句

CREATE CATALOG paimon_hive WITH ('type' = 'paimon','metastore' = 'hive','uri' = 'thrift://localhost:9083','hive-conf-dir' = '/Users/zhangfeng/hadoop/apache-hive-3.1.3-bin/conf/', 'warehouse' = 'hdfs://localhost:9000/paimon/hive'
);
show catalogs;

创建 paimon 表

USE CATALOG paimon_hive;
CREATE TABLE test_paimon_01 (userid BIGINT,age INT,address STRING,regiter_dt STRING  ,PRIMARY KEY(userid, regiter_dt) NOT ENFORCED
) PARTITIONED BY (regiter_dt);show tables

4. 同步MySQL 数据到 Paimon表

下面我们演示怎么基于Flink CDC 快速实时同步 MySQL 表的数据到 Paimon表里。

这里首先你的MySQL 数据库要开启 binlog,具体的方法网上很多,这里不在叙述。

MySQL 表:

CREATE DATABASE emp_1;USE emp_1;
CREATE TABLE employees_1 (emp_no      INT             NOT NULL,birth_date  DATE            NOT NULL,first_name  VARCHAR(14)     NOT NULL,last_name   VARCHAR(16)     NOT NULL,gender      ENUM ('M','F')  NOT NULL,    hire_date   DATE            NOT NULL,PRIMARY KEY (emp_no)
);INSERT INTO `employees_1` VALUES  (10055,'1956-06-06','Georgy','Dredge','M','1992-04-27'),
(10056,'1961-09-01','Brendon','Bernini','F','1990-02-01'),
(10057,'1954-05-30','Ebbe','Callaway','F','1992-01-15'),
(10058,'1954-10-01','Berhard','McFarlin','M','1987-04-13'),
(10059,'1953-09-19','Alejandro','McAlpine','F','1991-06-26'),
(10060,'1961-10-15','Breannda','Billingsley','M','1987-11-02'),
(10061,'1962-10-19','Tse','Herber','M','1985-09-17'),
(10062,'1961-11-02','Anoosh','Peyn','M','1991-08-30'),
(10063,'1952-08-06','Gino','Leonhardt','F','1989-04-08'),
(10064,'1959-04-07','Udi','Jansch','M','1985-11-20'),
(10065,'1963-04-14','Satosi','Awdeh','M','1988-05-18'),
(10066,'1952-11-13','Kwee','Schusler','M','1986-02-26'),
(10067,'1953-01-07','Claudi','Stavenow','M','1987-03-04'),
(10068,'1962-11-26','Charlene','Brattka','M','1987-08-07'),
(10069,'1960-09-06','Margareta','Bierman','F','1989-11-05'),
(10070,'1955-08-20','Reuven','Garigliano','M','1985-10-14'),
(10071,'1958-01-21','Hisao','Lipner','M','1987-10-01'),
(10072,'1952-05-15','Hironoby','Sidou','F','1988-07-21'),
(10073,'1954-02-23','Shir','McClurg','M','1991-12-01'),
(10074,'1955-08-28','Mokhtar','Bernatsky','F','1990-08-13'),
(10075,'1960-03-09','Gao','Dolinsky','F','1987-03-19'),
(10076,'1952-06-13','Erez','Ritzmann','F','1985-07-09'),
(10077,'1964-04-18','Mona','Azuma','M','1990-03-02'),
(10078,'1959-12-25','Danel','Mondadori','F','1987-05-26'),
(10079,'1961-10-05','Kshitij','Gils','F','1986-03-27'),
(10080,'1957-12-03','Premal','Baek','M','1985-11-19'),
(10081,'1960-12-17','Zhongwei','Rosen','M','1986-10-30'),
(10082,'1963-09-09','Parviz','Lortz','M','1990-01-03'),
(10083,'1959-07-23','Vishv','Zockler','M','1987-03-31'),
(10084,'1960-05-25','Tuval','Kalloufi','M','1995-12-15');

在Flink sql-client 下创建 MySQL CDC 表:

CREATE TABLE employees_source (database_name STRING METADATA VIRTUAL,table_name STRING METADATA VIRTUAL,emp_no int NOT NULL,birth_date date,first_name STRING,last_name STRING,gender STRING,hire_date date,PRIMARY KEY (`emp_no`) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = 'zhangfeng','database-name' = 'emp_1','table-name' = 'employees_1');

使用Create table as select 创建Paimon表,并将数据实时同步到Paimon表里:

create table mysql_to_paimon_01 as select * from default_catalog.default_database.employees_source;

查看Job

我们这个时候可以在Flink sql-client 下查询 paimon ,看到 Paimon 表里已经有数据了。

5. Doris On Paimon

Doris 提供了 Paimon 的 catalog 支持,我们可以通过这种方式,通过Doris 快速的去读 Paimon 表的数据,同时也可以通过 catalog 方式将 paimon 表的数据迁移到 Doris 表里

5.1 Doris 整合查询Paimon表

首先我们创建 Paimon catalog,有两种方式:

  1. 一种是基于 Hive metastore service
  2. 一种是基于 HDFS 文件系统
CREATE CATALOG `paimon_hdfs` PROPERTIES ("type" = "paimon","warehouse" = "hdfs://localhost:9000/paimon/hive","hadoop.username" = "hadoop"
);CREATE CATALOG `paimon_hms` PROPERTIES ("type" = "paimon","paimon.catalog.type" = "hms","warehouse" = "hdfs://localhost:9000/paimon/hive","hive.metastore.uris" = "thrift://localhost:9083"
);

创建成功之后我们通过 show catalogs方式可以看到我们创建好的 paimon catalog;

mysql> show catalogs;
+-----------+-------------+----------+-----------+-------------------------+---------------------+------------------------+
| CatalogId | CatalogName | Type     | IsCurrent | CreateTime              | LastUpdateTime      | Comment                |
+-----------+-------------+----------+-----------+-------------------------+---------------------+------------------------+
|   1308010 | hive        | hms      |           | 2023-11-17 09:42:22.872 | 2023-11-17 09:42:46 | NULL                   |
|   1326307 | hudi        | hms      |           | 2023-11-27 11:33:22.231 | 2023-11-27 11:33:35 | NULL                   |
|         0 | internal    | internal |           | UNRECORDED              | NULL                | Doris internal catalog |
|     35689 | jdbc        | jdbc     |           | 2023-11-03 12:52:24.695 | 2023-11-03 12:52:59 | NULL                   |
|     38003 | mysql       | jdbc     |           | 2023-11-07 11:46:40.006 | 2023-11-07 11:46:54 | NULL                   |
|   1329142 | paimon_hdfs | paimon   |           | 2023-11-27 14:06:13.744 | 2023-11-27 14:06:41 |                        |
|   1328144 | paimon_hms  | paimon   | yes       | 2023-11-27 14:00:32.925 | 2023-11-27 14:00:44 | NULL                   |
+-----------+-------------+----------+-----------+-------------------------+---------------------+------------------------+
7 rows in set (0.00 sec)

切换 paimon catalog,通过下面这些操作我们可以看到我们在 paimon 里创建的表

mysql> switch  paimon_hdfs;
Query OK, 0 rows affected (0.00 sec)mysql> show databases;
+----------+
| Database |
+----------+
| default  |
+----------+
1 row in set (0.02 sec)mysql> use default;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -ADatabase changed
mysql> show tables;
+--------------------------+
| Tables_in_default        |
+--------------------------+
| example_tbl_partition_01 |
| example_tbl_unique_01    |
| mysql_to_paimon_01       |
| test_paimon_01           |
+--------------------------+
4 rows in set (0.00 sec)

通过 Doris 查询 Paimon 表

select * from mysql_to_paimon_01;

5.2 将Paimon 表的数据导入到 Doris

我们也可以快速的利用catalog 方式将 paimon 数据迁移到 Doris 里,我们可以使用 CATS方式:

create table doris_paimon_01
PROPERTIES("replication_num" = "1")  as  select * from paimon_hdfs.`default`.mysql_to_paimon_01;

注意:

1. 查询paimon的时候如果报下面的错误:

org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "hdfs"

需要再 hdfs 需要再core-site.xml 文件中加上下面的配置:

<property><name>fs.hdfs.impl</name><value>org.apache.hadoop.hdfs.DistributedFileSystem</value><description>The FileSystem for hdfs: uris.</description>
</property>

6. 总结

是不是使用非常简单,快快体验Doris 湖仓一体,联邦查询的能力,来加速你的数据分析性能

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/184559.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一键删除方舟编译器缓存文件js、js.map插件ArkCompilerSupport

新手学习鸿蒙开发&#xff0c;发现DevEco Studio编译过种会生成js、js.map&#xff0c;在论坛上看了其它开发者也提了问题但无没解决&#xff0c;写了一个插件大家试下&#xff1a; https://plugins.jetbrains.com/plugin/23192-arkcompilersupport 源码&#xff1a;https://g…

js 如何实现转驼峰处理

目录 1&#xff0c;需求2&#xff0c;实现和原理3&#xff0c;原理1&#xff0c;正则2&#xff0c;替换函数 1&#xff0c;需求 在开发中&#xff0c;有时需要将中划线 -&#xff0c;下划线 _&#xff0c;冒号 : 这些连接符转为驼峰形式。 如果只有一个连接符&#xff0c;处理…

深度学习——Loss汇总

深度学习——Loss汇总 一、IOU Loss二、L1 Loss 一、IOU Loss 公式&#xff1a; 参考资料: 目标检测回归损失函数——IOU、GIOU、DIOU、CIOU、EIOU 二、L1 Loss 公式&#xff1a; 参考资料: PyTorch中的损失函数–L1Loss /L2Loss/SmoothL1Loss

Day72x.算法训练

739. 每日温度 class Solution {public int[] dailyTemperatures(int[] temperatures) {LinkedList<Integer> st new LinkedList<>();st.push(0);int[] res new int[temperatures.length];for (int i 1; i < temperatures.length; i) {while (!st.isEmpty()…

python机器学习——简单神经网络算法回归分析

利用python实现简单的神经网络算法回归分析 2023年亚太杯数学建模C题可以使用这个代码进行分析 import pandas as pd import numpy as np from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler from tensorflow.keras.mod…

智慧城市大脑,运维无忧!

运维管理软件在智慧城市中发挥着重要的作用&#xff0c;可以提升IT资源管理效率、保障城市运营安全稳定、实现数据可视化与智能分析、优化资源配置与决策支持、促进智慧城市可持续发展。 在智慧城市中&#xff0c;运维管理软件的应用场景非常广泛。以下是其中几个具体应用场景…

Linux 网络配置

Linux 网络配置 #学习目标 1&#xff0c;掌握Linux中网络配置相关的文件。 2&#xff0c;掌握Linux中网络配置的相关参数。 3&#xff0c;掌握常用的网络配置命令。 4&#xff0c;管理Linux的常用网络服务。Linux系统下配置网络有两种方式&#xff1a; 在安装Linux系统的过程…

高并发架构设计方法:面对高并发,怎么对症下药?

Java全能学习面试指南&#xff1a;https://javaxiaobear.cn 我们知道&#xff0c;“高并发”是现在系统架构设计的核心关键词。一个架构师如果设计、开发的系统不支持高并发&#xff0c;那简直不好意思跟同行讨论。但事实上&#xff0c;在架构设计领域&#xff0c;高并发的历史…

DAPP开发【01】知识简介

系列文章目录 系列文章在DAPP开发专栏 文章目录 系列文章目录前言一、公/私钥是什么&#xff1f;二、区块浏览器三、用户和区块链交互1.infura 四、opensea 前言 DAPP&#xff0c;全称为Decentralized Application&#xff0c;即去中心化应用程序。它是一种基于区块链技术构建…

【C++】程序题( STL标准模板库)

&#x1f383;个人专栏&#xff1a; &#x1f42c; 算法设计与分析&#xff1a;算法设计与分析_IT闫的博客-CSDN博客 &#x1f433;Java基础&#xff1a;Java基础_IT闫的博客-CSDN博客 &#x1f40b;c语言&#xff1a;c语言_IT闫的博客-CSDN博客 &#x1f41f;MySQL&#xff1a…

【element-plus使用】el-select自定义样式、下拉框选项过长等问题解决

1、自定义样式 <template><el-select v-model"value" style"width: 150px"><el-option label"选项一" value"option1"></el-option><el-option label"选项二" value"option2"><…

115. 不同的子序列

给你两个字符串 s 和 t &#xff0c;统计并返回在 s 的 子序列 中 t 出现的个数&#xff0c;结果需要对 109 7 取模。 示例 1&#xff1a; 输入&#xff1a;s "rabbbit", t "rabbit" 输出&#xff1a;3 解释&#xff1a; 如下所示, 有 3 种可以从 s 中…

UG\NX二次开发 设置对象上属性的锁定状态UF_ATTR_set_user_attribute_lock_with_title_and_type

文章作者:里海 来源网站:里海NX二次开发3000例专栏 简介 设置对象上属性的锁定状态UF_ATTR_set_user_attribute_lock_with_title_and_type,用于代替旧版函数UF_ATTR_set_locked,使用旧版函数UF_ATTR_set_locked锁定属性请参照这篇文章《UG\NX二次开发 设置对象上属性的锁定…

PyMuPDF---Python处理PDF的宝藏库详解

1、PyMuPDF简介 1.1 介绍 在介绍PyMuPDF之前&#xff0c;先来了解一下MuPDF&#xff0c;从命名形式中就可以看出&#xff0c;PyMuPDF是MuPDF的Python接口形式。 MuPDF MuPDF 是一个轻量级的 PDF、XPS和电子书查看器。MuPDF 由软件库、命令行工具和各种平台的查看器组成。 …

【网络奇缘】- 计算机网络|分层结构|深入学习ISO模型

&#x1f308;个人主页: Aileen_0v0&#x1f525;系列专栏: 一见倾心,再见倾城 --- 计算机网络~&#x1f4ab;个人格言:"没有罗马,那就自己创造罗马~" 回顾链接&#xff1a;http://t.csdnimg.cn/nRRzR 这篇文章是关于深入学习OSI模型七层结构&#xff0c; “书山…

【精选】Spring框架介绍及Spirng各个版本的特性

Spring框架介绍 Spring框架英文全称Spring Framework&#xff0c;是由Spring团队研发的模块化、轻量级开源框架。其主要目的是为了简化项目开发。在项目开发中&#xff0c;可以说没有刻意使用Spring&#xff0c;却处处有着Spring存在。用官网对Spring框架的介绍&#xff1a;Sp…

Spring-事务支持

目录 一、事务概述 二、引入事务场景 三、Spring对事务的支持 Spring实现事务的两种方式 声明式事务之注解实现方式 1.在Spring配置文件中配置事务管理器 2. 在Spring配置文件引入tx命名空间 3. 在Spring配置文件中配置“事务注解驱动器”&#xff0c;通过注解的方式控…

【算法】算法题-20231130

这里写目录标题 一、290. 单词规律二.、存在重复元素 II三、128. 最长连续序列 一、290. 单词规律 简单 给定一种规律 pattern 和一个字符串 s &#xff0c;判断 s 是否遵循相同的规律。 这里的 遵循 指完全匹配&#xff0c;例如&#xff0c; pattern 里的每个字母和字符串 s…

c++ 打怪升级

内联函数 调用时&#xff0c;直接会把代码拷贝到调用处&#xff1b; 函数指针 可以类比数组 //内联函数 调用时直接将代码拷贝过来 inline const string& longerStr(const string &s1,const string & s2){return s1.size()>s2.size() ? s1:s2; }int main(i…

学会XPath,轻松抓取网页数据

一、定义 XPath&#xff08;XML Path Language&#xff09;是一种用于在 XML 文档中定位和选择节点的语言。XPath的选择功能非常强大&#xff0c;可以通过简单的路径选择语法&#xff0c;选取文档中的任意节点或节点集。学会XPath&#xff0c;可以轻松抓取网页数据&#xff0c…