阿里mysql 二进制_Mysql binlog 之阿里canal

1、What is Canal?

canal [kə'næl],中文翻译为 水道/管道/沟渠/运河,主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析,是阿里巴巴开发并开源的,采用Java语言开发;

历史背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房数据同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更。从2010年开始,阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步,由此衍生出了canal项目;

Github:https://github.com/alibaba/canal

2、工作原理

传统MySQL主从复制工作原理

91fb1300912bbf496eb37da5b5ef0f5f.png

从上层来看,复制分成三步:

MySQL的主从复制将经过如下步骤:

1、当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;

2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;

3、同时 master 主服务器为每个 I/O Thread 启动一个dump  Thread,用于向其发送二进制事件日志;

4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;

5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;

6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒;

canal 工作原理

1、canal 模拟 MySQL slave 的交互协议,把自己伪装为 MySQL slave,向 MySQL master 发送dump 协议;

2、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即canal );

3、canal 解析 binary log 对象 (原始数据为byte流)

ba526d41757c9074fed7da2979960732.png

8027f67e66334b6718283810a6a6e22f.png

3、Canal使用场景

Canal是基于MySQL变更日志增量订阅和消费的组件,可以使用在如下一些一些应用场景:

数据库实时备份

业务cache刷新

search build

价格变化等重要业务消息

带业务逻辑的增量数据处理

跨数据库的数据备份(异构数据同步),

例如mysql => oracle,mysql=>mongo,mysql =>redis,

mysql => elasticsearch等;

当前canal 主要是支持源端 MySQL(也支持mariaDB),版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x;Canal搭建环境

1、准备好MySQL运行环境;

2、开启 MySQL的binlog写入功能,配置 binlog-format 为 ROW 模式,my.cnf中配置如下:

[mysqld]

log-bin=mysql-bin #开启 binlog

binlog-format=ROW #选择 ROW 模式

server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复

3、授权canal连接MySQL账号具有作为MySQL slave的权限, 如果已有账户可直接 grant授权:

启动MySQL服务器;

登录mysql:./mysql -uroot -p -h127.0.0.1 -P3306

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

4、下载 canal部署程序

Wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

tar -zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal.deployer-1.1.4

5、配置修改

vim conf/example/instance.properties

主要是修改配置文件中与自己的数据库配置相关的信息;

6、启动Canal

./startup.sh

7、查看进程:

ps -ef | grep canal

8、查看 server 日志

cat logs/canal/canal.log

9、查看 instance 的日志

vi logs/example/example.log

10、关闭Canal

./stop.sh

canal server的默认端口号为:11111,如果需要调整的话,可以去到\conf目录底下的canal.properties文件中进行修改;

相关命令:

#是否启用了日志

show variables like 'log_bin';

#怎样知道当前的日志

show master status;

#查看mysql binlog模式

show variables like 'binlog_format';

#获取binlog文件列表

show binary logs;

#查看当前正在写入的binlog文件

show master status\G

#查看指定binlog文件的内容

show binlog events in 'mysql-bin.000002';

注意binlog日志格式要求为row格式;

Binlog的三种基本类型分别为:

ROW模式 除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但是会占用较多的空间,需要使用mysqlbinlog工具进行查看;

STATEMENT模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;

MIX模式比较灵活的记录,例如说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;

启动了canal的server之后,便是基于java的客户端搭建了;

代码集成方式:

com.alibaba.otter

canal.client

1.1.4

package com.unwulian.search.engine.suggestion.service;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry.*;

import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

import java.util.List;

/**

* canal测试

*

* @author shiye

* @create 2020-11-30 14:22

*/

public class CanalTest {

public static void main(String[] args) {

String ip = "192.168.2.165";

int port = 11111;

String destination = "example";

String username = "";

String password = "";

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, port), destination, username, password);

try {

connector.connect();

connector.subscribe(".*\\..*");

//跳转到上次进行读取日志的地方

connector.rollback();

while (true) {

//获取指定数量的数据

Message message = connector.getWithoutAck(1);

long id = message.getId();

int size = message.getEntries().size();

if (id == -1 || size == 0) {

//如果没有获取到数据就睡眠疫苗

Thread.sleep(1000);

} else {

System.out.println("messge id:" + id);

printEntry(message.getEntries());

}

//提交确认

connector.ack(id);

// connector.rollback(batchId); // 处理失败, 回滚数据

}

} catch (Exception e) {

e.printStackTrace();

} finally {

connector.disconnect();

}

}

private static void printEntry(List entrys) {

for (Entry entry : entrys) {

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {

continue;

}

RowChange rowChage = null;

try {

rowChage = RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);

}

EventType eventType = rowChage.getEventType();

System.out.println(String.format("================> binlog日志偏移量[%s:%s] , 库名,表名[%s,%s] , 操作类型 : %s",

entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),

entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),

eventType));

for (RowData rowData : rowChage.getRowDatasList()) {

if (eventType == EventType.DELETE) {

printColumn(rowData.getBeforeColumnsList());

} else if (eventType == EventType.INSERT) {

printColumn(rowData.getAfterColumnsList());

} else {

System.out.println("-------> before");

printColumn(rowData.getBeforeColumnsList());

System.out.println("-------> after");

printColumn(rowData.getAfterColumnsList());

}

}

}

}

private static void printColumn(List columns) {

for (Column column : columns) {

System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());

}

}

}

springboo集成canal# 阿里binlog canal配置

canal:

ip: 192.168.2.13   #192.168.2.165

subscribe: undev.t_bas_xxx1,undev.t_bas_xxx2#配置你要监听的表

port: 11111

destination: dev

username:

password:

package com.unwulian.search.engine.suggestion.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

import org.springframework.context.annotation.Configuration;

import java.io.Serializable;

/**

* binlog canal的配置

*

* @author shiye

* @create 2020-07-17 19:30

*/

@Configuration

@ConfigurationProperties(prefix = "canal")

public class CanalConfig implements Serializable {

/**

* ip

*/

private String ip;

/**

* mq监听表

*/

private String subscribe;

/**

* 端口

*/

private int port;

/**

* 目的地

*/

private String destination;

/**

* 用户名

*/

private String username = "";

/**

* 密码

*/

private String password;

public String getSubscribe() {

return subscribe;

}

public void setSubscribe(String subscribe) {

this.subscribe = subscribe;

}

public String getIp() {

return ip;

}

public void setIp(String ip) {

this.ip = ip;

}

public int getPort() {

return port;

}

public void setPort(int port) {

this.port = port;

}

public String getDestination() {

return destination;

}

public void setDestination(String destination) {

this.destination = destination;

}

public String getUsername() {

return username;

}

public void setUsername(String username) {

this.username = username;

}

public String getPassword() {

return password;

}

public void setPassword(String password) {

this.password = password;

}

}

package com.unwulian.search.engine.suggestion.schedule;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry;

import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;

import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;

import com.alibaba.otter.canal.protocol.Message;

import com.github.structlog4j.ILogger;

import com.github.structlog4j.SLoggerFactory;

import com.unwulian.search.engine.suggestion.config.CanalConfig;

import com.unwulian.search.engine.suggestion.service.CardService;

import com.unwulian.search.engine.suggestion.service.CommunityStructService;

import com.unwulian.search.engine.suggestion.service.HouseService;

import com.unwulian.search.engine.suggestion.service.RoomService;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

import java.util.List;

/**

* 项目启动的时候就初始化canal启动一个线程去监听canal server

*

* @author shiye

* @create 2020-11-30 15:11

*/

@Component

public class CanalTask implements InitializingBean {

private static final ILogger logger = SLoggerFactory.getLogger(CanalTask.class);

@Autowired

private CanalConfig canalConfig;

@Override

public void afterPropertiesSet() throws Exception {

/**

* 启动一下线程一直监听canal server

*/

new Thread(() -> {

logger.info("start Thread to listent canal success....");

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()),

canalConfig.getDestination(),

canalConfig.getUsername(),

canalConfig.getPassword());

connector.connect();

connector.subscribe(canalConfig.getSubscribe());

//跳转到上次进行读取日志的地方

connector.rollback();

try {

while (true) {

//获取指定数量的数据

Message message = connector.getWithoutAck(1);

long id = message.getId();

int size = message.getEntries().size();

if (id == -1 || size == 0) {

//如果没有获取到数据就睡眠1s

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

logger.error("sleep 1000ms error...." + e.getMessage());

}

} else {

//处理消息

//logger.info("messge id:" + id);

handlerEntry(message.getEntries());

}

//提交确认

connector.ack(id);

// connector.rollback(batchId); // 处理失败, 回滚数据

}

} finally {

//关闭

connector.disconnect();

}

}).start();

logger.info("start Thread to listent canal ....");

}

/**

* 处理消息

*

* @param entrys

*/

private void handlerEntry(List entrys) {

for (CanalEntry.Entry entry : entrys) {

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {

//类型是事务开始事务结束不做处理

continue;

}

//库名

//String databaseName = entry.getHeader().getSchemaName();

//表名

String tableName = entry.getHeader().getTableName();

RowChange rowChage = null;

try {

rowChage = RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

logger.error("ERROR 数据转换异常, data:" + entry.toString(), e);

}

switch (tableName) {

case "t_bas_xxx1":

//进行你的业务处理

break;

case "t_bas_xxx2":

//进行你的业务处理

break;

default:

return;

}

}

}

private static void printColumn(List columns) {

for (CanalEntry.Column column : columns) {

System.out.println(column.getName() + " : " + column.getValue() + "    不做处理=" + column.getUpdated());

}

}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/442436.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java 基本类型 引用类型_Java中的基本类型和引用类型变量的区别

基本类型:基本类型自然不用说了,它的值就是一个数字,一个字符或一个布尔值。引用类型:是一个对象类型,值是什么呢?它的值是指向内存空间的引用,就是地址,所指向的内存中保存着变量所…

git 怎么提交忽略文件夹_git 设置忽略文件提交的几种方式

在使用git进行项目管理的时候,有时候一些安装包之类,或者自己本地项目使用的一些编译文件,在不需要提交到远程仓库时,可以通过以下几种方式设置忽略提交,包括文件夹和单个文件.之前自己项目里面采用了第二种方法进行了设置,但是时间久远竟然忘记了.....,导致我有一些文件始终无…

java获取行号_java – 如何获取一个方法的行号?

我想做同样的事情,经过一些研究,就解决了javassist.您将需要添加javassist(我使用版本3.15.0-GA).给定以下类确定“x”方法的位置.方法名称“x”是硬编码的,但是如果你和我在一样的船上,反射并不困难,所以我相信你可以得到一个方法名称列表,然后下面将让你得到行号的方法&#…

java测试类和类_【测试开发】从测试角度看Java异常类(错误和异常区别介绍)

在 Java 中,所有的异常都有一个共同的祖先 Throwable(可抛出)。Throwable 指定代码中可用异常传播机制通过 Java 应用程序传输的任何问题的共性。Throwable 有两个重要的子类:Exception(异常)和 Error(错误),二者都是 Java 异常处理的重要子类…

mysql 学生成绩等级_JSP+SSM+Mysql实现的学生成绩管理系统

项目简介本系统是基于JSPSSMMysql实现的学生成绩管理系统。主要实现的功能有教师管理、学生管理、课程管理、学生成绩管理。难度等级:中等技术栈编辑器Eclipse Version: 2020-03 (4.15.0)前端技术基础:htmlcssJavaScript框架:JQueryH-ui后端技…

java ajax 导出excel文件_springMVC(4)---生成excel文件并导出

springMVC(4)---生成excel文件并导出在开发过程中,需要将数据库中的数据以excel表格的方式导出。首先说明。我这里用的是Apache的POI项目,它是目前比较成熟的HSSF接口,用来处理Excel对象。其实POI不仅仅只能处理excel,它还可以处理…

java swing 模拟发牌_用java设计一个发牌程序

展开全部// 发牌程序。import java.awt.*;import java.awt.event.*;import javax.swing.*;public class CardBuffer //加互斥锁的缓冲区{private int value;private boolean isEmpty true; //value是否为空的信号量private int order0; //信号量,e68a8462616964757…

python一节课多久_第一节课 python简介

标签:一、python特性概要1. Python是解释性语言。我们和c比较一下。2. Python特性总结:字节码、动态、缩进2.1 字节码2.2 动态语义 在赋值时确定数据类型2.3 缩进3. Python之禅解释性语言的内部机制,在运行脚本之前,得到结果之后&…

aspose转pdf横版_aspose实现Office转Pdf

标签:aspose实现Office转Pdf关键代码:jar包:aspose-words-14.6.0.jaraspose-cells-10.8.jaraspose.slides-14.4.0.jaraspose-diagram-2.1.0.jarprotected void realTransform(InputStream in, OutputStream out) throws IOException{String l…

java -jar 内存溢出_JAVA系统启动栈内存溢出-StackOverflowError

JAVA系统启动栈内存溢出-StackOverflowError线上服务器启动报错日志如下:Caused by: java.lang.IllegalStateException: Unable to complete the scan for annotations for web application [] due to a StackOverflowError. Possible root causes include a too lo…

java培训就是害人的_[Java教程]粗心害死人啊,我的天。

[Java教程]粗心害死人啊,我的天。02016-08-16 22:00:551 (上述代码是改过后正确的)今天在用ajax的时候!出现了愚蠢的错误!由于括号太多加上自己又粗心了!把最后的第68行的发送请求写到了第63行的大括号里变了~!导致我找…

Java写文件导致io过高_161108、Java IO流读写文件的几个注意点

平时写IO相关代码机会挺少的,但却都知道使用BufferedXXXX来读写效率高,没想到里面还有这么多陷阱,这两天突然被其中一个陷阱折腾一下:读一个文件,然后写到另外一个文件,前后两个文件居然不一样?…

java see 方法_Java 反射常用方法

类名用途Class类代表类的实体,在运行的Java应用程序中表示类和接口Field类代表类的成员变量(成员变量也称为类的属性)Method类代表类的方法Constructor类代表类的构造方法Class类Class代表类的实体,在运行的Java应用程序中表示类和接口。在这个类中提供了…

java接口测试框架搭建_接口自动化测试框架搭建

一、原理及特点参数放在XML文件中进行管理用httpClient简单封装一个httpUtils工具类测试用例管理使用了testNg管理,使用了TestNG参数化测试,通过xml文件来执行case。测试报告这里用到第三方的包ReportNG 项目组织用Maven二、准备使用工具:ecl…

java 树状数据算法_使用递归算法结合数据库解析成Java树形结构的代码解析

这篇文章主要介绍了使用递归算法结合数据库解析成Java树形结构的代码解析的相关资料,需要的朋友可以参考下1、准备表结构及对应的表数据a、表结构:create table TB_TREE(CID NUMBER not null,CNAME VARCHAR2(50),PID NUMBER //父节点)b、表数据:insert i…

java工厂模式 uml_深入浅出设计模式-简单工厂模式

模式定义简单工厂模式是属于创建型模式,又叫做静态工厂方法(Static Factory Method)模式,但不属于23种GOF设计模式之一。简单工厂模式定义了一个创建对象的类,由这个类来封装实例化对象的行为。设计原则遵循的原则:依赖倒置原则迪…

java技术难点_Java核心技术第四章----对象与类重难点总结

一、类之间的关系类和类之间的关系,耦合度从高到低:is -a。继承、实现has-a。组合、聚合、关联user-a。依赖。要求是:高内聚、低耦合。继承(“is-a”)继承(Inheritance),即“is-a”关系,是一种用于表示特殊与一般关系的…

java日志级别的作用_Java系统日志级别对性能的影响性

先介绍下java系统的日志日志框架:是一种日志接口,不负责具体的日志输出形式(有点类似于JDBC),可以灵活的切换日志输出形式。常见的日志框架有slf4j、jcl,只提供Logger、LoggerFactory等接口日志系统:是应用实际使用的日…

java用链表做学生系统_C语言链表实现学生管理系统

本文实例为大家分享了C语言链表实现学生管理系统的具体代码,供大家参考,具体内容如下#include#include#include#include#include#includeusing namespace std;typedef struct ndoe{char id[10];char name[10];char sex[3];char num[10];struct node *nex…

llinux mysql_linux下安装mysql

环境:OS:Linux As 5mysql:5.61.下载跟OS相应的版本[rootnode2 soft]# uname -aLinux node2 2.6.18-274.el5 #1 SMP Fri Jul 8 17:36:59 EDT 2011 x86_64 x86_64 x86_64 GNU/Linux我这里是64位的linux,所有相应的下载64位的mysqlMySQL-5.6.19-1.rhel5.x86_64.rpm-bundle.tar2.解…