ElasticSearch与MySQL如何进行数据同步?

ElasticSearch(ES)与MySQL进行数据同步的需求在实际开发中非常常见,尤其是在需要高效的全文搜索或者数据分析时,通常使用MySQL作为事务数据库,ES作为搜索和查询引擎。那么要实现MySQL与ElasticSearch的数据同步,可以采取多种方式。

常见的同步方式

  1. 手动同步
    • 在每次对MySQL进行增删改操作时,手动将数据更新到ElasticSearch。这种方法适用于小型项目,但在数据量大和频繁更新的场景下不太适用。
  2. 定时同步(全量/增量同步)
    • 定期从MySQL拉取数据(全量或增量),然后将数据同步到ElasticSearch中。例如,使用定时任务每隔一段时间执行同步。
  3. 使用数据库的增量日志(Binlog)进行同步
    • 通过捕获MySQL的Binlog(增量日志),当MySQL的数据发生变化时,实时同步到ElasticSearch。这种方式更加实时,且不需要定时全量更新。

具体的实现方案

方案一:基于消息队列的同步方案
  1. 数据写入MySQL时,发送同步消息到消息队列

    • 当应用向MySQL写入数据时,同时将数据变动的消息发送到消息队列(如RabbitMQ、Kafka等)。
  2. 消费者监听消息并同步数据到ElasticSearch

    • 消费者监听消息队列的变动消息,将数据同步到ElasticSearch。

优点

  • 保证实时性。
  • 能够处理高并发。

实现步骤

  • 应用在插入、更新或删除数据时,发送操作类型(如CREATEUPDATEDELETE)和数据内容到消息队列。
  • 消息消费者从队列中读取消息,根据操作类型将数据插入、更新或删除到ElasticSearch中。

示例代码

1. MySQL插入操作发送消息

// 保存数据到MySQL
orderMapper.insert(order);// 发送消息到消息队列(以RabbitMQ为例)
rabbitTemplate.convertAndSend("orderSyncQueue", order);

2. 消费者同步数据到ElasticSearch

@RabbitListener(queues = "orderSyncQueue")
public void syncOrderToES(Order order) {// 判断操作类型,插入或更新ES中的数据IndexRequest indexRequest = new IndexRequest("orders").id(order.getId().toString()).source(order);elasticsearchClient.index(indexRequest, RequestOptions.DEFAULT);
}
zhuy
方案二:基于Binlog的实时同步方案

MySQL的Binlog记录了所有的增删改操作,通过解析这些日志,可以实时获取数据变动情况,并同步到ElasticSearch中。

1. 使用Canal进行同步

Canal 是阿里巴巴开源的一个MySQL binlog增量订阅&消费组件,可以用于实时地捕获MySQL的Binlog并同步数据到ElasticSearch。

步骤

  1. 启动MySQL的Binlog功能

    • 在MySQL中开启Binlog功能,并配置server_id(唯一标识),确保MySQL能够产生Binlog。
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

      2. 安装并配置Canal

        安装Canal,配置MySQL的连接信息。

        配置Canal去监听MySQL的表,捕获到数据变动时,获取Binlog日志并解析出增删改的操作。

     3.编写消费者逻辑

        当Canal捕获到数据变化时,将相应的数据同步到ElasticSearch。

示例代码:

配置Canal监听MySQL

canal:instance:dbUsername: rootdbPassword: passworddbHost: localhostdbPort: 3306dbName: order_dbtable: orders

捕获MySQL的Binlog变化

@EventListener
public void onOrderBinlogChange(CanalEntry.Entry entry) {List<CanalEntry.RowData> rowDatasList = entry.getRowChange().getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {if (entry.getEventType() == CanalEntry.EventType.INSERT) {// 插入操作syncInsertToES(rowData.getAfterColumnsList());} else if (entry.getEventType() == CanalEntry.EventType.UPDATE) {// 更新操作syncUpdateToES(rowData.getAfterColumnsList());} else if (entry.getEventType() == CanalEntry.EventType.DELETE) {// 删除操作syncDeleteFromES(rowData.getBeforeColumnsList());}}
}

将数据同步到ElasticSearch

private void syncInsertToES(List<CanalEntry.Column> columns) {// 将MySQL数据转换成ES文档格式,并插入ElasticSearchIndexRequest indexRequest = new IndexRequest("orders").id(getColumnValue(columns, "id")).source(columnsToMap(columns));elasticsearchClient.index(indexRequest, RequestOptions.DEFAULT);
}private void syncUpdateToES(List<CanalEntry.Column> columns) {// 更新ElasticSearch中的数据UpdateRequest updateRequest = new UpdateRequest("orders", getColumnValue(columns, "id")).doc(columnsToMap(columns));elasticsearchClient.update(updateRequest, RequestOptions.DEFAULT);
}private void syncDeleteFromES(List<CanalEntry.Column> columns) {// 删除ElasticSearch中的数据DeleteRequest deleteRequest = new DeleteRequest("orders", getColumnValue(columns, "id"));elasticsearchClient.delete(deleteRequest, RequestOptions.DEFAULT);
}

注意:

在使用消息中间件(如RabbitMQ、Kafka)实现数据同步时,消息的发送是主动的,由应用程序或服务在执行增删改操作时,主动将消息发送到消息队列。而消息队列本身并不具备监听数据库变化的功能,它的角色是用来存储和传递消息,消息的生产和消费逻辑需要在应用程序中实现。

消息发送的流程:

  1. 生产者(业务逻辑层)主动发送消息: 当应用程序执行数据库的增、删、改操作时,需要主动地将这些操作的信息发送到消息队列中。这通常是在业务代码中,在操作数据库的同时添加发送消息的逻辑。例如,新增一条记录后,会主动发送一个"新增"的消息到队列中。

  2. 消息队列(MQ)接收消息: 消息队列(如RabbitMQ、Kafka)会接收生产者发送的消息,将消息存储在队列中,并根据配置将消息推送给消费者或等待消费者主动拉取。

  3. 消费者监听队列并处理消息: 消费者服务通过监听指定的队列来接收消息,接收到消息后,消费者根据消息类型(新增、修改、删除)来执行相应的操作,比如同步到ElasticSearch或进行其他数据处理操作。

2. 使用Debezium进行同步

Debezium 是一个开源的CDC(Change Data Capture)平台,也可以实时监听MySQL的变化并将数据同步到其他存储系统,包括ElasticSearch。

步骤

        1.安装并配置Debezium连接MySQL。

        2.配置监听的表以及变动捕获逻辑。

        3.实现数据同步逻辑,将数据变动同步到ElasticSearch。

总结

  1. 消息队列同步方案:适用于数据操作频繁的场景,能够保证高并发时的系统稳定性和实时性,常用RabbitMQ或Kafka等消息队列实现。

  2. Binlog同步方案:基于Canal或Debezium的同步可以实现更为实时的同步,能够捕获数据库级别的所有数据变化。Binlog方式不依赖应用层代码改动,适合于对MySQL增删改同步要求较高的场景。

  3. 定时同步方案:适用于不需要实时同步的场景,通过定时任务进行批量同步

不同方案各有优缺点,根据具体项目需求选择合适的同步方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/57355.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

88.合并两个有序数组

目录 题目解法原地插入的函数如何使用sort()如何删除其中含0的数&#xff1f; 题目 给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums1 和 nums2 中的元素数目。 请你 合并 nums2 到 nums1 中&#xff0c;使合…

【存储设备专栏 2.4 -- linux 下块设备格式化命令详细介绍】

> 请阅读【嵌入式及芯片开发学必备专栏】< 文章目录 mksf 命令格式化为 FAT32格式化为 ext4格式化为 NTFS举例&#xff1a;格式化为 exFATSummary mksf 命令 在 Linux 中&#xff0c;使用 mkfs&#xff08;Make File System&#xff09;命令可以格式化块设备&#xff08…

20240818 字节跳动 笔试

文章目录 1、编程题1.11.21.31.4岗位:BSP驱动开发工程师-OS 题型:4 道编程题 1、编程题 1.1 小红的三消游戏: 小红在玩一个三消游戏,游戏中 n 个球排成一排,每个球都有一个颜色。若有 3 个颜色相同的球连在一起,则消除这 3 个球,然后剩下的球会重新连在一起。在没有 …

mysql 表

在 MySQL 中&#xff0c;表是数据库存储数据的基本单位。以下是关于 MySQL 表的一些重要知识&#xff1a; 一、表的结构 字段&#xff08;列&#xff09;&#xff1a; 每个表由若干个字段组成&#xff0c;字段定义了表中存储的数据类型和属性。例如&#xff0c;可以有整数类型&…

uniapp学习(007-3 壁纸项目:系统高度等信息的操作)

零基础入门uniapp Vue3组合式API版本到咸虾米壁纸项目实战&#xff0c;开发打包微信小程序、抖音小程序、H5、安卓APP客户端等 总时长 23:40:00 共116P 此文章包含第79p-第p82的内容 文章目录 自定义头部通屏我们自定义一个头部导航小程序的bug代码设计 获取系统的一些高度信…

CSS 设置网页的背景图片

背景 最近正好在写一个个人博客网站“小石潭记”&#xff0c;需要一张有水&#xff0c;有鱼的图片。正好玩原神遇到了类似场景&#xff0c;于是截图保存&#xff0c;添加到网站里面。以下是效果图&#xff1a; css 写个class&#xff0c;加到整个网页的body上 .bodyBg {ba…

【数据结构与算法】队列——数据世界中的“有序使者”

大家好&#xff0c;我是小卡皮巴拉 文章目录 目录 引言 一.队列的基本概念 1.1 队列的定义 1.2 队列的特性 1.3 队列的基本操作 二.队列的实现方式 2.1 基于链表的队列 2.2 基于数组的队列 三.基于链表的队列实现 定义链表队列的结构 初始化 入队列——向队列中插…

Yocto构建i.MX处理器目标镜像

1. 初始化构建环境 首先&#xff0c;通过运行imx-setup-release.sh脚本来初始化Yocto构建环境。此脚本的标准语法如下&#xff1a; $ DISTRO<distro name> MACHINE<machine name> source imx-setup-release.sh -b <build dir>DISTRO<distro configurati…

10-15个工作站用Pr处理25个4K视频 性能要求

在4K非编环境里&#xff0c;10-15台工作站运行Adobe Premiere&#xff0c;工作站跑25个4K ProRes 422 视频流。要求存储至少提供5GB/s的&#xff0c;2GB/s的读&#xff0c;并且提供650TB的空间。只有达到这样的要求&#xff0c;才能保证文件快速访问&#xff0c;以及编辑时做到…

代码工艺:写代码的好习惯

1. 充分校验入参 有一句话叫 “All input is evil”&#xff0c;即一切的输入都可能是恶意的。 因此&#xff0c;经验丰富的工程师会对接口的入参进行严格的校验&#xff0c;从最基础的非空、长度校验&#xff0c;到复杂的业务逻辑校验都不应忽略。例如&#xff0c;在典型的电…

C++ [项目] 飞机大战

现在才发现C游戏的支持率这么高&#xff0c;那就发几篇吧 一、基本介绍 支持Dev-C5.11版本(务必调为英文输入法),基本操作看游戏里的介绍,怎么做的……懒得说,能看懂就看注释,没有的自己猜,如果你很固执……私我吧 二、代码部分 /* 2024.8.13*/ #include<iostream> #i…

学习笔记——交换——STP(生成树)基本概念

三、基本概念 1、桥ID/网桥ID (Bridege ID&#xff0c;BID) 每一台运行STP的交换机都拥有一个唯一的桥ID(BID)&#xff0c;BID(Bridge ID/桥ID)。在STP里我们使用不同的桥ID标识不同的交换机。 (2)BID(桥ID)组成 BID(桥ID)组成(8个字节)&#xff1a;由16位(2字节)的桥优先级…

ObjectMapper简单使用

<!-- 根据自己需要引入相关版本依赖。 --> <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.9.10</version> </dependency><dependency><groupId…

AD如何制作原理图的模版、原理图模板绘制修改以及如何导入原理图模版

作为硬件工程师&#xff0c;制定原理图模板是一项至关重要的任务&#xff0c;旨在标准化和规范原理图的绘制过程。在AD20中制作、绘制修改以及导入原理图模板的步骤如下&#xff1a; 1制作原理图模板 首先需在AD原理图设计环境下新建一个原理图文件&#xff1b; 在原理图界面…

实用的 Python 小脚本

一、引言 在日常办公和电脑使用中&#xff0c;我们经常会遇到一些重复性的任务或需要快速获取特定信息的情况。Python 作为一种强大而灵活的编程语言&#xff0c;可以用来编写各种小脚本&#xff0c;以自动化这些任务并提高工作效率。本文将介绍一些 Python 常用的小脚本&…

VSCode离线安装插件

下载最新的VSCode&#xff0c;安装。 打开VSCODE&#xff0c;打开左边的EXTENSINS(拓展)&#xff0c;打开 Install from VSIX&#xff0c;找到 .vsix的文件&#xff0c;打开安装。完成。 1&#xff09;去哪找插件&#xff0c;当然是插件官网了&#xff0c;插件官网&#xff0c;…

MySQL 之 存储引擎

存储引擎 MySQL体系结构 连接层&#xff1a;最上层是一些客户端和链接服务&#xff0c;主要完成一些类似于连接处理、授权认证、及相关的安全方案。服务器也会安全接入的每个客户端验证它所具有的操作权限。服务层&#xff1a;第二层完成大多数的核心服务功能&#xff0c;如SQ…

12、论文阅读:SpikeYOLO:高性能低能耗目标检测网络

SpikeYOLO:高性能低能耗目标检测网络 前言解释介绍相关工作论文提出的方法网络输入SpikeYOLO架构概述网络输出宏观设计微观设计I-LIF脉冲神经元LIFI-LIF实验代码前言 脉冲神经网络(Spiking Neural Networks, SNNs)具有生物合理性和低功耗的优势,相较于人工神经网络(Artif…

Python实现股票自动交易:步骤、要点与注意事项有哪些?

炒股自动化&#xff1a;申请官方API接口&#xff0c;散户也可以 python炒股自动化&#xff08;0&#xff09;&#xff0c;申请券商API接口 python炒股自动化&#xff08;1&#xff09;&#xff0c;量化交易接口区别 Python炒股自动化&#xff08;2&#xff09;&#xff1a;获取…

机器学习探索性数据分析 (EDA)

机器学习探索性数据分析 (EDA) 探索性数据分析&#xff08;Exploratory Data Analysis, EDA&#xff09;是机器学习工作流中至关重要的一个步骤&#xff0c;通过深入分析和理解数据的结构、分布和相关性&#xff0c;EDA帮助揭示数据背后的故事&#xff0c;并为后续的建模提供有…