使用 Spring Boot 和 Canal 实现 MySQL 数据库同步

文章目录

  • 前言
  • 一、背景
  • 二、Canal 简介
  • 三、主库数据库配置
      • 1.主库配置
      • 2.创建 Canal 用户并授予权限
  • 四.配置 Canal Server
      • 1.Canal Server 配置文件
      • 2.启动 Canal Server
  • 五.开发 Spring Boot 客户端
      • 1. 引入依赖
      • 2. 配置 Canal 客户端
      • 3. 实现数据同步逻辑
  • 六.启动并测试
  • 七.注意事项
  • 八.总结


前言

在分布式系统中,数据同步是一个常见的需求。例如,我们可能需要将主库的数据实时同步到多个从库,或者将数据从一个数据库集群同步到另一个集群。本篇内容通过一个实际案例,介绍如何使用 Spring Boot 和 Canal 实现 MySQL 数据库之间的数据同步。


一、背景

假设我们有以下数据库架构:

  • 两个主库:db_1 和 db_2。
    每个主库对应两个从库:db_1_bk_1、db_1_bk_2 和 db_2_bk_1、db_2_bk_2。
  • 我们的目标是:
    将 db_1 的数据同步到 db_1_bk_1 和 db_1_bk_2。
    将 db_2 的数据同步到 db_2_bk_1 和 db_2_bk_2。

二、Canal 简介

Canal 是阿里巴巴开源的一款基于 MySQL Binlog 的增量数据订阅与分发工具。它通过模拟 MySQL 的从节点,实时捕获主库的 Binlog 日志,并将数据变更事件推送给下游消费者。Canal 支持多种下游适配器,如 Kafka、RabbitMQ 和直接消费。

三、主库数据库配置

1.主库配置

为了使 Canal 能够正常解析 Binlog 日志,主库需要进行以下配置:

  • 开启 Binlog 日志:确保主库开启了 Binlog 日志,并且设置为 ROW 模式。
  • 配置 server-id:为每个主库设置唯一的 server-id。
  • 创建 Canal 用户并授予权限:创建一个用户供 Canal 使用,并授予必要的权限。

编辑主库的配置文件(my.cnf 或 my.ini),添加以下内容:

[mysqld]
# 开启 Binlog 日志
log-bin=mysql-bin
# 设置 Binlog 格式为 ROW 模式
binlog-format=ROW
# 设置唯一的 server-id
server-id=1

注意:

  • 如果你有多个主库,每个主库的 server-id 必须是唯一的。
  • 修改配置后,需要重启 MySQL 服务以使配置生效。

2.创建 Canal 用户并授予权限

Canal 需要一个具有读取 Binlog 权限的 MySQL 用户。以下是创建用户并授予权限的步骤:

# 登录 MySQL
mysql -u root -p
# 创建用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
# 授予权限
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
# 刷新权限
FLUSH PRIVILEGES;

说明:

  • canal 用户需要足够的权限来读取 Binlog 数据,但不需要对数据库进行写操作。
  • 如果你的 MySQL 版本较新(8.x),可能需要使用 ALTER USER 命令来设置密码:
ALTER USER 'canal'@'%' IDENTIFIED BY 'canal';

四.配置 Canal Server

Canal Server 是 Canal 的核心组件,负责连接主库并解析 Binlog 数据。我们需要为每个主库配置一个 Canal 实例。

1.Canal Server 配置文件

在 Canal Server 的配置目录下,创建两个实例配置文件:conf/db_1/instance.properties 和 conf/db_2/instance.properties。
conf/db_1/instance.properties:

# 主库的地址和端口
canal.instance.master.address=db_1_ip:3306
# Canal 连接主库的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正则表达式,这里表示同步 db_1 数据库的所有表
canal.instance.filter.regex=db_1\\..*

conf/db_2/instance.properties:

# 主库的地址和端口
canal.instance.master.address=db_2_ip:3306
# Canal 连接主库的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正则表达式,这里表示同步 db_2 数据库的所有表
canal.instance.filter.regex=db_2\\..*

2.启动 Canal Server

使用以下命令启动 Canal Server:

nohup sh bin/canal.sh start &

注意:

  • 确保主库的 Binlog 位置和文件名正确。如果不确定,可以通过 SHOW MASTER STATUS; 命令查看。
  • 如果主库已经运行了一段时间,需要指定 Binlog 的起始位置,避免重复同步旧数据。

五.开发 Spring Boot 客户端

Spring Boot 客户端作为 Canal 的消息消费者,负责接收数据变更事件并同步到目标从库。

1. 引入依赖

在 Spring Boot 项目的 pom.xml文件中,引入 Canal 客户端依赖:

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.8</version>
</dependency>

2. 配置 Canal 客户端

application.yml 文件中,配置 Canal Server 的地址:

canal:server.ip: canal_server_ipserver.port: 11111

3. 实现数据同步逻辑

创建一个 Canal 客户端服务类,用于接收和处理数据变更事件。
CanalClientService.java:

@Service
public class CanalClientService {private final CanalConnector canalConnector;public CanalClientService(@Value("${canal.server.ip}") String canalServerIp, @Value("${canal.server.port}") int canalServerPort) {this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp, canalServerPort), "example", "", "");}@PostConstructpublic void start() {canalConnector.connect();canalConnector.subscribe("db_1..*, db_2..*"); // 订阅 db_1 和 db_2 的所有表new Thread(this::process).start();}private void process() {while (true) {Message message = canalConnector.getWithoutAck(100);long batchId = message.getId();if (batchId == -1 || message.getEntries().isEmpty()) {continue;}for (Entry entry : message.getEntries()) {handleData(entry);}canalConnector.ack(batchId);}}private void handleData(Entry entry) {String schemaName = entry.getHeader().getSchemaName(); // 数据库名String tableName = entry.getHeader().getTableName();  // 表名EventType eventType = entry.getHeader().getEventType(); // 数据变更类型System.out.println("Schema: " + schemaName + ", Table: " + tableName + ", Type: " + eventType);// 根据来源数据库同步到对应的从库if ("db_1".equals(schemaName)) {syncToBackupDbs(entry, "db_1_bk_1", "db_1_bk_2");} else if ("db_2".equals(schemaName)) {syncToBackupDbs(entry, "db_2_bk_1", "db_2_bk_2");}}private void syncToBackupDbs(Entry entry, String... backupDbs) {// 根据事件类型同步到从库if (entry.getHeader().getEventType() == EventType.INSERT) {for (String db : backupDbs) {syncInsert(entry, db);}} else if (entry.getHeader().getEventType() == EventType.UPDATE) {for (String db : backupDbs) {syncUpdate(entry, db);}} else if (entry.getHeader().getEventType() == EventType.DELETE) {for (String db : backupDbs) {syncDelete(entry, db);}}}private void syncInsert(Entry entry, String backupDb) {// 使用 MyBatis 将数据插入到对应的从库System.out.println("INSERT into " + backupDb);}private void syncUpdate(Entry entry, String backupDb) {// 使用 MyBatis 将数据更新到对应的从库System.out.println("UPDATE into " + backupDb);}private void syncDelete(Entry entry, String backupDb) {// 使用 MyBatis 将数据从对应的从库删除System.out.println("DELETE from " + backupDb);}
}

六.启动并测试

  • 启动 Canal Server。
  • 启动 Spring Boot 应用。
  • 在主库 db_1 或 db_2 中插入、更新或删除数据。
  • 观察从库 db_1_bk_1、db_1_bk_2、db_2_bk_1 和 db_2_bk_2 是否同步成功。

七.注意事项

  • 数据一致性:确保从库的数据与主库保持一致。可以通过事务或锁机制来避免冲突。
  • 性能优化:如果数据量较大,建议结合中间件(如 Kafka)进行缓冲和负载均衡。
  • 错误处理:在同步过程中,需要处理网络异常、数据库连接异常等情况。
  • Canal Server 高可用:在生产环境中,建议部署 Canal Server 的集群,以提高系统的可用性。

八.总结

通过 Spring Boot 和 Canal,我们可以实现 MySQL 数据库之间的高效数据同步。Canal 提供了强大的 Binlog 解析能力,而 Spring Boot 则提供了灵活的开发框架,两者结合可以轻松应对复杂的分布式数据同步需求。希望本文对你有所帮助,如果有任何问题,欢迎在评论区留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/70326.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux系统配置阿里云yum源,安装docker

配置阿里云yum源 需要保证能够访问阿里云网站 可以先ping一下看看&#xff08;阿里云可能禁ping&#xff0c;只要能够解析为正常的ip地址即可&#xff09; ping mirrors.aliyun.com脚本 #!/bin/bash mkdir /etc/yum.repos.d/bak mv /etc/yum.repos.d/*.repo /etc/yum.repos…

后端开发:开启技术世界的新大门

在互联网的广阔天地中&#xff0c;后端开发宛如一座大厦的基石&#xff0c;虽不直接与用户 “面对面” 交流&#xff0c;却默默地支撑着整个互联网产品的稳定运行。它是服务器端编程的核心领域&#xff0c;负责处理数据、执行业务逻辑以及与数据库和其他后端服务进行交互。在当…

银河麒麟系统安装mysql5.7【亲测可行】

一、安装环境 cpu&#xff1a;I5-10代&#xff1b; 主板&#xff1a;华硕&#xff1b; OS&#xff1a;银河麒麟V10&#xff08;SP1&#xff09;未激活 架构&#xff1a;Linux 5.10.0-9-generic x86_64 GNU/Linux mysql版本&#xff1a;mysql-5.7.34-linux-glibc2.12-x86_64.ta…

从零开始学习PX4源码9(部署px4源码到gitee)

目录 文章目录 目录摘要1.gitee上创建仓库1.1 gitee上创建仓库PX4代码仓库1.2 gitee上创建子仓库2.固件在gitee部署过程2.1下载固件到本地2.2切换本地分支2.3修改.gitmodules内容2.4同步子模块仓库地址2.5同步子模块仓库地址更新(下载)子模块3.一级子模块和二级子模块的映射关…

【回溯算法2】

力扣17.电话号码的字母组合 链接: link 思路 这道题容易想到用嵌套的for循环实现&#xff0c;但是如果输入的数字变多&#xff0c;嵌套的for循环也会变长&#xff0c;所以暴力破解的方法不合适。 可以定义一个map将数字和字母对应&#xff0c;这样就可以获得数字字母的映射了…

科普:“Docker Desktop”和“Docker”以及“WSL”

“Docker Desktop”和“Docker”这两个概念既有紧密联系&#xff0c;又存在一定区别&#xff1a; 一、联系 核心功能同源&#xff1a;Docker Desktop 本质上是基于 Docker 核心技术构建的。Docker 是一个用于开发、部署和运行应用程序的开源平台&#xff0c;它利用容器化技术…

Flutter 网络请求与数据处理:从基础到单例封装

Flutter 网络请求与数据处理&#xff1a;从基础到单例封装 在 Flutter 开发中&#xff0c;网络请求是一个非常常见的需求&#xff0c;比如获取 API 数据、上传文件、处理分页加载等。为了高效地处理网络请求和数据管理&#xff0c;我们需要选择合适的工具并进行合理的封装。 …

虚拟表格实现全解析

在数据展示越来越复杂的今天&#xff0c;大量数据的渲染就像是“满汉全席”——如果把所有菜肴一次性摆上桌&#xff0c;既浪费资源也让人眼花缭乱。幸运的是&#xff0c;我们有两种选择&#xff1a; 自己动手&#xff1a;通过二次封装 Element Plus 的表格组件&#xff0c;实…

QT 读写锁

一、概述 1、读写锁是一种线程同步机制&#xff0c;用于解决多线程环境下的读写竞争问题。 2、读写锁允许多个线程同时获取读锁&#xff08;共享访问&#xff09;&#xff0c;但只允许一个线程获取写锁&#xff08;独占访问&#xff09;。 3、这种机制可以提高并发性能&…

2025 vue3面试题汇总,通俗易懂

一、基础概念与核心特性 1. Vue3 相比 Vue2 的改进&#xff08;通俗版&#xff09; 问题&#xff1a;Vue3 比 Vue2 好在哪&#xff1f; 答案&#xff1a; 更快&#xff1a; Proxy 代理&#xff1a;Vue2 的响应式像“逐个监听保险箱”&#xff08;每个属性单独监听&#xff0…

第5章:在LangChain中如何使用AI Services

这篇文章详细介绍了 LangChain4j 中的 AI Services 概念&#xff0c;展示了如何通过高层次的抽象来简化与大语言模型&#xff08;LLM&#xff09;的交互。AI Services 的核心思想是隐藏底层复杂性&#xff0c;让开发者专注于业务逻辑&#xff0c;同时支持聊天记忆、工具调用和 …

二叉树(数据结构)

二叉树 二叉树也是用过递归定义的结构 先序遍历又称前序遍历 ​​ ​​ 按照先序遍历的方法去手算处理这个二叉树 ​​ 先A B C 再 A B D E C&#xff08;也就是把B换成BDE再放进去&#xff09; 再 A B D E C F 看这个插入的方法要掌握像二叉树这样向一个…

机器学习笔记——常用损失函数

大家好&#xff0c;这里是好评笔记&#xff0c;公主号&#xff1a;Goodnote&#xff0c;专栏文章私信限时Free。本笔记介绍机器学习中常见的损失函数和代价函数&#xff0c;各函数的使用场景。 热门专栏 机器学习 机器学习笔记合集 深度学习 深度学习笔记合集 文章目录 热门…

Wireshark使用介绍

文章目录 Wireshark介绍Wireshark使用工作模式介绍1. 混杂模式&#xff08;Promiscuous Mode&#xff09;2. 普通模式&#xff08;Normal Mode&#xff09;3. 监视模式&#xff08;Monitor Mode&#xff09; 界面分区捕获过滤器语法基本语法逻辑运算符高级语法使用示例捕获过滤…

#渗透测试#批量漏洞挖掘#畅捷通T+SQL注入漏洞

免责声明 本教程仅为合法的教学目的而准备,严禁用于任何形式的违法犯罪活动及其他商业行为,在使用本教程前,您应确保该行为符合当地的法律法规,继续阅读即表示您需自行承担所有操作的后果,如有异议,请立即停止本文章读。 目录 一、漏洞全景解析 1. 高危漏洞案例库 2.…

【小游戏】C++控制台版本俄罗斯轮盘赌

制作团队&#xff1a;洛谷813622&#xff08;Igallta&#xff09; 989571&#xff08;_ayaka_&#xff09; Mod&#xff1a;_ayaka_ 双人模式&#xff1a;Igallta 公告&#xff1a; 原先的9.8改名为 Alpha 1.0&#xff0c;以后每次更新都增加 0.1。 Alpha 1.11 改为 Beta 1…

nvm安装、管理node多版本以及配置环境变量【保姆级教程】

引言 不同的项目运行时可能需要不同的node版本才可以运行&#xff0c;由于来回进行卸载不同版本的node比较麻烦&#xff1b;所以需要使用node工程多版本管理。 本人在配置时&#xff0c;通过网络搜索教程&#xff0c;由于文章时间过老&#xff0c;或者文章的互相拷贝导致配置时…

框架--Mybatis3

一.特殊符号处理 < < > > " &quot; &apos; & &amp; 除了可以使用上述转义字符外&#xff0c;还可以使<![CDATA[ ]]>用来包裹特殊字符。 二.mybatis 一级缓存二级缓存 1.为什么缓存 缓存&#xff1a;数据缓存&#xf…

纯新手教程:用llama.cpp本地部署DeepSeek蒸馏模型

0. 前言 llama.cpp是一个基于纯C/C实现的高性能大语言模型推理引擎&#xff0c;专为优化本地及云端部署而设计。其核心目标在于通过底层硬件加速和量化技术&#xff0c;实现在多样化硬件平台上的高效推理&#xff0c;同时保持低资源占用与易用性。 最近DeepSeek太火了&#x…

Netty入门详解

引言 Netty 是一个基于 Java 的高性能、异步事件驱动的网络应用框架&#xff0c;用于快速开发可维护的高性能网络服务器和客户端。它提供了一组丰富的 API&#xff0c;使得开发人员能够轻松地处理各种网络协议&#xff0c;如 TCP、UDP 等&#xff0c;并且支持多种编解码方式&a…