FlinkCDC实战:将 MySQL 数据同步至 ES

?? 当前需要处理的业务场景:

  • 将订单表和相关联的表(比如: 商品表、子订单表、物流信息表)组织成宽表, 放入到 ES 中, 加速订单数据的查询.

  • 同步数据到 es.

  • 概述

    • 1. 什么是 CDC

    • 2. 什么是 Flink CDC

    • 3. Flink CDC Connectors 和 Flink 的版本映射

  • 实战

    • 1. 宽表查询

      • 1.1 创建 mysql 表

      • 1.2 启动 Flink 集群和 Flink SQL CLI

      • 1.3 在 Flink SQL CLI 中使用 Flink DDL 创建表

      • 1.4 关联订单数据并且将其写入 Elasticsearch 中

      • 1.5 修改 MySQL 中的数据

本文用到的 mysql 版本为 8.0.28,ES 版本为 7.17.3,flink 版本为 1.13.6,flink-sql-connector-mysql-cdc 版本为 2.2.1

flink-sql-connector-elasticsearch 版本为 7_2.11-1.13.2

概述

1. 什么是 CDC

CDC (Change Data Capture) 是 变更数据获取的简称。核心思想是监测并捕获数据库的变动(数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整地记录下来,写入到消息中间件中以供其他服务进行订阅并消费。

2. 什么是 Flink CDC

Flink 社区开发了 flink-cdc-connectors 组件,这个一个可以直接从 MySQL、PostgreSQL等数据库直接读取全量数据增量变更数据的source 组件。

3. Flink CDC Connectors 和 Flink 的版本映射

Flink CDC Conectors Version

Flink Version

1.0.0

1.11.*

1.1.0

1.11.*

1.2.0

1.12.*

1.3.0

1.12.*

1.4.0

1.13.*

2.0.*

1.13.*

实战

1. 宽表查询

在商城项目中,商品、订单、物流的数据往往是存储在 MySQL 中,为了加速查询的效率,可以将数据组织成一张宽表,并实时地把它写到 ElasticSearch 中。

确保要监听的 mysql 服务器开启了 binlog 和对应监听 binlog 的账号有相对应的权限。

1.1 创建 mysql 表
-- MySQL CREATE DATABASE mydb; USE mydb; CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512) ); ALTER TABLE products AUTO_INCREMENT = 101; ? INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"), ? ? ? (default,"car battery","12V car battery"), ? ? ? (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"), ? ? ? (default,"hammer","12oz carpenter's hammer"), ? ? ? (default,"hammer","14oz carpenter's hammer"), ? ? ? (default,"hammer","16oz carpenter's hammer"), ? ? ? (default,"rocks","box of assorted rocks"), ? ? ? (default,"jacket","water resistent black wind breaker"), ? ? ? (default,"spare tire","24 inch spare tire"); ? CREATE TABLE orders ( order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, order_date DATETIME NOT NULL, customer_name VARCHAR(255) NOT NULL, price DECIMAL(10, 5) NOT NULL, product_id INTEGER NOT NULL, order_status BOOLEAN NOT NULL -- Whether order has been placed ) AUTO_INCREMENT = 10001; ? INSERT INTO orders VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false), ? ? ? (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false), ? ? ? (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false); ? ? ? CREATE TABLE shipments ( shipment_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, order_id INTEGER NOT NULL, origin VARCHAR(255) NOT NULL, destination VARCHAR(255) NOT NULL, is_arrived BOOLEAN NOT NULL ) AUTO_INCREMENT = 10001; INSERT INTO shipments VALUES (default,10001,'Beijing','Shanghai',false), ? ? ? (default,10002,'Hangzhou','Shanghai',false), ? ? ? (default,10003,'Shanghai','Hangzhou',false);
1.2 启动 Flink 集群和 Flink SQL CLI
# 进入 Flink 目录 cd flink-13.6 ? # 启动 Flink 集群.启动成功的话,可以在 http://ip:8081/ 访问到对应的 Flink Web UI ./bin/start-cluster.sh ? # 启动 Flink SQL CLI ./bin/sql-client.sh
1.3 在 Flink SQL CLI 中使用 Flink DDL 创建表

首先开启checkpoint, 每隔 3s 做一次checkpoint.

-- Flink SQL Flink SQL> SET execution.checkpointing.interval = 3s;

然后,对于数据库中的表productsordersshipments,使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据。

-- Flink SQL Flink SQL> CREATE TABLE products ( ? id INT, ? name STRING, ? description STRING, ? ?PRIMARY KEY (id) NOT ENFORCED ?) WITH ( ? ?'connector' = 'mysql-cdc', ? ?'hostname' = '192.168.110.100', ? ?'port' = '3380', ? ?'username' = 'root', ? ?'password' = 'root', ? ?'database-name' = 'flink_db', ? ?'table-name' = 'products' ?); ? ? Flink SQL> CREATE TABLE orders ( ? order_id INT, ? order_date TIMESTAMP(0), ? customer_name STRING, ? price DECIMAL(10, 5), ? product_id INT, ? order_status BOOLEAN, ? PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( ? 'connector' = 'mysql-cdc', ? ?'hostname' = '192.168.110.100', ? ?'port' = '3380', ? ?'username' = 'root', ? ?'password' = 'root', ? ?'database-name' = 'flink_db', ? 'table-name' = 'orders' ); ? ? Flink SQL> CREATE TABLE shipments ( ? shipment_id INT, ? order_id INT, ? origin STRING, ? destination STRING, ? is_arrived BOOLEAN, ? PRIMARY KEY (shipment_id) NOT ENFORCED ) WITH ( ? 'connector' = 'mysql-cdc', ? ?'hostname' = '192.168.110.100', ? ?'port' = '3380', ? ?'username' = 'root', ? ?'password' = 'root', ? ?'database-name' = 'flink_db', ? 'table-name' = 'shipments' );

最后,创建enriched_orders表,用来将关联后的订单数据写入 Elasticsearch 中

-- Flink SQL Flink SQL> CREATE TABLE enriched_orders ( ? order_id INT, ? order_date TIMESTAMP(0), ? customer_name STRING, ? price DECIMAL(10, 5), ? product_id INT, ? order_status BOOLEAN, ? product_name STRING, ? product_description STRING, ? shipment_id INT, ? origin STRING, ? destination STRING, ? is_arrived BOOLEAN, ? PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( ? ? 'connector' = 'elasticsearch-7', ? ? 'hosts' = 'http://114.132.43.99:9200', ? ? 'index' = 'enriched_orders' );
1.4 关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表order与商品表products,物流信息表shipments关联,并将关联后的订单信息写入 Elasticsearch 中。

-- Flink SQL Flink SQL> INSERT INTO enriched_orders SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived FROM orders AS o LEFT JOIN products AS p ON o.product_id = p.id LEFT JOIN shipments AS s ON o.order_id = s.order_id;

现在可以通过 Kibana 查询包含商品和物流信息的订单数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index patternenriched_orders.

然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了.

1.5 修改 MySQL 中的数据
--MySQL INSERT INTO orders VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false); ? INSERT INTO shipments VALUES (default,10004,'Shanghai','Beijing',false);

每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1125395.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从原理切入,看大模型的未来,非常详细收藏我这一篇就够了

相信大家都接触过大模型,比如 DeepSeek、豆包、ChatGPT 等生成式 AI 应用,当用户输入相关信息后,大模型就会快速输出相应的结果:文字、图片,甚至是视频。这是大家对大模型最常见的认识——效率工具。可当笔者看到25年底…

DBeaver连接本地MySQL、创建数据库表的基础操作

一、连接本地MySQL 1、新建连接 打开DBeaver,点击左上角的文件或者点击箭头所指的连接按钮。新建数据库连接-选择数据库(mysql),点击“下一步”输入服务器地址、端口、用户名、密码(数据库自己选填,不填则连接所有数据库&#xff…

docker网络模式及配置

一、Docker网络模式 docker run 创建docker容器时,可以用-net选项指定容器的网络模式,docker有以下4种网络模式: host 模式,使用-nethost指定。container模式,使用-netcontainer:NAME_or_ID指定。none模式&#xff0…

docker中配置redis

1、常规操作 docker pull redis(默认你的docker中没有redis) 2、查看redis是否拉取成功 docker images redis 3、创建目录,在你的宿主机,(我是在虚机中建的centos7)为了给redis配置文件使用 4、下载redis…

error @achrinzanode-ipc@9.2.5 The engine “node“ is incompatible with this module. 解决node.js版本不兼容问题

目录 很多人运行项目的时候会出现报错信息: 解决方案 然后再次运行就好了: 此时,我点击链接就可以看到了: 很多人运行项目的时候会出现报错信息: 这个错误是因为你的项目中使用了一个模块 achrinza/node-ipc&#…

ERROR 1524 (HY000) Plugin ‘mysql_native_password‘ is not loaded

你遇到的错误是由于 MySQL 版本不再默认支持 mysql_native_password 认证插件导致的。从 MySQL 8.0 开始,默认的认证插件是 caching_sha2_password,而不是 mysql_native_password。 解释: 错误 ERROR 1524 (HY000): Plugin mysql_native_pass…

docker下搭建redis集群

1. 环境准备 准备好Linux系统机器,并安装好docker,阅读这篇文章前请先了解清楚docker的基本知识并且会熟悉运用docker的常用命令。学习docker基础知识可以参考这篇博文 安装好并启动docker后就可以开始搭建redis了 2. docker容器下安装redis 本篇文章…

深度解析:为什么传统操作系统的最小权限原则在智能体世界失灵?

前言 如果我们把“AgentOS”理解为一种为智能体长期运行、持续决策、主动调用资源而设计的操作系统,那么它会暴露出一个根本性矛盾:我们正在用为“被动程序”设计的安全模型,去约束“主动行为体”。 传统操作系统的安全模型,无论是 Unix 时代的“用户-组-其他”,还是后来…

交通仿真软件:Aimsun_(1).Aimsun概述v1

Aimsun概述 1. Aimsun的基本功能 Aimsun是一款功能强大的交通仿真软件,广泛应用于交通规划、交通管理和交通研究等领域。它能够模拟从微观到宏观的交通流,提供详细的交通分析和优化建议。Aimsun的基本功能包括: 1.1 交通网络建模 Aimsun允许用…

ChatGLM2-6B模型推理流程和模型架构详解

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言1 ChatGLM是什么?2 一代GLM 2.1 大模型架构2.2 GLM特点 2 二代GLM:ChatGLM2-6B为例拆解 2.1 ChatGLM2-6B模型推理架构和流程2.2 细节详…

go桌面框架Fyne最全api文档

Fyne 是一个 Go 语言的跨平台 GUI 库 相关命令 //全局安装fyne打包工具 go install fyne.io/fyne/v2/cmd/fynelatest// 引入fyne库 go get fyne.io/fyne/v2latest go mod tidy//以窗口形式启动 go run main.go //以手机模拟器形式启动 go run -tags mobile main.go//打包 //桌面…

基于微信小程序的新能源汽车租赁换电管理系统(毕设源码+文档)

课题说明本课题聚焦基于微信小程序的新能源汽车租赁换电管理系统的设计与实现,核心目标是解决传统新能源汽车租赁及换电服务中流程繁琐、车辆与换电站信息不透明、租赁订单管理混乱、换电预约低效、电池状态监控缺失及服务进度追踪困难等痛点问题。系统深度依托微信…

交通仿真软件:Aimsun_(3).Aimsun基本操作

Aimsun基本操作 创建和配置仿真网络 在Aimsun中,创建和配置仿真网络是仿真过程的第一步。仿真网络是交通仿真的基础,它包括道路网络、交叉口、交通信号、交通流等基本元素。本节将详细介绍如何在Aimsun中创建和配置仿真网络,以及如何导入和导…

代码随想录刷题——二叉树篇(十二)

112. 路径总和 递归法: class Solution{ public:bool sumPath(TreeNode* node,int count){# 如果该节点是叶子节点且count被减到0了,那么就返回trueif(!node->left&&!node->right&&count0) return true;# 如果该节点是叶子节点且c…

代码随想录刷题——二叉树篇(十二)

112. 路径总和 递归法: class Solution{ public:bool sumPath(TreeNode* node,int count){# 如果该节点是叶子节点且count被减到0了,那么就返回trueif(!node->left&&!node->right&&count0) return true;# 如果该节点是叶子节点且c…

eclipse配置Spring

1、从eclipse下载Spring工具 进入 help – install new software… ,如下图: 点击 add ,按以下方式输入: Name : Spring Location : http://dist.springsource.com/release/TOOLS/update/e4.10/ 之后点击 add ,等待…

Go基础之环境搭建

文章目录 1 Go 1.1 简介 1.1.1 定义1.1.2 特点用途 1.2 环境配置 1.2.1 下载安装1.2.2 环境配置 1.2.2.1 添加环境变量1.2.2.2 各个环境变量理解 1.2.3 验证环境变量 1.3 包管理工具 Go Modules 1.3.1 开启使用1.3.2 添加依赖包1.3.3 配置国内包源 1.3.3.1 通过 go env 配置1.…

C#数据库操作系列---SqlSugar完结篇

1. 不同寻常的查询 之前介绍了针对单个表的查询,同样也是相对简单的查询模式。虽然开发完全够用,但是难免会遇到一些特殊的情况。而下面这些方法就是为了解决这些意料之外。 1.1 多表查询 SqlSugar提供了一种特殊的多表查询方案,使用IQueryab…

docker启动redis简单方法

1、拉取redis镜像 docker pull redis2、在本地某个位置创建以下内容 建议将以下内容放在一起,方便以后管理和查看 # 以/docker/redis为例 mkdir -p /docker/redis mkdir -p /docker/redis/data touch /docker/redis/redis.conf touch /docker/redis/redis.bash3、…

DVWA靶场通关——SQL Injection篇

一,Low难度下unionget字符串select****注入 1,首先手工注入判断是否存在SQL注入漏洞,输入1这是正常回显的结果,再键入1’ You have an error in your SQL syntax; check the manual that corresponds to your MySQL server versio…