基于 Maxwell 实现 MySQL 数据实时迁移到 Mongodb

在 DB 运维层而非应用层实现需求,以降低应用层的业务侵入性及性能影响。

maxwell 是一款 ETL 工具,基本原理是 实时解析 MySQL 的 binlog 丢到相应的 MQ 中供具体业务逻辑去消费。

比如最典型的一种大数据日志路径:

mysql binlog -> maxwell -> kafka

站内搜索引擎的路径:

mysql binlog -> maxwell -> mq -> logstash -> elasticsearch

Redis 路径:

mysql binlog -> maxwell -> redis

这几种路径都可以通过进一步消费来达成 数据迁移到 Mongodb 的目的,但是依赖路径稍长,耗费更多服务器资源。

反复比较后,选择利用 maxwell 的 custom producer 机制实现直接写入数据到 mongodb。搜索了一下,网上似乎没有直接实现的适配,需要自助。

参考文档:https://maxwells-daemon.io/producers/#custom-producer

准备 maxwell 环境

maxwell 和 canal 等一样,都是Java开发的应用,因此首先要准备好Java 开发环境。

然后下载 maxwell 运行包,以便得到依赖的 jar lib、以及进行相应调试。

安装指引:https://maxwells-daemon.io/quickstart/

 /opt/maxwell/bin/maxwell --config /opt/maxwell/config.properties

主要用到的 jar 包 maxwell-*.jar 在 /opt/maxwell/lib/ 目录下。

配置 maven 项目

<dependencies><dependency><groupId>com.zendesk</groupId><artifactId>maxwell</artifactId><version>1.44.0</version><scope>system</scope><systemPath>/opt/maxwell/lib/maxwell-1.44.0.jar</systemPath></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.0</version><scope>system</scope><systemPath>/opt/maxwell/lib/slf4j-api-2.0.0.jar</systemPath></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.17.1</version><scope>system</scope><systemPath>/opt/maxwell/lib/log4j-core-2.17.1.jar</systemPath></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j2-impl</artifactId><version>2.24.3</version><scope>system</scope><systemPath>/opt/maxwell/lib/log4j-slf4j2-impl-2.24.3.jar</systemPath></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.11</version><scope>system</scope><systemPath>/opt/maxwell/lib/commons-lang3-3.11.jar</systemPath></dependency><dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver-sync</artifactId><version>5.5.1</version><type>jar</type></dependency></dependencies>

代码

参照官方 Example 代码和 Redis Producer 的写法,实现一个基本可用的代码。

两个类:

MaxwellMongodbProducer.java

package com.abc.maxwell.producer;import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.producer.AbstractProducer;
import com.zendesk.maxwell.row.RowMap;
import com.zendesk.maxwell.util.StoppableTask;import java.util.ArrayList;
import java.util.Collection;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;import com.mongodb.client.MongoDatabase;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import static com.mongodb.client.model.Filters.eq;
import com.mongodb.client.model.Updates;
import org.bson.Document;
import org.bson.conversions.Bson;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;/****/
public class MaxwellMongodbProducer extends AbstractProducer implements StoppableTask {private static final Logger logger = LoggerFactory.getLogger(MaxwellMongodbProducer.class);private static MongoClient mongoClient;private final String dbUri;private final String targetDbName;private String[] assignedDbNames = null;private String[] assignedTableNames = null;public MaxwellMongodbProducer(MaxwellContext context) {super(context);Properties props = context.getConfig().customProducerProperties;dbUri = props.getProperty("target_db_uri", "mongodb://localhost/");String dbNames = props.getProperty("assigned_dbs");if (dbNames != null) {assignedDbNames = dbNames.split(",");}String tableNames = props.getProperty("assigned_tables");if (tableNames != null) {assignedTableNames = tableNames.split(",");}targetDbName = props.getProperty("target_db_name");}@Overridepublic void push(RowMap r) throws Exception {if (!r.shouldOutput(outputConfig)) {context.setPosition(r.getNextPosition());return;}boolean sentToMongodb = false;for (int cxErrors = 0; cxErrors < 2; cxErrors++) {try {this.sendToMongodb(r);sentToMongodb = true;break;} catch (Exception e) {logger.error("Exception during put", e);if (!context.getConfig().ignoreProducerError) {throw new RuntimeException(e);}}}if (r.isTXCommit()) {context.setPosition(r.getNextPosition());}}private void sendToMongodb(RowMap msg) throws Exception {if (assignedDbNames != null && !Arrays.asList(assignedDbNames).contains(msg.getDatabase())) {return;}if (assignedTableNames != null && !Arrays.asList(assignedTableNames).contains(msg.getTable())) {return;}if (logger.isDebugEnabled()) {logger.debug("->  mongodb sync msg:{}", msg);}String pk = "id"; // 假定主键都是idif (msg.getRowType().contains("insert")) {createCollection(msg.getDatabase(), msg.getTable());Document doc = new Document(msg.getData());getCollection(msg.getDatabase(), msg.getTable()).insertOne(doc);} else if (msg.getRowType().contains("update")) {Long id = (Long) msg.getData().get(pk);if (id <= 0) {return;}Bson updateQuery = eq(pk, id);List<Bson> updates = new ArrayList<>();if (msg.getData() != null) {for (Map.Entry entry : msg.getData().entrySet()) {updates.add(Updates.set((String) entry.getKey(), entry.getValue()));}getCollection(msg.getDatabase(), msg.getTable()).updateOne(updateQuery,Updates.combine(updates));}} else if (msg.getRowType().contains("delete")) {Document doc = new Document(msg.getData());Long id = (Long) doc.get(pk);if (id <= 0) {return;}Bson deleteQuery = eq(pk, id);getCollection(msg.getDatabase(), msg.getTable()).deleteOne(deleteQuery);} else {logger.error("unsupported msg type", msg.getRowType());}}protected MongoCollection<Document> getCollection(String dbName, String collectionName) {return getDb(dbName).getCollection(collectionName);}protected void createCollection(String dbName, String collectionName) {try {getDb(dbName).createCollection(collectionName);} catch (Exception e) {System.out.println(e);}}protected MongoDatabase getDb(String dbName) {return getClient().getDatabase(targetDbName(dbName));}private String targetDbName(String dbName) {return !StringUtils.isBlank(targetDbName) ? targetDbName : dbName;}private MongoClient getClient() {if (mongoClient == null) {mongoClient = MongoClients.create(dbUri);}return mongoClient;}@Overridepublic void requestStop() {getClient().close();}@Overridepublic void awaitStop(Long timeout) {}@Overridepublic StoppableTask getStoppableTask() {return this;}}

工厂类 MaxwellMongodbProducerFactory.java:


package com.abc.maxwell.producer;import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.producer.AbstractProducer;
import com.zendesk.maxwell.producer.ProducerFactory;/****/
public class MaxwellMongodbProducerFactory implements ProducerFactory {@Overridepublic AbstractProducer createProducer(MaxwellContext context) {return new MaxwellMongodbProducer(context);}
}

调试

将 生成 jar 包的目标路径指向 /opt/maxwell/lib 目录。

然后修改 maxwell 配置文件, /opt/maxwell/config.properties

custom_producer.factory=包名加类名。target_db_uri="mongodb://localhost/
target_db_name=
assigned_dbs=
assigned_tables=

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/953771.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSP2025-S 坠机记

CSP2025-S 坠机记 省流:未能完成 AK 的心愿。初赛随便考的,94 分。 复赛依然是在七高,成功在 14:20 抵达教室,发现我所在的机房电脑性能要烂一些,然而键盘要好一些,应该是好事。位置是靠窗,比较好,开考前准备写…

世界计划:无法歌唱的初音未来

标题是骗你点进来的。 Day 0 上午爽爽玩神秘构造小游戏,连砍到 C 不知道多少。 然后写了点 AGC 题,感觉都很水啊,自己强爆了。 中午回去刷本,fgo 真特么好玩。 下午进考场,发现自己的座位附近散布了一堆熟人,但貌…

11.2 每日一题 赦免战俘

本题要求将一个矩阵分为四个子矩阵,在将四个子矩阵不断继续分成个子矩阵,直到无法分出,并且每次对左上角的矩阵进行处理。 本题可用dfs对矩阵不断进行划分如图所示,我们的目标为对每次划分出来的矩阵的左上角进行处…

jenkins安装排错

jenkins安装排错[root@localhost ~]# docker history jenkins/jenkins:2.497-jdk17 IMAGE CREATED CREATED BY SIZE COMMENT 2cef2c7f7799 8 months ago…

一、RK3562板卡上手

一、概述 板卡是搞活动的时候一块99元的RK3562,配置1GB的DDR、8GB的EmmC。 二、如何使用USB烧录系统到Emmc安装DriverAssitant_v5.13,安装瑞芯微的USB驱动,先点击卸载再点击安装 安装RKDevTool_Release_v3.31,安装…

【题解】CCPC 2024 Jinan Site [J] Temperance

题目链接 CCPC 2024 Jinan Site [J] Temperance 题目大意 题干看上去很复杂,但是我们可以发现,一个植物如果合法,那一定意味着它对应的 \(xy\) \(yz\) \(xz\)平面中,至少有一个平面整个平面合法,如果一个点不合法…

2025 年 11 月金属件去毛刺机,五金去毛刺机,自动去毛刺机厂家最新推荐,聚焦资质、案例、售后的五家机构深度解读!

在制造业精细化发展趋势下,金属件去毛刺机、五金去毛刺机、自动去毛刺机的品质直接决定零件加工质量与生产效率。为助力企业精准选型,十家行业权威机构联合开展测评,从厂家资质(生产认证、专利数量)、实际案例(不…

原来求凸包这么简单

awd我发现我没正经学过求凸包,导致经常会在排序的 corner cases 上想很久。 今天终于发现,原来排序一遍求上凸包,再求下凸包的时候直接 reverse 一下再跑一遍就好了,不用改 cross 的大于号小于号,连拼接上下凸包的…

2025 年 11 月全自动激光去毛刺机,金属件去毛刺机,自动去毛刺机厂家最新推荐,精准检测与稳定性能深度解析!

在制造业精度要求不断提升的背景下,全自动激光去毛刺机、金属件去毛刺机、自动去毛刺机的品质与性能,直接影响产品生产效率与质量。为帮助企业精准筛选优质设备,行业权威协会近期开展专项测评,通过对设备加工精度、…

2025 年 11 月数控激光去毛刺机,冲压件去毛刺机,精密去毛刺机厂家最新推荐,实力品牌深度解析采购无忧之选!

随着制造业对零件精度要求的不断提高,数控激光去毛刺机、冲压件去毛刺机、精密去毛刺机已成为提升产品质量的关键设备。为帮助企业精准选购,行业权威协会开展专项测评,从设备精度、运行稳定性、材料适配性、能耗控制…

AT ARC156C Tree and LCS 题解

Solutionlink 贪心考虑,要使得 \(x, P\) 最小,要么出现的共同节点最少,要么共同节点尽可能出现在某一(些)节点的异侧。从极端情况出发,如果 \(|x| = |P| = 1\),显然 \(\text{LCS} = 1\);如果 \(|x| = |P| = n\…

2025 年 11 月回转式风机厂家最新推荐,实力品牌深度解析采购无忧之选!

当前工业领域对回转式风机的品质与服务要求不断提升,为帮助企业精准筛选实力品牌、实现无忧采购,本次推荐基于行业权威协会的全面测评。测评历时 3 个月,覆盖数十家主流厂家,采用 “多维度实力评估 + 实地考察验证…

CSPT漏洞浅析

CSPT全称是Client-Side Path Traversal ,即客户端路径遍历。概念说明CSPT 全称 Client-Side Path Traversal(客户端路径遍历),是一种针对前端应用的漏洞,核心是攻击者通过篡改 URL 参数、请求参数等,让浏览器(客…

【题解】CCPC 2024 Jinan Site [F] The Hermit

题目链接 CCPC 2024 Jinan Site [F] The Hermit 题目大意 给定一个 \({1, 2, 3 ... m}\) 的集合 \(U\) ,要求从中抽取 \(n\) 个数组成子集 \(S\) ,对于每个 \(S \subset U\),定义 \(gcd(S) \neq min(S)\) 为合法,现…

Ubunt 搭建Samba服务

用户及目录结构管理 创建用户 sudo groupadd fileusers创建个人目录和用户 sudo mkdir -p /data/dataShare sudo useradd -d /data/dataShare/xiaoming -g fileusers -s /usr/sbin/nologin xiaoming规划创建目录结构 s…

2025 年 11 月精密无缝钢管,镀锌无缝钢管,定制无缝钢管厂家最新推荐,产能、专利、环保三维数据透视!

近期,行业权威协会针对精密无缝钢管、镀锌无缝钢管、定制无缝钢管领域开展 2025 年 11 月专项测评,测评覆盖近百家主流厂家,以 “产能规模、专利技术、环保标准” 为三维核心指标,结合产品质量、售后服务进行综合评…

2025 年 11 月合金无缝钢管,大口径无缝钢管,厚壁无缝钢管厂家最新推荐,技术实力与市场口碑深度解析!

近期,行业权威协会针对合金无缝钢管、大口径无缝钢管、厚壁无缝钢管领域开展 2025 年 11 月专项测评,测评覆盖近百家主流厂家,以 “技术实力、市场口碑” 为核心维度,结合产品质量、售后服务进行综合评估。技术实力…

题解:AT_abc131_e [ABC131E] Friendships

前言 这是本人第一篇题解。 题意 构造一个简单图(没有重边与自环)。图中总共有 $N$ 个节点,分别为 $1$ 到 $N$。总共有 $M$ 条边,每一条边的长度均为 $1$。有且仅有 $K$ 对节点 $(u,v)$ 满足 $u$ 到 $v$ 的最短距离…

C 运算符、表达式、语句

本文记录一下,在C语言中的基本概念;在很多书籍中运算符、表达式、语句会被分为多个章节,细致地介绍;但本人记录这个系列主要是为了拾遗,所以就将觉得有必要提及的知识点记录下来。 运算符大概分为算术运算、逻辑运…

题解:AT_abc036_d [ABC036D] 塗り絵

题意 有一棵由 $N$ 个点 $N-1$ 条边构成的树,求出树上每个点染成白色或黑色,但相邻两个点不能同时染成黑色的染色方案数量,并取模 $10^9+7$。 思路 对于这种求合法方案数的题目,一般可以考虑 $dp$ 。设 $dp_{i,1}$…