襄阳住房和城乡建设网站亚马逊品牌网站要怎么做
web/
2025/9/26 2:55:41/
文章来源:
襄阳住房和城乡建设网站,亚马逊品牌网站要怎么做,鞍山+网站建设,网站网站建设公司上海MySQL 部分
1. 查看是否开启 binlog
MySQL 8 默认开启 binlog。可以通过以下命令查看是否开启#xff1a;
SHOW VARIABLES LIKE log_bin;如果返回结果为 ON#xff0c;则表示 binlog 已开启。
Variable_nameValuelog_binON
2. 若未开启 binlog#xff0c;则需手动配置 …MySQL 部分
1. 查看是否开启 binlog
MySQL 8 默认开启 binlog。可以通过以下命令查看是否开启
SHOW VARIABLES LIKE log_bin;如果返回结果为 ON则表示 binlog 已开启。
Variable_nameValuelog_binON
2. 若未开启 binlog则需手动配置
如果 binlog 未开启需要在 MySQL 配置文件中添加以下配置
log-binmysql-bin # 开启 binlog
server_id1 # 配置 MySQL replication 需要定义确保不与 Canal 的 slaveId 重复修改完成后重启 MySQL 使配置生效。
3. 创建 Canal 使用的 MySQL 用户
Canal 需要连接到 MySQL 并读取 binlog因此需要创建一个专门的用户并授予相应权限。
# 创建用户
CREATE USER canal IDENTIFIED WITH mysql_native_password BY canal;# 授予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal%;# 刷新权限
FLUSH PRIVILEGES;MQ 部分
在 RabbitMQ 中我们需要创建一个交换机和队列并将它们绑定在一起。我使用的是已经创建过的 Virtual Host trovebox_dev你可以根据实际情况决定是否创建新的 Virtual Host。
1. 新建交换机
在 RabbitMQ 管理界面中创建一个新的交换机命名为 canal.exchange。 2. 添加队列
创建一个新的队列命名为 canal.queue。 3. 绑定交换机
将队列 canal.queue 绑定到交换机 canal.exchange并设置路由键为 canal.routing.key。 Canal 部分
Docker 安装 Canal
使用 Docker 安装 Canal 非常简单以下是安装步骤 拉取 Canal 镜像没有tag默认最新的 docker pull canal/canal-server运行 Canal 容器 docker run -p 11111:11111 -p 11110:11110 -p 11112:11112 \--name canal \-e canal.destinationsdestination \-e canal.instance.master.addressip:port \-e canal.instance.dbUsernamecanal \-e canal.instance.dbPasswordcanal \-e canal.instance.connectionCharsetUTF-8 \-e canal.instance.tsdb.enabletrue \-e canal.instance.gtidonfalse \-e canal.instance.filter.regexdataBaseName\\..* \-d canal/canal-server:latest将 Canal 的配置文件和日志文件拷贝到宿主机 docker cp containerId:/home/admin/canal-server/conf /www/dk_project/dk_app/canal/
docker cp containerId:/home/admin/canal-server/logs /www/dk_project/dk_app/canal/修改配置文件 conf/canal.properties canal.serverMode rabbitMQ
rabbitmq.host ip
rabbitmq.virtual.host trovebox_dev
rabbitmq.exchange canal.exchange
rabbitmq.username trovebox_dev
rabbitmq.password troveboxadmin修改配置文件 conf/destination/canal.properties canal.instance.dbUsernamecanal
canal.instance.dbPasswordcanal
canal.mq.topiccanal.routing.key删除并重新创建 Canal 容器 docker rm -f canaldocker run -p 11111:11111 -p 11110:11110 -p 11112:11112 \--name canal \-e canal.destinationsdestination \-e canal.instance.master.addressip:port \-e canal.instance.dbUsernamecanal \-e canal.instance.dbPasswordcanal \-e canal.instance.connectionCharsetUTF-8 \-e canal.instance.tsdb.enabletrue \-v /www/dk_project/dk_app/canal/conf:/home/admin/canal-server/conf/ \-v /home/admin/canal-server/logs:/home/admin/canal-server/logs/ \-e canal.instance.filter.regexdataBaseName\\..* \-d canal/canal-server:latestJava 部分代码
BinLogDto.java
BinLogDto 类用于解析 Canal 发送的 binlog 数据。
package online.trovebox.ruyiai.common.dto;import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import lombok.Data;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;Data
public class BinLogDto {private String database; // 数据库private String table; // 表private String type; // 操作类型private JSONArray data; // 操作数据private JSONArray old; // 变更前数据private JSONArray pkNames; // 主键名称private String sql; // 执行 SQL 语句private Long es;private String gtid;private Long id;private Boolean isDdl;private JSONObject mysqlType;private JSONObject sqlType;private Long ts;public T ListT getData(ClassT clazz) {if (this.data null || this.data.size() 0) {return null;}return this.data.toList(clazz);}public T ListT getOld(ClassT clazz) {if (this.old null || this.old.size() 0) {return null;}return this.old.toList(clazz);}public ListString getPkNames() {if (this.pkNames null || this.pkNames.size() 0) {return null;}ListString pkNames new ArrayList();for (Object pkName : this.pkNames) {pkNames.add(pkName.toString());}return pkNames;}public MapString, String getMysqlType() {if (this.mysqlType null) {return null;}MapString, String mysqlTypeMap new HashMap();this.mysqlType.forEach((k, v) - {mysqlTypeMap.put(k, v.toString());});return mysqlTypeMap;}public MapString, Integer getSqlType() {if (this.sqlType null) {return null;}MapString, Integer sqlTypeMap new HashMap();this.sqlType.forEach((k, v) - {sqlTypeMap.put(k, Integer.valueOf(v.toString()));});return sqlTypeMap;}
}Listener.java
Listener 类用于监听 RabbitMQ 中的消息并处理 binlog 数据。
package online.trovebox.ruyiai.listener;import com.alibaba.fastjson2.JSON;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import online.trovebox.ruyiai.common.dto.BinLogDto;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;Component
Slf4j
RequiredArgsConstructor
public class CanalListener {Resourceprivate RedisTemplateString, Object redisTemplate;String[] prefixes new String[]{coin_change_log,log,message,};RabbitListener(bindings {QueueBinding(value Queue(value canal.queue, durable true),exchange Exchange(value canal.exchange),key canal.routing.key)})public void handleDataChange(Payload Message message) {String content new String(message.getBody(), StandardCharsets.UTF_8);BinLogDto binLog JSON.parseObject(content, BinLogDto.class);String type binLog.getType();if (type.equalsIgnoreCase(select)) {return;}String table binLog.getTable();for (String prefix : prefixes) {if (table.startsWith(prefix)) {System.err.println(table);return;}}log.info(表{} 操作类型:{}, table, binLog.getType());log.info(操作后数据{} , binLog.getData().toStringPretty());deleteKeysStartingWith(table);}public void deleteKeysStartingWith(String prefix) {String cursor 0;do {SetString keys scanKeys(cursor, prefix);cursor keys.isEmpty() ? 0 : 1;if (!keys.isEmpty()) {redisTemplate.delete(keys);}} while (!0.equals(cursor));}private SetString scanKeys(String cursor, String prefix) {return redisTemplate.execute((RedisCallbackSetString) connection - {ScanOptions options ScanOptions.scanOptions().match(prefix *).count(1000).build();Cursorbyte[] cursorScan connection.scan(options);SetString keys new HashSet();while (cursorScan.hasNext()) {byte[] keyBytes cursorScan.next();keys.add(new String(keyBytes, StandardCharsets.UTF_8));}return keys;});}
}效果图 知识点说明
BinlogMySQL 的二进制日志用于记录数据库的所有更改操作。Canal 通过读取 binlog 来获取数据库的变更数据。Canal阿里巴巴开源的数据库同步工具基于 MySQL 的 binlog 实现数据同步。RabbitMQ消息队列中间件用于在分布式系统中传递消息。Canal 可以将 binlog 数据发送到 RabbitMQ供其他服务消费。Redis内存数据库用于缓存数据。在监听 binlog 变更时可以通过 Redis 缓存相关数据并在数据变更时清除缓存。
通过以上步骤和代码你可以实现 MySQL 数据库的变更监听并将变更数据通过 RabbitMQ 发送到其他服务进行处理。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/81963.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!