Canal 结合 SpringBoot 源码梳理

1、canal是什么,可以用来作什么

canal是阿里开源的一个用于监听数据库binlog,从而实现数据同步的工具。

2、安装

我使用的是1.1.5版本,太高的版本需要的jdk版本和mysql的驱动版本会更高,可以根据自己的环境选择。
如果是自己玩的话安装 canal.deployer-1.1.5.tar.gz就可以了
地址: Release v1.1.5 · alibaba/canal · GitHub

3、springboot+mysql+canal实现数据同步可以在网上找到很多博客,不在赘述

4、源码梳理

(1)、既然用到springboot,肯定有一个自动注入的autoconfigure的start。

可以看到spring.factories会自动注入几个client。

(2)、找到一个看着顺眼的client进去看看:

  • 我选择的是SimpleClientAutoConfiguration
@Configuration
@EnableConfigurationProperties({CanalSimpleProperties.class})
@ConditionalOnBean({EntryHandler.class})
@ConditionalOnProperty(value = {"canal.mode"},havingValue = "simple",matchIfMissing = true
)
@Import({ThreadPoolAutoConfiguration.class})
public class SimpleClientAutoConfiguration {private CanalSimpleProperties canalSimpleProperties;public SimpleClientAutoConfiguration(CanalSimpleProperties canalSimpleProperties) {this.canalSimpleProperties = canalSimpleProperties;}@Beanpublic RowDataHandler<RowData> rowDataHandler() {return new RowDataHandlerImpl(new EntryColumnModelFactory());}@Bean@ConditionalOnProperty(value = {"canal.async"},havingValue = "true",matchIfMissing = true)public MessageHandler messageHandler(RowDataHandler<RowData> rowDataHandler, List<EntryHandler> entryHandlers, ExecutorService executorService) {return new AsyncMessageHandlerImpl(entryHandlers, rowDataHandler, executorService);}@Bean@ConditionalOnProperty(value = {"canal.async"},havingValue = "false")public MessageHandler messageHandler(RowDataHandler<RowData> rowDataHandler, List<EntryHandler> entryHandlers) {return new SyncMessageHandlerImpl(entryHandlers, rowDataHandler);}@Bean(initMethod = "start",destroyMethod = "stop")public SimpleCanalClient simpleCanalClient(MessageHandler messageHandler) {String server = this.canalSimpleProperties.getServer();String[] array = server.split(":");return SimpleCanalClient.builder().hostname(array[0]).port(Integer.parseInt(array[1])).destination(this.canalSimpleProperties.getDestination()).userName(this.canalSimpleProperties.getUserName()).password(this.canalSimpleProperties.getPassword()).messageHandler(messageHandler).batchSize(this.canalSimpleProperties.getBatchSize()).filter(this.canalSimpleProperties.getFilter()).timeout(this.canalSimpleProperties.getTimeout()).unit(this.canalSimpleProperties.getUnit()).build();}
}
看到会注入SimpleCanalClient。并且指明了初始化方法和销毁的方法。进去看看。发现是继承了一个抽象的client,这个类是关键,内部有start和stop的具体实现。
很明显,start就是启动一个线程 while(true)的去循环执行binlog的获取和处理。
如何获取的代码没有跟进,但是可以猜到,应该是通过连接然后去获取数据。
  • 这里着重看一下处理数据的代码:
public abstract class AbstractMessageHandler implements MessageHandler<Message> {private Map<String, EntryHandler> tableHandlerMap;private RowDataHandler<CanalEntry.RowData> rowDataHandler;public  AbstractMessageHandler(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler) {this.tableHandlerMap = HandlerUtil.getTableHandlerMap(entryHandlers);this.rowDataHandler = rowDataHandler;}@Overridepublic  void handleMessage(Message message) {List<CanalEntry.Entry> entries = message.getEntries();  第一步 for (CanalEntry.Entry entry : entries) {if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {  第二步try {EntryHandler<?> entryHandler = HandlerUtil.getEntryHandler(tableHandlerMap, entry.getHeader().getTableName());   第三步if(entryHandler!=null){CanalModel model = CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName()).executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build();CanalContext.setModel(model);CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();    第四步CanalEntry.EventType eventType = rowChange.getEventType();for (CanalEntry.RowData rowData : rowDataList) {rowDataHandler.handlerRowData(rowData,entryHandler,eventType);}}} catch (Exception e) {throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);}finally {CanalContext.removeModel();}}}}}
  • 进入rowDataHandler.handlerRowData(maps, entryHandler, eventType);实现类选择的是RowDataHandlerImpl。
public class RowDataHandlerImpl implements RowDataHandler<CanalEntry.RowData> {private IModelFactory<List<CanalEntry.Column>> modelFactory;public RowDataHandlerImpl(IModelFactory modelFactory) {this.modelFactory = modelFactory;}@Overridepublic <R> void handlerRowData(CanalEntry.RowData rowData, EntryHandler<R> entryHandler, CanalEntry.EventType eventType) throws Exception {if (entryHandler != null) {switch (eventType) {case INSERT:R object = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());entryHandler.insert(object);break;case UPDATE:Set<String> updateColumnSet = rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated).map(CanalEntry.Column::getName).collect(Collectors.toSet());R before = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet);R after = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());entryHandler.update(before, after);break;case DELETE:R o = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList());entryHandler.delete(o);break;default:break;}}}
}
回想一下springboot中使用canal的时候,会有一个注解@CanalTable和一个实现类EntryHandler。
这里的代码要做的就是(1)、匹配合适的语句类型(insert、delete、update)。(2)、insert和delete只需要记录一下操作的值;update需要记录一下修改前和修改后的值。也很好理解,insert和delete回滚只需要反向重放代码就行,而update需要知道之前的数据采集重新update。
  • 进入newInstance方法,选择AbstractModelFactory:
public abstract class AbstractModelFactory<T> implements IModelFactory<T> {@Overridepublic <R> R newInstance(EntryHandler entryHandler, T t) throws Exception {String canalTableName = HandlerUtil.getCanalTableName(entryHandler);if (TableNameEnum.ALL.name().toLowerCase().equals(canalTableName)) {return (R) t;}Class<R> tableClass = GenericUtil.getTableClass(entryHandler);if (tableClass != null) {return newInstance(tableClass, t);}return null;}abstract <R> R newInstance(Class<R> c, T t) throws Exception;
}

重点来了,有两个HandlerUtil.getCanalTableName和GenericUtil.getTableClass。还记得咱们再springboot中的代码会指定 @CanalTable 处理的是那个表和EntryHandler泛型吗。
第一步判断这个EntryHandler实现类有没有指定要处理那个表,如果指定了All。那么就要就走自定义的返回值,这个返回值通常不是我们需要的。所以在使用中一定尽量指定要处理的表。
第二步需要匹配EntryHandler中的泛型类进行赋值操作了。
  • 最后进入newInstance方法:
public class EntryColumnModelFactory extends AbstractModelFactory<List<CanalEntry.Column>> {......@Override<R> R newInstance(Class<R> c, List<CanalEntry.Column> columns) throws Exception {R object = c.newInstance();Map<String, String> columnNames = EntryUtil.getFieldName(object.getClass());for (CanalEntry.Column column : columns) {String fieldName = columnNames.get(column.getName());if (StringUtils.isNotEmpty(fieldName)) {FieldUtil.setFieldValue(object, fieldName, column.getValue());}}return object;}}

        代码比较简单,通过反射给对象赋值。如果不太清楚这里是怎么把数据解析出来的,可以自己搭建起来服务执行一下看看canal返回的结构体,我下边也提出来我的返回,并且我也会将上边代码中和数据解析的地方标红。
获取消息 Message[id=14,entries=[header {version: 1logfileName: "mysql-bin.000004"logfileOffset: 19806serverId: 1serverenCode: "UTF-8"executeTime: 1706838103000sourceType: MYSQLschemaName: ""tableName: ""eventLength: 80
}
entryType: TRANSACTIONBEGIN
storeValue: " 9"
, header {version: 1logfileName: "mysql-bin.000004"logfileOffset: 19939serverId: 1serverenCode: "UTF-8"executeTime: 1706838103000sourceType: MYSQLschemaName: "test"tableName: "first"eventLength: 53eventType: INSERTprops {key: "rowsCount"value: "1"}
}
entryType: ROWDATA
storeValue: "\b\341\001\020\001P\000b\203\001\022\"\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0010\000B\0016R\006bigint\022%\b\001\020\f\032\aaddress \000(\0010\000B\003333R\vvarchar(10)\0226\b\002\020]\032\vcreate_time \000(\0010\000B\0232024-02-02 09:41:43R\bdatetime"
, header {version: 1logfileName: "mysql-bin.000004"logfileOffset: 19992serverId: 1serverenCode: "UTF-8"executeTime: 1706838103000sourceType: MYSQLschemaName: ""tableName: ""eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\006381841"
],raw=false,rawEntries=[]]

至此,在springboot中通过canal获取binlog的日志并且解析为自定义的entry对象的流程就已经分析、梳理完了。至于后续要怎么处理就有很多的方式了。
最后在分享一个idea跟踪源码的小技巧:
比如我们看到一个比较重要的注解,但是不知道这个注解具体实现在哪里,可以进入注解中,选中注解名称,然后选择Find Usages。就可以看到哪里使用了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/665489.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何保证MySQL和Redis中的数据一致性?

文章目录 前言一、缓存案例1.1 缓存常见用法1.2 缓存不一致产生的原因 二、解决方案2.1 先删除缓存&#xff0c;再更新数据库2.2 先更新数据库&#xff0c;删除缓存2.3 只更新缓存&#xff0c;由缓存自己同步更新数据库2.4 只更新缓存&#xff0c;由缓存自己异步更新数据库2.5 …

内核升级!IvorySQL 3.1 发版

1. 版本介绍 [发行日期&#xff1a;2024年1月26日] IvorySQL 3.1 基于 PostgreSQL 16.1 &#xff0c;包含来自 PostgreSQL 16.1 的各种修复。有关 PostgreSQL 16.1 中更详细的更新和错误修复&#xff0c;请参阅官方PostgreSQL 16.1 发行说明 。 &#x1f517;https://www.po…

MySQL-----DDL基础操作

SQL通用语法 1.SQL语句可以单行或多行书写&#xff0c;以分号结尾。 2. SQL语句可以使用空格/缩进来增强语句的可读性。 3. MySQL数据库的SQL语句不区分大小写&#xff0c;关键字建议使用大写。 4&#xff0e;注释: 单行注释:--注释内容或#注释内容(MySQL特有) 多行注释:/*注释…

【Iot】什么是串口?什么是串口通信?串口通信(串口通讯)原理,常见的串口通信方式有哪些?

串口通信原理 1. 串口2. 串口通信4. 波特率与比特率5. 帧格式3. 串口通讯的通讯协议3.1. RS2323.2. RS485 总结 1. 串口 串行接口简称串口&#xff0c;也称串行通信接口或串行通讯接口&#xff08;通常指COM接口&#xff09;&#xff0c;是采用串行通信方式的扩展接口。 串口可…

jstack命令解析

前言 如果有一天&#xff0c;你的Java程序长时间停顿&#xff0c;也许是它病了&#xff0c;需要用jstack拍个片子分析分析&#xff0c;才能诊断具体什么病症&#xff0c;是死锁综合征&#xff0c;还是死循环等其他病症&#xff0c;本文我们一起来学习jstack命令~ jstack 的功…

一、cadence PDK 自学笔记-心法

我这边ADS /Cadence PDK基本大部分都是自学完成的。 当然也非常感谢我的前同事周**的帮忙&#xff0c;教了我很多基础的。另外也感谢我现在同事&#xff0c;李**和程*的帮忙&#xff0c;学习了很多cad的视角。 其实对于自学写PDK的小伙伴&#xff0c;一般都要如何学习呢&…

深度学习入门笔记(六)线性回归模型

本节&#xff0c;我们用线性回归为例子&#xff0c;回顾一些基本概念 6.1 相关性 相关性的取值范围是-1 到 1&#xff0c;越接近 1 或者-1 代表越相关&#xff0c;越接近 0 则越不相关。相关系数大于 0 称为正相关&#xff0c;小于 0 称为负相关。 假如 A 与 B 正相关&#…

[UI5 常用控件] 05.FlexBox, VBox,HBox,HorizontalLayout,VerticalLayout

文章目录 前言1. FlexBox布局控件1.1 alignItems 对齐模式1.2 justifyContent 对齐模式1.3 Direction1.4 Sort1.5 Render Type1.6 嵌套使用1.7 组件等高显示 2. HBox,VBox3. HorizontalLayout&#xff0c;VerticalLayout 前言 本章节记录常用控件FlexBox,VBox,HBox,Horizontal…

快速理解复杂系统组成学习内容整合

目录 一、复杂系统组成 二、接入系统 (Access System) 三、应用系统 (Application System) 四、基础平台 (Foundation Platform) 五、中间件 (Abundant External Middleware) 六、支撑系统 (Supporting System) 参考文章 一、复杂系统组成 复杂系统是由多个相互关联、相…

React中的事件处理逻辑

在React中&#xff0c;事件处理逻辑是通过在组件上添加事件监听器来实现的。事件监听器是一个函数&#xff0c;它会在特定的事件发生时被调用。可以通过以下几个步骤来处理事件逻辑: 在组件类中定义事件处理函数。例如&#xff0c;可以在组件类中定义一个叫做handleClick的事件…

比瓴科技入围软件供应链安全赛道!为关键信息基础设施安全建设注入新动力

1月20日&#xff0c;中关村华安关键信息基础设施安全保护联盟会员大会暨关键信息基础设施安全保护论坛在北京成功举办&#xff0c;比瓴科技作为会员单位受邀出席。 本次论坛发布了《关键信息基础设施安全保护支撑能力白皮书&#xff08;2023&#xff09;》&#xff0c;比瓴科技…

蓝桥杯客观题练习笔记

1、蓝桥杯单片机开发板供电为几V&#xff1f; 5V DS18B20是单工通讯&#xff0c;还是半双工通讯&#xff1f;为什么&#xff1f; 将时钟线&#xff0c;数据线&#xff0c;控制线和电源线合并为一根线&#xff0c;只要符合单总线协议的器件都能以从机的身份挂接到单总线上与主…

【开源】SpringBoot框架开发大学计算机课程管理平台

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 实验课程档案模块2.2 实验资源模块2.3 学生实验模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 实验课程档案表3.2.2 实验资源表3.2.3 学生实验表 四、系统展示五、核心代码5.1 一键生成实验5.2 提交实验5.3 批阅实…

许昌路280号3号办公楼室内精装修项目

许昌路280号3号办公楼室内精装修项目 (招标编号:2024-上水管线-201) 项目所在地区:上海市 一、招标条件 本许昌路280号3号办公楼室内精装修项目已由项目审批/核准/备案机关批准&#xff0c;项目资金来源为国有资金0万元:私有资金0万元;境外资金0万元;自筹资金480万元;外国政府及…

获取真实 IP 地址(二):绕过 CDN(附链接)

一、DNS历史解析记录 DNS 历史解析记录指的是一个域名在过去的某个时间点上的DNS解析信息记录。这些记录包含了该域名过去使用的IP地址、MX记录&#xff08;邮件服务器&#xff09;、CNAME记录&#xff08;别名记录&#xff09;等 DNS 信息。DNS 历史记录对于网络管理员、安全研…

架构师为什么要写文档?又该如何写文档呢?

大家好&#xff0c;我是冰河~~ 最近有很多小伙伴&#xff0c;也不乏身边的一些同事问我&#xff1a;哎&#xff0c;架构师为什么要写这么多文档啊&#xff1f;有啥用呢&#xff1f;不能跟开发一样多写写代码吗&#xff1f;天天写文档&#xff0c;又感觉自己的文档写不好&#…

2023年12月CCF-GESP编程能力等级认证Python编程五级真题解析

Python等级认证GESP(1~6级)全部真题・点这里 一、单选题(共15题,共30分) 第1题 通讯卫星在通信网络系统中主要起到( )的作用。 A:信息过滤 B:信号中继 C:避免攻击 D:数据加密 答案:B 第2题 小杨想编写一个判断任意输入的整数N是否为素数的程序,下面哪个方法…

怪物联萌小游戏

欢迎来到程序小院 怪物联萌 玩法&#xff1a;将怪物连体&#xff0c;怪物只能直线上下左右移动&#xff0c;躲过障碍物&#xff0c;共16关卡&#xff0c; 不同关卡不同界面&#xff0c;遇到金币记得吃掉金币哦&#xff0c;会获得更高分数&#xff0c;快去闯关吧^^。开始游戏ht…

纯血鸿蒙来了,鸿蒙App开发该如何提速

“全世界做产品挣钱的公司很多&#xff0c;但有能力打造操作系统的公司没有几家&#xff0c;最后世界上的操作系统就只有三套&#xff1a;鸿蒙、iOS和安卓。” --- 360集团创始人、董事长周鸿祎 “HarmonyOS实现了AI框架、大模型、设计系统、编程框架、编程语言、编译器等全栈…

git的分支操作

目录 简介&#xff1a; 操作&#xff1a;查看 操作&#xff1a;创建 操作&#xff1a;切换​编辑 操作&#xff1a;本地分支推送到远程 操作&#xff1a;git merge [name]合并分支​编辑 简介&#xff1a; 在Git中&#xff0c;可以通过分支来管理和处理不同的版本和功能。分…