centos7安装Elasticsearch Service Sink Connector【CDC实战系列十】

news/2025/11/4 17:33:14/文章来源:https://www.cnblogs.com/hbuuid/p/16457309.html

简介:数据同步技术(此处指CDC)发展迅速,目前主流的同步技术是使用Confluen-platform,他基于Kafaka开发,包含你能想到的任何组件。
核心是kafka connect,kafka connect 通过两种类型的连接器工作:

  • 源连接器——摄取整个数据库并将表更新流式传输到 Kafka 主题。源连接器还可以从您的所有应用程序服务器收集指标并将其存储在 Kafka 主题中,从而使数据可用于低延迟的流处理。
  • 接收器连接器——将来自 Kafka 主题的数据传送到二级索引(如 Elasticsearch)或批处理系统(如 Hadoop)以进行离线分析。

原文:

  • “Kafka Connect includes two types of connectors:
    • Source connector – Ingests entire databases and streams table updates to Kafka topics. A source connector can also collect metrics from all your application servers and store these in Kafka topics, making the data available for stream processing with low latency.
    • Sink connector – Delivers data from Kafka topics into secondary indexes such as Elasticsearch, or batch systems such as Hadoop for offline analysis.”

比如著名的Source Connector有:Debezium connector for MySQL,著名的Sink Connector有Elasticsearch Service Sink Connector
基于以上组件,可以实现MySQL高效实时同步数据到Elasticsearch。

目前有两种使用方式:
第一种:直接使用kafka,kafka connect,手动安装connector plugins(连接器插件)
【本文就是基于第一种方式安装】
第二种:安装confluent platform,然后使用confluent hub安装

软件版本信息

Debezium:1.9.4.Final
MySQL:5.7.x.
Elasticsearch:7.15.1
confluentinc-kafka-connect-elasticsearch:13.1.0

1、下载zip包,手动安装(即不使用confluent platform安装cdc)

 

--将下载的zip包上传到 /usr/local/kafka/kafka_2.13-3.2.0/libs/confluent 目录下,解压--
修改connect-distributed.properties配置文件的plugin.path如下:

plugin.path=/usr/local/kafka/kafka_2.13-3.2.0/libs,/usr/local/kafka/kafka_2.13-3.2.0/libs/debezium/debezium-connector-mysql,/usr/local/kafka/kafka_2.13-3.2.0/libs/confluent/confluentinc-kafka-connect-elasticsearch-13.1.0

 


2、编写配置文件:

创建文件elasticsearch-sink.json,内容为:

{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "goods.goods.t_sku",
"key.ignore": "false",
"connection.url": "http://127.0.0.1:9200",
"name": "elasticsearch-sink",
"transforms": "unwrap,key",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "drop",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "id",
"transforms.changetopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changetopic.regex": "(.*)",
"transforms.changetopic.replacement": "$1"
}
}

参考官方文档

"transforms.unwrap.drop.tombstones": "true",  
"transforms.unwrap.delete.handling.mode": "drop", 
这么配置到sink中就可以实现mysql删除数据,然后ealsticsearch中也删除改数据。
看了官方文档,暂时没有理解配置的说明。

3、重启kafka-connect
重启kafka-connect的步骤:

ps- aux | grep 8083
或者
ps- aux | grep ConnectDistributed
或者 jps
找到ConnectDistributed对应的pid
kill pid
不要使用kill -9 pid
然后重新执行:
./bin/connect-distributed.sh  -daemon config/connect-distributed.properties
查看结果:
netstat -nltp | grep 8083
查看加载的插件:
curl http://localhost:8083/connector-plugins
查看连接器:
curl http://localhost:8083/connectors
加入source连接器:
curl -d @"connect-mysql-source.json"   -H"Content-Type: application/json" -X POST http://localhost:8083/connectors
加入sink连接器:
curl -d @"elasticsearch-sink.json"   -H"Content-Type: application/json" -X POST http://localhost:8083/connectors
再次查看连接器
curl http://localhost:8083/connectors以上就可以重启连接器,并查看kafka connect 是否工作正常
如果,连接器已存在,需要删除重新加入,则是如下命令删除:
curl -X DELETE http://localhost:8083/connectors/elasticsearch-sink
重新加入sink连接器:
curl -d @"elasticsearch-sink.json" -H"Content-Type: application/json" -X POST http://localhost:8083/connectors

4,查看elasticsearch已经生成了索引名称为goods.goods.t_sku的索引,并同步成功了数据。

5、在mysql的goods.t_sku表中增加一条数据,同时观察索引goods.t_sku中也会增加一条数据。成功!
===========需要注意的点=======

一、transforms
(1)RegexRouter :可以灵活对kafka的topic进行区分管理

(2)ExtractNewRecordState  :如果没有,输入数据会包含:before、after记录修改前对比信息以及元数据信息(source,op,ts_ms等)。这些信息在后续数据写入Elasticsearch是不需要的。(注意结合自己业务场景)。

(3)ExtractField :实现id和elasticsearch的_id一致,配合 "key.ignore": "false"一起使用,必须配置false,否则es生成的_id不是mysql表的id,这样是不能实现更新和删除的

二、ES的Mapping设置

1、自定义mapping,直接在同步之前创建好
2、动态映射mapping,可参考这篇文章
比如本例子中:mysql约定所有时间类型字段后缀以_time结尾,然后使用dynamic_templates指定es的时间mapping为date类型

PUT /goods.goods.t_sku
{"mappings": {"date_detection": false,"dynamic_templates": [{"dates": {"match": ".*_time","match_pattern": "regex","mapping": {"type": "date"}}}]}
}

 给索引设置alias,使用起来了更灵活、便捷

POST _aliases
{"actions": [{"add": {"index": "goods.goods.t_sku","alias": "goods.goods.t_sku_alias"}}]
}

参考一:kafka connect
参考二:debezium
参考三:Elasticsearch Service Sink


==部署了这个cdc从mysql同步数据到elasticsearch的应用后,2核4G的虚拟机已经到极限了!==

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/955904.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025年诚信的PU线条厂家TOP5推荐,PU线条厂家全解析

在室内装饰的精细化浪潮中,PU线条凭借环保、耐用、美观的特质成为空间设计的点睛之笔,而选择诚信可靠的厂家则是品质落地的关键。面对市场上良莠不齐的供应商,如何避开劣质陷阱与适配难题?以下依据产品品质、服务能…

2025年度阀门涂装制造厂排名:5家的阀门创新涂装厂家推荐

石油机械行业作为能源产业链的核心环节,其阀门设备的涂装质量直接决定了油气开采、炼化输送的安全性与耐久性。在高含硫、深海盐雾等极端工况下,传统涂装工艺已难以满足行业对防腐性能、精度控制、生产效率的严苛要求…

Java-143 深入浅出 MongoDB NoSQL:MongoDB、Redis、HBase、Neo4j应用场景与对比 - 详解

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

2025年代办注册公司哪家口碑好?代办注册公司找哪家?

在创业浪潮席卷全国的当下,注册公司是企业踏入市场的第一步。但繁杂的流程、模糊的政策、隐蔽的风险,让无数创业者望而却步。代办注册公司哪家口碑好?代办注册公司服务找哪家更靠谱?代办注册公司怎么联系到专业团队…

内网即时通讯软件新选择:吱吱企业即时通讯软件的安全与协作一体化

在数字化办公浪潮中,企业即时通讯软件已经成为商业机密的重要载体。对于金融、医疗、科技等数据敏感型企业,以内网形式保证企业通讯安全与协作效率的成为重中之重。吱吱企业即时通讯软件凭借内网通讯、数据私有化以及…

leetcode热题100-49:字母异位词分组

给你一个字符串数组,请你将 字母异位词 组合在一起。可以按任意顺序返回结果列表。示例 1:输入: strs = ["eat", "tea", "tan", "ate", "nat", "bat"] …

2025年哈尔滨PU线条供应商推荐:靠谱的PU线条企业全解析

在装修装饰行业,PU线条凭借其轻便、美观、耐用等优势,成为众多业主和设计师的材料。但面对市场上琳琅满目的PU线条供应商,如何选择靠谱、口碑好的合作伙伴,成为许多人头疼的问题。本文将为您推荐哈尔滨及周边地区的…

工业CMOS相机的原理及基础知识

我们知道在图像采集和处理的过程,最基本的是要把实物尽量真实地反映到虚拟的图像上。在机器视觉领域,图像采集和处理的过程需要用到工业相机。 工业相机是机器视觉系统中的一个关键组件,其最本质的功能就是将光信号…

2025年有效减肥训练机构推荐,瘦身训练课程与间歇性减肥训练企业全解析

在健康意识觉醒的时代,专业减肥训练机构是人们实现体重管理与健康生活的重要伙伴。面对市场上众多减肥训练机构,如何选择靠谱且适合自己的?以下依据不同特色,为你推荐2025年值得关注的有效减肥训练机构,涵盖有效减…

东北区域PU线条设计服务TOP5推荐,PU雕花线条与PU顶角线企业全解析

在装修装饰行业精细化发展的当下,PU线条凭借质轻、防潮、环保等优势,逐渐取代传统石膏线成为空间设计的点睛之笔。而PU线条设计服务的专业性直接决定了终呈现效果——从PU雕花线条的立体肌理到PU顶角线的无缝衔接,都…

2025年北京一对一上门辅导机构年度排名:龙文教育集团一对一上门辅导推荐

在教育个性化需求激增的当下,一对一上门辅导因精准适配学习场景、省时高效提升成绩成为众多家庭的刚需选择。然而市场机构鱼龙混杂,如何避开师资不匹配、服务无保障的坑?以下结合家长真实体验与教学数据,为你盘点2…

2025 年散热器厂家最新推荐榜:涵盖电子 / 插片 / 型材 / 铲齿 / 新能源等多品类,权威测评精选实力企业

引言 随着电子产业向高功率、小型化方向快速发展,散热器作为保障设备稳定运行的核心部件,市场需求持续增长,行业对产品品质与性能的要求也不断提升。为帮助下游企业精准筛选优质供应商,行业协会联合专业测评机构开…

2025 年过滤器厂家最新推荐榜单:品牌综合实力测评发布,五大优质企业脱颖而出润滑油过滤器/自清洗过滤器/全自动除污过滤器/双联过滤器/烛式过滤器厂家推荐

引言 近日,行业权威协会发布 2025 年度过滤器制造商综合测评报告,本次测评覆盖全自动过滤器、油过滤器、海水过滤器等 12 大类产品,从技术实力、产品性能、服务能力、市场口碑四大维度设置 28 项细分指标,通过实地…

docker学习笔记详记 - 教程

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

Windows 11 常规设置

Windows 11 常规设置本文来自博客园,作者:VipSoft 转载请注明原文链接:https://www.cnblogs.com/vipsoft/p/19190918

浏览器共享存储导致身份标识冲突

在开发心理咨询的项目的时候,出现了困扰很长时间的一个问题,就是用户身份混淆 用户A、B 同一个浏览器 A先登录,B后登录,B登录完,A刷新一下,就变成B的身份了, 反反复复,因为我采用的是Session,在登录之后将用户…

2025年激光干涉仪生产厂家权威推荐榜单:球杆仪/激光雷达/悬臂三坐标测量机源头厂家精选

近年来,随着高端制造业对精度要求的提升,激光干涉仪市场规模显著增长。据行业报告显示,2025年全球激光干涉仪市场年复合增长率预计达4.5%,其中中国市场的贡献率持续攀升。激光干涉仪凭借纳米级分辨率和多维几何量检…

详细介绍:计算机网络第四章(10)——网络层《路由算法+路由协议》

详细介绍:计算机网络第四章(10)——网络层《路由算法+路由协议》pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "…

2025 年商用直饮机租赁公司最新推荐榜,技术实力与市场口碑深度解析,聚焦优质品牌综合表现

引言 随着商用净饮水需求持续攀升,商用直饮机市场竞争愈发激烈,为帮助企事业单位、公共场所精准筛选优质产品,本次推荐榜结合行业协会最新测评数据制定科学测评体系。测评从技术实力(净化技术先进性、水温控制精度…

2025母婴用品双11营销解码与AI应用洞察报告|附40+份报告PDF、数据、绘图模板汇总下载

原文链接:https://tecdat.cn/?p=44195原文出处:拓端抖音号@拓端tecdat2024年双11小红书母婴搜索人数突破2400万,2025年618进一步飙升至3500万——短短一年,母婴行业的“流量逻辑”已从“广撒网”转向“精捕捉”。…