利用canal进行MySQL到ES的数据实时同步

1. 背景

项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表,每张表数据控制在300W以下,但是效率还是达不到要求,为了提高查询效率,打算使用ES进行数据查询。

2. 同步原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  • canal 解析 binary log 对象(原始为 byte 流),转换为json格式

  • Canal 客户端通过 TCP 协议或 MQ 形式监听 Canal 服务端,同步数据到 ES。

优点: 可以完全和业务代码解耦,增量日志订阅。

缺点:实时性不高,订阅mysql日志,DB中数据事务成功后,开始同步至canal。

3. 环境信息

名称版本
MySQL8.0.35
elasticsearch7.17.9
canal1.1.7
jdk1.8

本文以MySQL中的表ibds2.proof_data_history_202311为例进行数据同步实验。

4. 部署配置

本文的的重点是讲述canal的配置与使用,不再详细描述MySQL与ES的部署和配置。

4.1. MySQL

MySQL的部署与配置相对较简单,需要主要的是需要开启binlog,且配置binlog-format 为ROW模式。此处不再做详细描述。

4.2. Elasticsearch

ES的部署与配置也比较简单,此处不再做详细描述。

建议安装同版本的分词器插件。

4.3. canal

版本下载地址:

deployer:

https://download.fastgit.org/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

adapter:

https://download.fastgit.org/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz

4.3.1. deployer
4.3.1.1. 配置

canal.deployer-1.1.7.tar.gz上传到/data/canal/Server,并进行解压:

cd /data/canal/Server
tar -zxvf canal.deployer-1.1.7.tar.gz

修改配置文件conf/example/instance.properties,按如下配置即可,主要是修改数据库相关配置:

## mysql serverId , v1.0.26+ will autoGen 不要与MySQL的id重复
canal.instance.mysql.slaveId=10
​
# enable gtid use true/false 是否使用gtid
canal.instance.gtidon=true
​
# position info MySQL数据库信息
canal.instance.master.address=192.168.10.101:3307
​
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=X87_5w2Anxp3
canal.instance.connectionCharset = UTF-8
​
# table regex 需要订阅binlog的表的过滤正则表达式
canal.instance.filter.regex=.*\\..*
4.3.1.2. 启动

进入bin目录,启动deployer:

cd /data/canal/Server/bin
./startup.sh
4.3.2. adapter

在adapter的配置中,一定要先在ES中创建索引,然后在启动adapter。

4.3.2.1. 配置

canal.adapter-1.1.7.tar.gz上传到/data/canal/Adapter,并进行解压:

cd /data/canal/Adapter
tar -zxvf canal.adapter-1.1.7.tar.gz

修改配置文件conf/application.yml,按如下配置即可,主要是修改canal-server配置、数据源配置和客户端适配器配置:

server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_null
​
canal.conf:mode: tcp #tcp kafka rocketMQ rabbitMQflatMessage: truezookeeperHosts:syncBatchSize: 1000retries: -1timeout:accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 192.168.10.101:11111 # 需要修改canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:# kafka consumerkafka.bootstrap.servers: 127.0.0.1:9092kafka.enable.auto.commit: falsekafka.auto.commit.interval.ms: 1000kafka.auto.offset.reset: latestkafka.request.timeout.ms: 40000kafka.session.timeout.ms: 30000kafka.isolation.level: read_committedkafka.max.poll.records: 1000# rocketMQ consumerrocketmq.namespace:rocketmq.namesrv.addr: 127.0.0.1:9876rocketmq.batch.size: 1000rocketmq.enable.message.trace: falserocketmq.customized.trace.topic:rocketmq.access.channel:rocketmq.subscribe.filter:# rabbitMQ consumerrabbitmq.host:rabbitmq.virtual.host:rabbitmq.username:rabbitmq.password:rabbitmq.resource.ownerId:
​srcDataSources:defaultDS:url: jdbc:mysql://192.168.10.101:3307/ibds2?useUnicode=true   # 需要修改username: root    # 需要修改password: X87_5w2Anxp3    # 需要修改canalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#          druid.stat.enable: false
#          druid.stat.slowSqlMillis: 1000
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase- name: es7   # 需要修改hosts: 192.168.10.101:9300 # 127.0.0.1:9200 for rest mode # 需要修改properties:mode: transport # or rest# security.auth: test:123456 #  only used for rest modecluster.name: my-es   # 需要修改
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
#      - name: phoenix
#        key: phoenix
#        properties:
#          jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
#          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
#          jdbc.username:
#          jdbc.password:

配置重点

  • jdbc:mysql://192.168.10.101:3307/ibds2?useUnicode=true中的数据库名称

  • cluster.name:es集群名称根据自己的实际的配置,我的是my-es

  • -name: es7 这个很重要一会儿要用

添加配置文件canal-adapter/conf/es7/proof_202311.yml,用于配置MySQL中的表与Elasticsearch中索引的映射关系:

dataSourceKey: defaultDS    # 与application.yml中的srcDataSources对应
destination: example    # canal的instance
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:_index: proof_data_history_202311 # 要在es中创建的索引名称_type: _doc_id: id   # 与sql中的id一致upsert: truesql: "SELECTt.id as id,t.biz_id as biz_id,t.biz_type as biz_type,t.encrypt_mode as encrypt_mode,t.sign_user as sign_user,t.tx_hash as tx_hash,t.from_addr as from_addr,t.data as data,t.channel_id as channel_id,t.block_height as block_height,t.tx_time as tx_timeFROMproof_data_history_202311 t"commitBatch: 3000
4.3.2.2. 创建索引

根据MySQL的表结构信息,在ES中创建对应的索引信息。

在ES中创建索引的方法有多种,推荐两种方法:

  • kinbana:

PUT /proof_data_history_202311{"mappings":{"properties":{"id": {"type": "long"},"biz_id": {"type": "text"},"biz_type": {"type": "long"},"encrypt_mode": {"type": "long"},"sign_user": {"type": "text"},"tx_hash": {"type": "text"},"from_addr": {"type": "text"},"data": {"type": "text"},"channel_id": {"type": "text"},"block_height": {"type": "long"},"tx_time": {"type": "date"}}}}
  • 命令行:

curl -XPUT "http://192.168.10.101:9200/proof_data_history_202311" -H 'Content-Type: application/json' -d'
{"mappings":{"properties":{"id": {"type": "long"},"biz_id": {"type": "text"},"biz_type": {"type": "long"},"encrypt_mode": {"type": "long"},"sign_user": {"type": "text"},"tx_hash": {"type": "text"},"from_addr": {"type": "text"},"data": {"type": "text"},"channel_id": {"type": "text"},"block_height": {"type": "long"},"tx_time": {"type": "date"}}}}'
4.3.2.3. 启动adapter
cd /data/canal/Adapter/bin
./startup.sh
4.3.3. 数据初始化

如果mysql中有数据就需要调用一次全量同步,如果mysql没数据,或者数据没用,就不需要调用此步骤。

canal-adapter提供一个REST接口可全量同步数据到ES:

# 全量同步proof_data_history_202311表数据到es的proof_data_history_202311索引中,路径中的es7是上一步的文件夹名
curl  -X POST http://192.168.10.101:8081/etl/es7/proof_202311.yml

以120万行实验数据为例,分别进行ES中没有数据和已有数据的两种不同情况下的全量同步实验。

ES中没有数据,同步时间86秒:

2023年 11月 28日 星期二 09:17:55 CST开始执行全量同步
{"succeeded":true,"resultMessage":"导入ES 数据:1200013 条"}
2023年 11月 28日 星期二 09:19:21 CST完成执行全量同步

ES中已有数据,同步时间87秒:

2023年 11月 28日 星期二 09:27:46 CST开始执行全量同步
{"succeeded":true,"resultMessage":"导入ES 数据:1200013 条"}
2023年 11月 28日 星期二 09:29:03 CST完成执行全量同步

5. 补充资料

在实际生产中,server和adapter往往部署在不同的网络区域,这里就涉及到网络安全访问策略的问题,下面梳理出了需要的网络安全访问策略。

源端目标端
Adapter端IP数据源端MySQL地址和端口
Adapter端IP数据目的地ES地址和端口(9200、9300)
Adapter端IPServer端地址和11110、11111、11112端口
Server端IP数据源端MySQL地址和端口

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/182243.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

csapp-linklab之第二阶段“输出学号”实验报告

本阶段主题是链接中的“重定位”。两次重定位,一次是绝对地址重定位,一次是PC相对地址重定位。 本题目标依旧是输出学号,反汇编phase2.o,看到学号“0000000000”已经存放在只读数据区了。现在任务就是改do_pheas的指令和重定位表…

题目标题:乐乐摘苹果(杨鼎强)

一天乐乐去果园玩,发现一棵苹果树上结出10个苹果。乐乐跑去摘苹果。乐乐找到一个30厘米高的板凳,当她不能直接用手摘到苹果的时候,就会踩到板凳上再试试。 现在已知10个苹果到地面的高度,以及乐乐把手伸直的时候能够达到的最大高度…

Ubuntu系统Springboot项目Nginx安装(编译安装方式)

1.下载 nginx官网下载 Index of /download/ 2.解压 这里我下载的1.25.3版本,系统是ubuntu 解压 tar -zxvf nginx-1.25.3.tar.gz 3.编译安装 安装前需要执行安装一些系统依赖 3.1安装PCRE库 ubuntu:执行以下命令 sudo apt-get install libpcre…

uniapp2023年微信小程序头像+昵称分别获取

1、DOM <view class"m-user"><view class"user-info"><!--头像 GO--><button class"avatar avatar-wrapper" open-type"chooseAvatar" chooseavatar"onChooseAvatar"slot"right"><im…

c++实现程序单例运行的两种方式

第一种使用互斥体 // 使用互斥体保证单体运行 BOOL IsAlreadyRun() {HANDLE hMutex NULL;hMutex CreateMutex(NULL, FALSE, L"MYFLAG");if (hMutex ! NULL){if (ERROR_ALREADY_EXISTS GetLastError()){ReleaseMutex(hMutex);return TRUE;}}return FALSE; } int m…

国家高新技术企业认定,可以提前准备这些!

尽早获取核心自主知识产权 对于高新技术企业认定中有Ⅰ类和Ⅱ类知识产权之分。 其中&#xff0c;企业拥有的专利、植物新品种、国家新药、软件著作权等属于Ⅰ类核心知识产权&#xff0c;是高新技术企业认定的首要和必须条件&#xff0c;在高企认定中也会给申请人带来相应较高…

获取国内城市编码API

获取国内城市编码API接口 一、获取国内城市编码接口二、使用步骤1、接口2、请求参数 三、 案例和demo 一、获取国内城市编码接口 一款免费的帮助你获取取国内城市编码的接口 二、使用步骤 1、接口 重要提示:建议使用https协议,当https协议无法使用时再尝试使用http协议 请求…

【Lidar】基于Python的Open3D库可视化点云数据

1 Open3D库介绍 1.1 介绍 Open3D是一个开源的3D数据处理库&#xff0c;发布于2015年&#xff0c;目前已经更新到0.17.0版本。它基于MIT协议开源许可&#xff0c;使用C11实现&#xff0c;并经过高度优化&#xff0c;还通过Python Pybinding提供了前端Python API。 Open3D为开发…

C语言之atoi函数的使用和模拟实现

C语言之atoi函数的使用和模拟实现 1. atoi函数介绍 函数声明如下&#xff1a; int atoi (const char * str);atoi是用来将字符串中第一次出现的数字字符&#xff0c;转为一个整数 跳过空白字符&#xff0c;&#xff08;空白字符包括&#xff1a;空格 ’ ’ &#xff0c;换页…

我叫:基数排序【JAVA】

1.自我介绍 基数排序(radix sort)属于“分配式排序” (distribution sort)&#xff0c;又称“桶子法” (bucket sort)或bin sort,它是通过键值的各个位的值,将要排序的元素分配至某些“桶”中,是‘桶排序’的扩展 2.基本思想 将所有待比较数值统一为同样的数位长度,数位较短的数…

docker start一个容器之后,怎么进入这个容器界面

要进入已经启动的 docker 容器的交互式终端&#xff0c;可以使用 docker exec 命令。这个命令可用于在运行中的容器内部执行命令或脚本&#xff0c;并以交互模式与容器进行交互&#xff0c;具体格式如下&#xff1a; # docker exec -it [container_id or container_name] /bin/…

专业的调查问卷平台推荐:提升数据收集与分析效率

无论是学生还是职场人士&#xff0c;想做好一份调查问卷&#xff0c;关键先要确定调查的主题&#xff0c;然后确定调查人群&#xff0c;编辑问题&#xff0c;最后能够尽可能的美化问卷调查的主题。 想要做到这几点&#xff0c;就要要求问卷调查平台: 1、能够帮助你快速制作出一…

【开题报告】基于模糊控制的花卉光照时间控制系统

题 目 基于模糊控制的花卉光照时间控制系统 一、研究目的和意义 用光电传感器检测自然光&#xff0c;根据花卉开花时长&#xff0c;用MATLAB软件&#xff0c;使用模糊控制算法&#xff0c;对测得数据进行分析&#xff0c;得出结论&#xff0c;并传回下位机控制电机运动…

软件测试测试文档的编写和阅读

在软件测试中的流程中&#xff0c;测试文档也是一个重要的流程&#xff0c;所以测试人员也需要学习测试文档的编写和阅读。 一、定义&#xff1a; 测试文档&#xff08;Testing Documentation&#xff09;记录和描述了整个测试流程&#xff0c;它是整个测试活动中非常重要的文…

QT应用示例

一个简单的QT应用示例&#xff1a;创建一个窗口程序。 首先&#xff0c;确保已经安装了Qt开发环境。接下来&#xff0c;按照以下步骤创建一个简单的窗口程序&#xff1a; 1. 打开Qt Creator&#xff0c;点击“新建文件或项目”。 2. 选择“应用程序”&#xff0c;然后点击“下…

年终好价节有什么必买的数码好物?值得入手的数码好物推荐

大家是不是都没听说过好价节&#xff1f;直白点说就是原来的双十二购物狂欢节&#xff0c;只不过换一个说法&#xff0c;不过今年毕竟是第一年换个说法&#xff0c;所以淘宝年终好价节优惠还是值得我们期待的&#xff01;作为年前的最后一波大促&#xff0c;一起来看看有哪些好…

QML通用属性 pyside6

在 QML 中&#xff0c;几乎所有组件都继承自 Item 类型&#xff0c;因此它们共享一些通用的属性。 QML 组件通用属性 位置和尺寸 x 和 y: 组件在其父元素中的位置坐标 Item {x: 100y: 100 }width 和 height: 组件的宽度和高度 Item {width: 200height: 100 }z: 组件在 Z 轴…

SQL server界面操作链接服务器

1.打开链接服务器&#xff0c;右击连接服务器“新建链接服务器” 2.输入链接服务器名称和数据源 3.安全性中输入密码建立远程连接&#xff0c;点击确定&#xff1a; 4.打开新建的连接服务器&#xff0c;测试连接&#xff1a; 注意:链接服务器必须在局域网执行&#xff0c;不是同…

【Openstack Train安装】十、Neutron安装

Neutron&#xff0c;是Openstack中的一大核心组件&#xff0c;设计目标是实现“网络即服务&#xff08;Networking as a Service&#xff09;”。为了达到这一目标&#xff0c;在设计上遵循了基于 SDN 实现网络虚拟化的原则&#xff0c;在实现上充分利用了 Linux 系统上的各种网…

Python简单线性回归算法实现及应用示例

简单线性回归&#xff0c;是一种使用单个特征预测响应的方法。 它是机器学习爱好者了解的最基本的机器学习模型之一。 在线性回归中&#xff0c;我们假设两个变量&#xff0c;即因变量和自变量是线性相关的。 因此&#xff0c;我们尝试找到一个线性函数&#xff0c;作为特征或自…