Canal 解析与 Spring Boot 整合实战

一、Canal 简介

1.1 Canal 是什么?

Canal 是阿里巴巴开源的一款基于 MySQL 数据库增量日志解析(Binlog)中间件,它模拟 MySQL 的从机(Slave)行为,监听 MySQL 主机的二进制日志(Binlog),并解析出数据变更事件(DML 和 DDL),然后将这些事件转发给下游应用,从而实现数据的实时同步和处理。

1.2 Canal 的应用场景

  • 数据同步:将 MySQL 数据库的变更数据实时同步到其他存储系统,如 Redis、Elasticsearch 等。

  • 数据备份:实时备份 MySQL 数据库的变更数据,用于数据恢复或异地备份。

  • 数据一致性:在分布式系统中,保证 MySQL 数据库和缓存(如 Redis)的数据一致性。

  • 业务解耦:通过消息队列(如 Kafka)将数据变更事件传递给下游应用,实现业务系统的解耦。

二、Canal 工作原理

2.1 MySQL 主从复制原理

在介绍 Canal 的工作原理之前,我们先复习一下 MySQL 的主从复制原理。MySQL 主从复制主要涉及以下几个步骤:

  1. 主库记录二进制日志:主库将数据变更操作(如 INSERT、UPDATE、DELETE)记录到二进制日志(Binlog)中。

  2. 从库读取二进制日志:从库连接到主库,请求二进制日志,并将日志内容写入到本地的中继日志(Relay Log)中。

  3. 从库应用中继日志:从库的 SQL 线程读取中继日志,并将日志中的数据变更操作应用到从库的数据库中。

2.2 Canal 模拟从库行为

Canal 模拟了 MySQL 从库的行为,通过以下步骤实现数据的监听和解析:

  1. 连接到 MySQL 主库:Canal 以从库的身份连接到 MySQL 主库,并请求二进制日志。

  2. 解析二进制日志:Canal 解析二进制日志中的数据变更事件(DML 和 DDL),并将其转换为 Canal 自定义的事件格式。

  3. 转发数据变更事件:Canal 将解析后的数据变更事件转发给下游应用,如 Redis、Kafka 等。

2.3 Canal 的核心组件

  • Canal Server:负责与 MySQL 主库建立连接,监听二进制日志,并解析数据变更事件。

  • Canal Client:负责接收 Canal Server 转发的数据变更事件,并进行相应的处理。

  • Canal Filter:用于过滤二进制日志中的数据变更事件,只处理感兴趣的表和字段。

三、Canal 安装与配置

3.1 开启 MySQL 二进制日志

在使用 Canal 之前,需要确保 MySQL 的二进制日志已经开启。可以通过以下命令查看二进制日志是否开启:

SHOW VARIABLES LIKE 'log_bin';

如果 log_bin 的值为 OFF,则需要在 MySQL 配置文件中开启二进制日志:

[mysqld]
log-bin=mysql-bin

3.2 创建 Canal 用户

为了保证数据安全,建议为 Canal 创建一个专用的 MySQL 用户,并授予其必要的权限:

CREATE USER canal@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT, SUPER ON *.* TO 'canal'@'%';
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
FLUSH PRIVILEGES;

3.3 使用 Docker 安装 Canal

Canal 提供了 Docker 镜像,可以通过以下命令安装并启动 Canal:

docker pull canal/canal-server:v1.1.5docker run -p 11111:11111 --name canal \
-e canal.destinations=tingshuTopic \
-e canal.instance.master.address=192.168.200.130:3306  \
-e canal.instance.dbUsername=canal  \
-e canal.instance.dbPassword=canal  \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false  \
-e canal.instance.filter.regex=.*\\..* \
-d canal/canal-server:v1.1.5

四、Spring Boot 整合 Canal

4.1 创建 Spring Boot 工程

使用 Spring Initializr 创建一个新的 Spring Boot 工程

4.2 添加依赖

pom.xml 文件中添加以下依赖:

  • 目前canal不支持jdk17,变成jdk8版本

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu</groupId><artifactId>service-cdc</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>service-cdc</name><url>http://maven.apache.org</url><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.6.RELEASE</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!--web 需要启动项目--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version></dependency><!--    起到监听的作用     --><dependency><groupId>javax.persistence</groupId><artifactId>persistence-api</artifactId><version>1.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency></dependencies>
</project>

4.3 配置 Canal 和 Redis

一个canal服务器有很多个客户端,每个客户端有自己的通道名字

application-dev.yml 文件中配置 Canal 和 Redis 的连接信息:

server:port: 7080
#canal配置
canal:destination: tingshuTopic #Canal服务端发送数据的话题名称跟上面容器里参数destinations的一样server: 192.168.200.130:11111spring:redis:host: 192.168.200.130port: 6379

4.4 创建实体类

通过实体类监听到mysql变化的数据,但因为不同表的数据都不一样,所以每个实体类的字段都不一样,但是每个表都会有id,所以在实体类中加上变化字段的id

import lombok.Data;
import javax.persistence.Column;@Data
public class CDCEntity {// 注意Column 注解必须是persistence包下的,表示监听表中的一个字段@Column(name = "id")private Long id;
}

4.5 创建 Canal 处理类

创建一个类实现 EntryHandler 接口,用于处理 Canal 解析的数据变更事件:它定义了三个方法,分别对应 INSERTUPDATEDELETE 操作:

import com.alibaba.otter.canal.client.adapter.support.EntryHandler;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;@Slf4j
@Component
@CanalTable("album_info")
public class AlbumInfoCdcHandler implements EntryHandler<CDCEntity> {@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void insert(CDCEntity cdcEntity) {log.info("监听到数据添加,ID: {}", cdcEntity.getId());}@Overridepublic void update(CDCEntity before, CDCEntity after) {log.info("监听到数据更新,ID: {}", after.getId());String key = "album:info:" + after.getId();redisTemplate.delete(key);}@Overridepublic void delete(CDCEntity cdcEntity) {log.info("监听到数据删除,ID: {}", cdcEntity.getId());}
}

实例:

@Slf4j
@Component
@CanalTable("album_info") 监听变更表
public class AlbumInfoCdcHandler implements EntryHandler<CDCEntity> {@Autowiredprivate RedisTemplate redisTemplate;//mysql执行添加操作,这个方法执行public void insert(CDCEntity cdcEntity) {log.info("监听到数据修改,ID:{}", cdcEntity.getId());}//mysql执行修改操作,这个方法执行public void update(CDCEntity before, CDCEntity after) {log.info("监听到数据修改,ID:{}", after.getId());String key = "album:info:" + after.getId();redisTemplate.delete(key);}//mysql执行删除操作,这个方法执行public void delete(CDCEntity cdcEntity) {log.info("监听到数据修改,ID:{}", cdcEntity.getId());}
}

 

五、总结

Canal 是一款强大的 MySQL 数据库增量日志解析中间件,通过模拟 MySQL 从库的行为,实现数据的实时同步和处理。在本文中,我们详细介绍了 Canal 的工作原理、安装配置方法以及如何与 Spring Boot 进行整合。通过 Canal,我们可以轻松地实现 MySQL 数据库与 Redis 等其他存储系统的数据一致性,为分布式系统的开发提供了有力的支持。

希望本文对你有所帮助。如果有任何问题或建议,欢迎在评论区留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/73940.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《论语别裁》第01章 学而(31) 诗的人生

不过这句话研究起来有一个问题&#xff0c;是诗的问题。我们知道中国文化&#xff0c;在文学的境界上&#xff0c;有一个演变发展的程序&#xff0c;大体的情形&#xff0c;是所谓汉文、唐诗、宋词、元曲、明小说&#xff0c;到了清朝&#xff0c;我认为是对联&#xff0c;尤其…

笔记本运行边缘计算

笔记本电脑可以用来运行PCDN&#xff08;Peer-to-Peer Content Delivery Network&#xff09;服务。实际上&#xff0c;如果你有闲置的笔记本电脑&#xff0c;并且它具备一定的硬件条件和网络环境&#xff0c;那么它可以成为一个不错的PCDN节点。 运行PCDN的基本要求 硬件需求…

暗光增强技术研究进展与产品落地综合分析(2023-2025)

一、引言 暗光增强技术作为计算机视觉与移动影像领域的核心研究方向之一,近年来在算法创新、硬件适配及产品落地方面取得了显著进展。本文从技术研究与产业应用两个维度,系统梳理近三年(2023-2025)该领域的关键突破,并对比分析主流手机厂商的影像技术优劣势。 二、暗光增…

多维array和多维视图std::mdspan

多维数组 这个特性用于访问多维数组&#xff0c;之前C operator[] 只支持访问单个下标&#xff0c;无法访问多维数组。 因此要访问多维数组&#xff0c;以前的方式是&#xff1a; 重载operator()&#xff0c;于是能够以m(1, 2) 来访问第1 行第2 个元素。但这种方式容易和函数…

Python标准库之os模块常用方法

一、os模块简介 os模块是Python标准库中与操作系统交互的一个重要模块。它提供了非常丰富的方法来处理文件、目录以及与操作系统相关的操作&#xff0c;让我们可以编写跨平台的代码&#xff0c;无论是在Windows、Linux还是macOS系统上都能运行。 二、文件和目录操作 获取当前…

利用AI让数据可视化

1. 从问卷星上下载一份答题结果。 序号用户ID提交答卷时间所用时间来源来源详情来自IP总分1、《中华人民共和国电子商务法》正式实施的时间是&#xff08;&#xff09;。2、&#xff08;&#xff09;可以判断企业在行业中所处的地位。3、&#xff08;&#xff09;是指店铺内有…

K8S学习之基础三十五:k8s之Prometheus部署模式

Prometheus 有多种部署模式&#xff0c;适用于不同的场景和需求。以下是几种常见的部署模式&#xff1a; 1. 单节点部署 这是最简单的部署模式&#xff0c;适用于小型环境或测试环境。 特点&#xff1a; 单个 Prometheus 实例负责所有的数据采集、存储和查询。配置简单&…

【第14节】windows sdk编程:进程与线程介绍

目录 一、进程与线程概述 1.1 进程查看 1.2 何为进程 1.3 进程的创建 1.4 进程创建实例 1.5 线程查看 1.6 何为线程 1.7 线程的创建 1.8 线程函数 1.9 线程实例 二、内核对象 2.1 何为内核对象 2.2 内核对象的公共特点 2.3 内核对象句柄 2.4 内核对象的跨进程访…

Python简单爬虫实践案例

学习目标 能够知道Web开发流程 能够掌握FastAPI实现访问多个指定网页 知道通过requests模块爬取图片 知道通过requests模块爬取GDP数据 能够用pyecharts实现饼图 能够知道logging日志的使用 一、基于FastAPI之Web站点开发 1、基于FastAPI搭建Web服务器 # 导入FastAPI模…

uniapp工程中解析markdown文件

在uniapp中如何导入markdown文件&#xff0c;同时在页面中解析成html&#xff0c;请参考以下配置&#xff1a; 1. 安装以下3个依赖包 npm install marked highlight.js vite-plugin-markdown 2. 创建vite.config.js配置文件 // vite.config.js import { defineConfig } fro…

sass介绍

1、Sass简介 Sass 是一种 CSS 的预编译语言。它提供了 变量&#xff08;variables&#xff09;、嵌套&#xff08;nested rules&#xff09;、 混合&#xff08;mixins&#xff09;、 函数&#xff08;functions&#xff09;等功能&#xff0c;并且完全兼容 CSS 语法。Sass 能…

[JavaScript]如何利用作用域块避免闭包内存泄漏?

出自《你不知道的JavaScript》上卷 以下是本书给出的反例: function process (data) {...} var bigdata{...} process(bigdata); var btn document.getElementById(x); btn.addEventListener(click, function click{...});click会被回调在其他位置, 在addEventListener函数内…

leetcode hot100(五)

11. 盛最多水的容器 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 说明&#xff1a;你…

Unity 云渲染本地部署方案

Unity Render Streaming 云渲染环境搭建 0.安装 Unity Render Streaming 实现原理: 服务器与客户端实现功能包括: 详细内容见官方文档&#xff1a; 官方文档: https://docs.unity3d.com/Packages/com.unity.renderstreaming3.1/manual/tutorial.html Unity 流送云渲染介绍: …

洛谷 P3986 斐波那契数列

P3986 斐波那契数列 题目描述 定义一个数列&#xff1a; f ( 0 ) a , f ( 1 ) b , f ( n ) f ( n − 1 ) f ( n − 2 ) f(0) a, f(1) b, f(n) f(n - 1) f(n - 2) f(0)a,f(1)b,f(n)f(n−1)f(n−2) 其中 a, b 均为正整数&#xff0c;n ≥ 2。 问有多少种 (a, b)&…

【java面型对象进阶】------继承实例

继承结构下的标准Javabean 代码如下&#xff1a; package demo10;//定义员工父类 public class Employee {private String id;private String name;private double salary;//构造方法public Employee(){}public Employee(String id,String name,double salary){this.idid;thi…

Vitis 2024.1 无法正常编译custom ip的bug(因为Makefile里的wildcard)

现象&#xff1a;如果在vivado中&#xff0c;添加了自己的custom IP&#xff0c;比如AXI4 IP&#xff0c;那么在Vitis&#xff08;2024.1&#xff09;编译导出的原本的.xsa的时候&#xff0c;会构建build失败。报错代码是&#xff1a; "Compiling blank_test_ip..."…

【图论】并查集的学习和使用

目录 并查集是什么&#xff1f; 举个例子 组成 父亲数组&#xff1a; find函数&#xff1a; union函数&#xff1a; 代码实现&#xff1a; fa[] 初始化code: find code&#xff1a; 递归实现: 非递归实现: union code : 画图模拟&#xff1a; 路径压缩&#xff1a…

Java使用FFmpegFrameGrabber进行视频拆帧,结合Thumbnails压缩图片保存到文件夹

引入依赖 <dependency><groupId>net.coobird</groupId><artifactId>thumbnailator</artifactId><version>0.4.17</version></dependency><dependency><groupId>org.bytedeco</groupId><artifactId>ja…

mysql与redis的日志策略

MySQL 和 Redis 在日志记录方面采用了不同的策略&#xff0c;分别对应写前日志&#xff08;Write-Ahead Logging, WAL&#xff09;和写后日志&#xff08;Write-After Logging&#xff09;。以下是它们的详细说明&#xff1a; 1. MySQL&#xff1a;写前日志&#xff08;Write-A…