MySQL与Canal、RabbitMQ集成指南

MySQL 部分

1. 查看是否开启 binlog

MySQL 8 默认开启 binlog。可以通过以下命令查看是否开启:

SHOW VARIABLES LIKE 'log_bin';

如果返回结果为 ON,则表示 binlog 已开启。

Variable_nameValue
log_binON

2. 若未开启 binlog,则需手动配置

如果 binlog 未开启,需要在 MySQL 配置文件中添加以下配置:

log-bin=mysql-bin  # 开启 binlog
server_id=1        # 配置 MySQL replication 需要定义,确保不与 Canal 的 slaveId 重复

修改完成后,重启 MySQL 使配置生效。

3. 创建 Canal 使用的 MySQL 用户

Canal 需要连接到 MySQL 并读取 binlog,因此需要创建一个专门的用户并授予相应权限。

# 创建用户
CREATE USER canal IDENTIFIED WITH mysql_native_password BY 'canal';# 授予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';# 刷新权限
FLUSH PRIVILEGES;

MQ 部分

在 RabbitMQ 中,我们需要创建一个交换机和队列,并将它们绑定在一起。我使用的是已经创建过的 Virtual Host : trovebox_dev,你可以根据实际情况决定是否创建新的 Virtual Host。

1. 新建交换机

在 RabbitMQ 管理界面中,创建一个新的交换机,命名为 canal.exchange

新建交换机

2. 添加队列

创建一个新的队列,命名为 canal.queue

添加队列

3. 绑定交换机

将队列 canal.queue 绑定到交换机 canal.exchange,并设置路由键为 canal.routing.key

绑定交换机

绑定交换机

Canal 部分

Docker 安装 Canal

使用 Docker 安装 Canal 非常简单,以下是安装步骤:

  1. 拉取 Canal 镜像:没有tag默认最新的

    docker pull canal/canal-server
    
  2. 运行 Canal 容器:

    docker run -p 11111:11111 -p 11110:11110 -p 11112:11112 \--name canal \-e canal.destinations=destination \-e canal.instance.master.address=ip:port \-e canal.instance.dbUsername=canal \-e canal.instance.dbPassword=canal \-e canal.instance.connectionCharset=UTF-8 \-e canal.instance.tsdb.enable=true \-e canal.instance.gtidon=false \-e canal.instance.filter.regex=dataBaseName\\..* \-d canal/canal-server:latest
    
  3. 将 Canal 的配置文件和日志文件拷贝到宿主机:

    docker cp containerId:/home/admin/canal-server/conf /www/dk_project/dk_app/canal/
    docker cp containerId:/home/admin/canal-server/logs /www/dk_project/dk_app/canal/
    
  4. 修改配置文件 conf/canal.properties

    canal.serverMode = rabbitMQ
    rabbitmq.host = ip
    rabbitmq.virtual.host = trovebox_dev
    rabbitmq.exchange = canal.exchange
    rabbitmq.username = trovebox_dev
    rabbitmq.password = troveboxadmin
    
  5. 修改配置文件 conf/destination/canal.properties

    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.mq.topic=canal.routing.key
    
  6. 删除并重新创建 Canal 容器:

    docker rm -f canaldocker run -p 11111:11111 -p 11110:11110 -p 11112:11112 \--name canal \-e canal.destinations=destination \-e canal.instance.master.address=ip:port \-e canal.instance.dbUsername=canal \-e canal.instance.dbPassword=canal \-e canal.instance.connectionCharset=UTF-8 \-e canal.instance.tsdb.enable=true \-v /www/dk_project/dk_app/canal/conf:/home/admin/canal-server/conf/ \-v /home/admin/canal-server/logs:/home/admin/canal-server/logs/ \-e canal.instance.filter.regex=dataBaseName\\..* \-d canal/canal-server:latest
    

Java 部分代码

BinLogDto.java

BinLogDto 类用于解析 Canal 发送的 binlog 数据。

package online.trovebox.ruyiai.common.dto;import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import lombok.Data;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Data
public class BinLogDto {private String database;  // 数据库private String table;     // 表private String type;      // 操作类型private JSONArray data;   // 操作数据private JSONArray old;    // 变更前数据private JSONArray pkNames; // 主键名称private String sql;       // 执行 SQL 语句private Long es;private String gtid;private Long id;private Boolean isDdl;private JSONObject mysqlType;private JSONObject sqlType;private Long ts;public <T> List<T> getData(Class<T> clazz) {if (this.data == null || this.data.size() == 0) {return null;}return this.data.toList(clazz);}public <T> List<T> getOld(Class<T> clazz) {if (this.old == null || this.old.size() == 0) {return null;}return this.old.toList(clazz);}public List<String> getPkNames() {if (this.pkNames == null || this.pkNames.size() == 0) {return null;}List<String> pkNames = new ArrayList<>();for (Object pkName : this.pkNames) {pkNames.add(pkName.toString());}return pkNames;}public Map<String, String> getMysqlType() {if (this.mysqlType == null) {return null;}Map<String, String> mysqlTypeMap = new HashMap<>();this.mysqlType.forEach((k, v) -> {mysqlTypeMap.put(k, v.toString());});return mysqlTypeMap;}public Map<String, Integer> getSqlType() {if (this.sqlType == null) {return null;}Map<String, Integer> sqlTypeMap = new HashMap<>();this.sqlType.forEach((k, v) -> {sqlTypeMap.put(k, Integer.valueOf(v.toString()));});return sqlTypeMap;}
}

Listener.java

Listener 类用于监听 RabbitMQ 中的消息,并处理 binlog 数据。

package online.trovebox.ruyiai.listener;import com.alibaba.fastjson2.JSON;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import online.trovebox.ruyiai.common.dto.BinLogDto;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;@Component
@Slf4j
@RequiredArgsConstructor
public class CanalListener {@Resourceprivate RedisTemplate<String, Object> redisTemplate;String[] prefixes = new String[]{"coin_change_log","log","message",};@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "canal.queue", durable = "true"),exchange = @Exchange(value = "canal.exchange"),key = "canal.routing.key")})public void handleDataChange(@Payload Message message) {String content = new String(message.getBody(), StandardCharsets.UTF_8);BinLogDto binLog = JSON.parseObject(content, BinLogDto.class);String type = binLog.getType();if (type.equalsIgnoreCase("select")) {return;}String table = binLog.getTable();for (String prefix : prefixes) {if (table.startsWith(prefix)) {System.err.println(table);return;}}log.info("表:{} 操作类型:{}", table, binLog.getType());log.info("操作后数据:{} ", binLog.getData().toStringPretty());deleteKeysStartingWith(table);}public void deleteKeysStartingWith(String prefix) {String cursor = "0";do {Set<String> keys = scanKeys(cursor, prefix);cursor = keys.isEmpty() ? "0" : "1";if (!keys.isEmpty()) {redisTemplate.delete(keys);}} while (!"0".equals(cursor));}private Set<String> scanKeys(String cursor, String prefix) {return redisTemplate.execute((RedisCallback<Set<String>>) connection -> {ScanOptions options = ScanOptions.scanOptions().match(prefix + "*").count(1000).build();Cursor<byte[]> cursorScan = connection.scan(options);Set<String> keys = new HashSet<>();while (cursorScan.hasNext()) {byte[] keyBytes = cursorScan.next();keys.add(new String(keyBytes, StandardCharsets.UTF_8));}return keys;});}
}

效果图

效果图

知识点说明

  1. Binlog:MySQL 的二进制日志,用于记录数据库的所有更改操作。Canal 通过读取 binlog 来获取数据库的变更数据。
  2. Canal:阿里巴巴开源的数据库同步工具,基于 MySQL 的 binlog 实现数据同步。
  3. RabbitMQ:消息队列中间件,用于在分布式系统中传递消息。Canal 可以将 binlog 数据发送到 RabbitMQ,供其他服务消费。
  4. Redis:内存数据库,用于缓存数据。在监听 binlog 变更时,可以通过 Redis 缓存相关数据,并在数据变更时清除缓存。

通过以上步骤和代码,你可以实现 MySQL 数据库的变更监听,并将变更数据通过 RabbitMQ 发送到其他服务进行处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/897980.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

X86 RouterOS 7.18 设置笔记十:上海电信IPTV使用msd_lite实现组播转单拨

X86 j4125 4网口小主机折腾笔记五&#xff1a;PVE安装ROS RouterOS X86 RouterOS 7.18 设置笔记一&#xff1a;基础设置 X86 RouterOS 7.18 设置笔记二&#xff1a;网络基础设置(IPV4) X86 RouterOS 7.18 设置笔记三&#xff1a;防火墙设置(IPV4) X86 RouterOS 7.18 设置笔记四…

Select 选择器选项位置偏移的解决方案

Select 选择器选项位置偏移的解决方案 在使用 Select 组件时&#xff0c;可能会遇到下拉选项位置偏移的问题。这通常由 CSS 样式、组件 渲染方式 或 父级元素的影响 造成。以下是详细的排查步骤和解决方案。 一、常见原因 position: relative; 或 overflow: hidden; 影响下拉菜…

LeetCode 解题思路 17(Hot 100)

解题思路&#xff1a; 找到链表中点&#xff1a; 使用快慢指针法&#xff0c;快指针每次移动两步&#xff0c;慢指针每次移动一步。当快指针到达末尾时&#xff0c;慢指针指向中点。递归分割与排序&#xff1a; 将链表从中点处分割为左右两个子链表&#xff0c;分别对这两个子…

数学建模历程之初见

第一次接触数学建模是在上大学前&#xff0c;当时只是听过。起源于我在大学的老乡群里聊天&#xff0c;由于当时年轻有点傻&#xff0c;说的话太多了&#xff0c;什么都问哈哈哈哈哈。 后来有个学长从老乡群里加我&#xff0c;问我怎么话那么多&#xff0c;你们懂当时对我幼小…

Python 科学计算与机器学习入门:NumPy + Scikit-Learn 实战指南

Langchain系列文章目录 01-玩转LangChain&#xff1a;从模型调用到Prompt模板与输出解析的完整指南 02-玩转 LangChain Memory 模块&#xff1a;四种记忆类型详解及应用场景全覆盖 03-全面掌握 LangChain&#xff1a;从核心链条构建到动态任务分配的实战指南 04-玩转 LangChai…

「自动驾驶背后的数学:从传感器数据到控制指令的函数嵌套」—— 揭秘人工智能中的线性函数、ReLU 与复合函数

引言 自动驾驶技术是人工智能领域的一个重要应用&#xff0c;其核心在于如何将传感器数据转化为车辆控制指令。这一过程涉及大量的数学知识&#xff0c;包括线性函数、激活函数&#xff08;如 ReLU&#xff09;以及复合函数的嵌套使用。本文将深入探讨自动驾驶中的数学原理&am…

详解SQL数据定义功能

数据定义 1. 数据库模式&#xff08;Schema&#xff09;的定义与删除定义模式删除模式 2. 基本表的定义、修改与删除定义表约束1. NOT NULL 约束2. DEFAULT 约束3. UNIQUE 约束4. PRIMARY KEY 约束多列主键示例&#xff1a; 5. FOREIGN KEY 约束6. CHECK 约束7. AUTO_INCREMENT…

Redis超高并发分key实现

Redis扛并发的能力是非常强的&#xff0c;所以高并发场景下经常会使用Redis&#xff0c;但是Redis单分片的写入瓶颈在2w左右&#xff0c;读瓶颈在10w左右&#xff0c;如果在超高并发下即使是集群部署Redis&#xff0c;单分片的Redis也是有可能扛不住的&#xff0c;如下图所示&a…

AI Agent 时代开幕-Manus AI与OpenAI Agent SDK掀起新风暴

【本周AI新闻: AI Agent 时代开幕-Manus AI与OpenAI Agent SDK掀起新风暴】 https://www.bilibili.com/video/BV1bkQyYCEvQ/?share_sourcecopy_web&vd_source32ed33e1165d68429b2e2eb4749f3f26 最近AI圈子里最火的话题非Manus莫属&#xff01;这款由中国武汉创业公司“蝴…

多时间尺度的配电网深度强化学习无功优化策略的Python示例代码框架

以下是一个简单的多时间尺度的配电网深度强化学习无功优化策略的Python示例代码框架&#xff0c;用于帮助你理解如何使用深度强化学习&#xff08;以深度Q网络 DQN 为例&#xff09;来处理配电网的无功优化问题。在实际应用中&#xff0c;你可能需要根据具体的配电网模型和需求…

剑指 Offer II 081. 允许重复选择元素的组合

comments: true edit_url: https://github.com/doocs/leetcode/edit/main/lcof2/%E5%89%91%E6%8C%87%20Offer%20II%20081.%20%E5%85%81%E8%AE%B8%E9%87%8D%E5%A4%8D%E9%80%89%E6%8B%A9%E5%85%83%E7%B4%A0%E7%9A%84%E7%BB%84%E5%90%88/README.md 剑指 Offer II 081. 允许重复选择…

Webpack 前端性能优化全攻略

文章目录 1. 性能优化全景图1.1 优化维度概览1.2 优化效果指标 2. 构建速度优化2.1 缓存策略2.2 并行处理2.3 减少构建范围 3. 输出质量优化3.1 代码分割3.2 Tree Shaking3.3 压缩优化 4. 运行时性能优化4.1 懒加载4.2 预加载4.3 资源优化 5. 高级优化策略5.1 持久化缓存5.2 模…

虚拟电商-数据库分库分表(二)

本文章介绍&#xff1a;使用Sharding-JDBC实现数据库分库分表&#xff0c;数据库分片策略&#xff0c;实现数据库按月分表 一、Sharding-JDBC使用 1.1.准备环境 步骤一&#xff1a;分库分表sql脚本导入 创建了两个数据库&#xff1a;chongba_schedule0 和chongba_schedule1…

向量数据库对比以及Chroma操作

一、向量数据库与传统类型数据库 向量数据库&#xff08;Vector Storage Engine&#xff09;与传统类型的数据库如关系型数据库&#xff08;MySQL&#xff09;、文档型数据库&#xff08;MongoDB&#xff09;、键值存储&#xff08;Redis&#xff09;、全文搜索引擎&#xff0…

python列表基础知识

列表 创建列表 1.列表的定义&#xff1a;可变的&#xff0c;有序的数据结构&#xff0c;可以随时添加或者删除其中的元素 2.基本语法&#xff1a;字面量【元素1&#xff0c;元素2&#xff0c;元素3】使用[]创建列表 定义变量&#xff1a;变量名称【元素1&#xff0c;元素2&…

Node.js 的模块作用域和 module 对象详细介绍

目录 代码示例 1. 创建模块文件 module-demo.js 2. 导入模块并使用 module-demo.js 运行结果 总结 在 Node.js 中&#xff0c;每个文件都是一个独立的模块&#xff0c;具有自己的作用域。与浏览器 JavaScript 代码不同&#xff0c;Node.js 采用模块作用域&#xff0c;这意味…

美畅物联丨WebRTC 技术详解:构建实时通信的数字桥梁

在互联网技术飞速发展的今天&#xff0c;实时通信已成为数字生活的核心需求。WebRTC作为一个开源项目&#xff0c;凭借卓越的技术实力与创新理念&#xff0c;为网页和移动应用带来了颠覆性的实时通信能力。它突破了传统通信方式的限制&#xff0c;实现了音频、视频和数据在用户…

excel中两个表格的合并

使用函数&#xff1a; VLOOKUP函数 如果涉及在excel中两个工作表之间进行配对合并&#xff0c;则&#xff1a; VLOOKUP(C1,工作表名字!A:B,2,0) 参考&#xff1a; excel表格中vlookup函数的使用方法步骤https://haokan.baidu.com/v?pdwisenatural&vid132733503560775…

单引号与双引号在不同编程语言中的使用与支持

在编程语言中&#xff0c;单引号和双引号是常见的符号&#xff0c;它们通常用来表示字符和字符串。然而&#xff0c;如何使用这两种符号在不同的编程语言中有所不同&#xff0c;甚至有一些语言并不区分单引号和双引号的用途。本文将详细介绍不同编程语言中单引号与双引号的支持…

怎么鉴别金媒v10.51和v10.5的区别!单单从CRM上区分!

2.怎么鉴别程序是10.5还是10.51 &#xff1f;* 作为商业用户&#xff0c;升级完全没有这个担心&#xff0c;但是这次升级从全局来看清晰度不是很高&#xff0c;不像10.5的升级后台UI都变化了&#xff01;你说有漏洞但是我没遇到过 所以我也不知道升级了啥只能看版本数字是无法区…