原文地址: https://debezium.io/blog/2023/10/05/Debezium-JMX-signaling-and-notifications/
欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.
Debezium signaling and notifications - Part 3: JMX channel
 October 5, 2023 by Fiore Mario Vitale
 debezium features notifications signaling integration
欢迎收看我们关于德贝兹信号和通知系列的第三部分。在这篇文章中,我们将继续我们对德贝兹信号和通知的探索。特别是,我们将研究如何使用JMDA通道启用和管理这些特性。
我们还将探索如何利用jlokia通过其他API发送信号和获得通知。
通过jmx与德贝兹的相互作用
 jmx代表Java管理扩展,一种用于管理和监控Java应用程序的Java技术。它提供了一种标准化的方法来监控应用程序的性能,配置设置,并与使用各种管理工具和客户端运行的Java应用程序进行交互。对于复杂的、分布式的和企业级的Java应用程序的管理和监控,jmx特别有用。
可通过JMDA通道发出信号
 德贝齐斯中的信号是关于在正常执行期间执行操作的触发动作。正如在前几篇文章中所讨论的,Debezum提供了不同的非常规信号通道。在这篇文章中,我们将重点讨论JMDA通道。
若要开始使用jmx信号通道,我们需要:
在卡夫卡连接服务上启用JDB2服务器
加起来jmx 到signal.enabled.channels 连接器配置属性
使用一个jmx客户端连接到jmx服务器发送信号。
德贝兹暴露了名为MBean的信号debezium.:type=management,context=signals,server= .这豆露出来了signal 接受三个参数的业务:
信号的标识。
信号的类型,例如,执行弹弓。
JSON数据字段,包含关于指定信号类型的附加信息。
通过JMDA通道启用通知
 通知是告诉你在德贝兹铵中会发生什么的关键。通过JMDA通道访问通知允许您轻松地监视Debezns,例如,增量快照的进程。
要开始使用JDB2通知通道,我们需要:
在卡夫卡连接服务上启用JDB2服务器
加起来jmx 到notification.enabled.channels 连接器配置属性
使用一个JDB2客户端连接到JDB2服务器来访问通知。
通知书上的名称debezium.:type=management,context=notifications,server= .这个豆提供了一个Notification 包含一个jmx列表的BeanCompositeData 具有下列属性的类型:
财产 描述
 身份证
分配给通知的唯一标识符。关于增量快照通知,id 是一样的execute-snapshot 信号。
总数_类型
与通知相关的聚合根的数据类型.在域驱动设计中,导出事件总是指聚合。
类型
提供在aggregate_type 场地。
附加_数据
地图<字符串,并附有通知的详细信息。
让我们花点时间,看看如何发送一个增量快照,并通过jmx通道接收关于其进展的通知。
通过jmx通道发送增量快照信号
 对于这个例子,我们将使用带有后GERGSQL数据库的DEBeZMR文档图像。
我们可以使用下面的码头组合文件启动所有需要的服务
version: ‘2’
 services:
 zookeeper:
 container_name: zookeeper
 image: quay.io/debezium/zookeeper:2.4
 ports:
 - 2181:2181
 - 2888:2888
 - 3888:3888
 kafka:
 container_name: kafka
 image: quay.io/debezium/kafka:2.4
 ports:
 - 9092:9092
 links:
 - zookeeper
 environment:
 - ZOOKEEPER_CONNECT=zookeeper:2181
 postgres:
 container_name: postgres
 image: quay.io/debezium/example-postgres:2.4
 ports:
 - 5432:5432
 environment:
 - POSTGRES_USER=postgres
 - POSTGRES_PASSWORD=postgres
 connect:
 container_name: connect
 image: quay.io/debezium/connect:2.4
 ports:
 - 8083:8083
 - 9012:9012
 - 8778:8778
 links:
 - kafka
 - postgres
 environment:
 - BOOTSTRAP_SERVERS=kafka:9092
 - GROUP_ID=1
 - CONFIG_STORAGE_TOPIC=my_connect_configs
 - OFFSET_STORAGE_TOPIC=my_connect_offsets
 - STATUS_STORAGE_TOPIC=my_connect_statuses
 - JMXPORT=9012
 - JMXHOST=0.0.0.0
 - ENABLE_JOLOKIA=true
 这将暴露用于连接到jmx服务器的端口9012
 启用jmx并指定将用于Jmx的端口号。该值用于指定JVM参数 -Dcom.sun.management.jmxremote.port= J M X P O R T . 这 个 地 址 或 可 解 析 的 主 机 名 的 码 头 主 机 , 使 用 它 来 构 造 一 个 发 送 到 j m x 客 户 端 的 U R L 。 局 部 宿 主 值 或 127.0.0.1 将 不 起 作 用 。 通 常 可 使 用 0.0.0.0 。 该 值 用 于 指 定 J V M 参 数 − D j a v a . r m i . s e r v e r . h o s t n a m e = JMX_PORT . 这个地址或可解析的主机名的码头主机,使用它来构造一个发送到jmx客户端的URL。局部宿主值或127.0.0.1将不起作用。通常可使用0.0.0.0。该值用于指定JVM参数 -Djava.rmi.server.hostname= JMXPORT.这个地址或可解析的主机名的码头主机,使用它来构造一个发送到jmx客户端的URL。局部宿主值或127.0.0.1将不起作用。通常可使用0.0.0.0。该值用于指定JVM参数−Djava.rmi.server.hostname=JMXHOST
 在保存文件后debezium.yaml ,所有服务均以:
docker compose -f debezium.yaml up -d
 输出会像这样
[+] Running 5/5
 ✔ Network deploy_default Created 0.1s
 ✔ Container deploy-zookeeper-1 Started 0.1s
 ✔ Container deploy-postgres-1 Started 0.1s
 ✔ Container deploy-kafka-1 Started 0.1s
 ✔ Container deploy-connect-1 Started
 现在我们可以检查所有的服务是否都在运行
docker ps
 输出应该与此相似
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
 f1d49fb79dba quay.io/debezium/connect:2.4 “/docker-entrypoint.…” 3 seconds ago Up 2 seconds 0.0.0.0:8083->8083/tcp, 0.0.0.0:8778->8778/tcp, 0.0.0.0:9012->9012/tcp, 9092/tcp deploy-connect-1
 e164b2651fbf quay.io/debezium/kafka:2.4 “/docker-entrypoint.…” 3 seconds ago Up 2 seconds 0.0.0.0:9092->9092/tcp deploy-kafka-1
 e61116f22f9d quay.io/debezium/example-postgres:2.4 “docker-entrypoint.s…” 4 seconds ago Up 2 seconds 0.0.0.0:5432->5432/tcp deploy-postgres-1
 ccb502882928 quay.io/debezium/zookeeper:2.4 “/docker-entrypoint.…” 4 seconds ago Up 2 seconds 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp deploy-zookeeper-1
 此时所有服务都已启动并运行,因此我们可以通过以下配置注册连接器
{
 “name”: “inventory-connector”,
 “config”: {
 “connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
 “tasks.max”: “1”,
 “database.hostname”: “postgres”,
 “database.port”: “5432”,
 “database.user”: “postgres”,
 “database.password”: “postgres”,
 “database.server.id”: “184054”,
 “database.dbname”: “postgres”,
 “topic.prefix”: “dbserver1”,
 “snapshot.mode”: “NEVER”,
 “schema.history.internal.kafka.bootstrap.servers”: “kafka:9092”,
 “schema.history.internal.kafka.topic”: “schema-changes.inventory”,
 “signal.enabled.channels”: “source,jmx”,
 “signal.data.collection”: “inventory.debezium_signal”,
 “notification.enabled.channels”: “jmx”
 }
 }
 此配置可使 来源 和 Jmx 通道。即使我们只希望使用JDB2来发送信号来执行增量快照, 来源 仍然需要发送信号,因为Debezns需要使用信号表来水印db日志以进行事件复制。
 把用来发信号的表
 现在,别担心 notification.enabled.channels 财产。我们稍后会深入研究的
 把这个配置保存到一个文件中 后记-jmx.json ,我们可以登记。
注册连接器我们可以使用curl 调用卡夫卡连接API
curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” localhost:8083/connectors/ -d ‘{“name”:“inventory-connector”,“config”:{“connector.class”:“io.debezium.connector.postgresql.PostgresConnector”,“tasks.max”:“1”,“database.hostname”:“postgres”,“database.port”:“5432”,“database.user”:“postgres”,“database.password”:“postgres”,“database.server.id”:“184054”,“database.dbname”:“postgres”,“topic.prefix”:“dbserver1”,“snapshot.mode”:“NEVER”,“schema.history.internal.kafka.bootstrap.servers”:“kafka:9092”,“schema.history.internal.kafka.topic”:“schema-changes.inventory”,“signal.enabled.channels”:“source,jmx”,“signal.data.collection”:“inventory.debezium_signal”,“notification.enabled.channels”:“log,sink,jmx”,“notification.sink.topic.name”:“io.debezium.notification”}}’
 或者我建议用 Kcctl 工具与卡夫卡互动连接。它是卡夫卡连接的现代直观的命令行客户端。
首先,我们需要创建一个配置上下文来连接卡夫卡连接
kcctl config set-context local --cluster http://localhost:8083
 然后我们可以注册连接器运行以下命令
kcctl apply -f postgres-jmx.json
 我们现在可以得到连接容器的日志
docker logs connect
 检查连接器是否启动了流事件
INFO Postgres|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
 为增量快照准备数据库
 因为增量快照需要signal.data.collection 要定义此定义,我们需要在您的服务站数据库中创建信号表。
在使用与GTDS和read.only 准备好了。
 要创建信号表,我们需要连接到我们的ESTGres实例。我们可以利用psql 客户在邮政集装箱内。
docker exec -it postgres bash
 一旦进入容器,我们就可以连接到
psql -h localhost -d postgres -U postgres
 密码是 波斯特格雷斯
 我们就可以检查里面有没有桌子 存货 图解
\dt inventory.*
 命令应该返回类似的东西
            List of relations
Schema | Name | Type | Owner
 -----------±-----------------±------±---------
 inventory | customers | table | postgres
 inventory | geom | table | postgres
 inventory | orders | table | postgres
 inventory | products | table | postgres
 inventory | products_on_hand | table | postgres
 inventory | spatial_ref_sys | table | postgres
 (6 rows)
 我们需要用以下命令创建信号表:
CREATE TABLE inventory.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
 发送增量快照信号
 我们必须连接到jmx服务器,才能通过jmx通道发送信号。我们使用 Jmxdg 客户,所以下载后,我们可以运行它
java -jar jmxterm-1.0.4-uber.jar
open localhost:9012
beans -d debezium.postgres
run -b debezium.postgres:context=signals,server=dbserver1,type=management signal 12345 execute-snapshot {“data-collections”:[“inventory.orders”],“type”:“INCREMENTAL”}
 帮我查一下客户
 打开一个连接到JDB2服务器
 搜寻豆下 德贝齐姆。 领域
 执行 发信号 执行递增快照的操作 存货. 表
 核对数据
 在那之后,我们要检查所有来自 命令 表在相应的卡夫卡主题中得到了正确的捕捉。
我们可以通过以下命令输入卡夫卡容器:
docker exec -it kafka bash
 一旦进入容器,我们就可以在 dbserver1.inventory.orders 主题:以下命令
kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbserver1.inventory.orders --from-beginning
 输出应该是这样的
{
 “schema”: {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “int32”,
 “optional”: false,
 “default”: 0,
 “field”: “id”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “name”: “io.debezium.time.Date”,
 “version”: 1,
 “field”: “order_date”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “purchaser”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “quantity”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “product_id”
 }
 ],
 “optional”: true,
 “name”: “dbserver1.inventory.orders.Value”,
 “field”: “before”
 },
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “int32”,
 “optional”: false,
 “default”: 0,
 “field”: “id”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “name”: “io.debezium.time.Date”,
 “version”: 1,
 “field”: “order_date”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “purchaser”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “quantity”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “product_id”
 }
 ],
 “optional”: true,
 “name”: “dbserver1.inventory.orders.Value”,
 “field”: “after”
 },
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “string”,
 “optional”: false,
 “field”: “version”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “connector”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “name”
 },
 {
 “type”: “int64”,
 “optional”: false,
 “field”: “ts_ms”
 },
 {
 “type”: “string”,
 “optional”: true,
 “name”: “io.debezium.data.Enum”,
 “version”: 1,
 “parameters”: {
 “allowed”: “true,last,false,incremental”
 },
 “default”: “false”,
 “field”: “snapshot”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “db”
 },
 {
 “type”: “string”,
 “optional”: true,
 “field”: “sequence”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “schema”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “table”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “txId”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “lsn”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “xmin”
 }
 ],
 “optional”: false,
 “name”: “io.debezium.connector.postgresql.Source”,
 “field”: “source”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “op”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “ts_ms”
 },
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “string”,
 “optional”: false,
 “field”: “id”
 },
 {
 “type”: “int64”,
 “optional”: false,
 “field”: “total_order”
 },
 {
 “type”: “int64”,
 “optional”: false,
 “field”: “data_collection_order”
 }
 ],
 “optional”: true,
 “name”: “event.block”,
 “version”: 1,
 “field”: “transaction”
 }
 ],
 “optional”: false,
 “name”: “dbserver1.inventory.orders.Envelope”,
 “version”: 1
 },
 “payload”: {
 “before”: null,
 “after”: {
 “id”: 10001,
 “order_date”: 16816,
 “purchaser”: 1001,
 “quantity”: 1,
 “product_id”: 102
 },
 “source”: {
 “version”: “2.4.0-SNAPSHOT”,
 “connector”: “postgresql”,
 “name”: “dbserver1”,
 “ts_ms”: 1695631605203,
 “snapshot”: “incremental”,
 “db”: “postgres”,
 “sequence”: “[“34837776”,“34837776”]”,
 “schema”: “inventory”,
 “table”: “orders”,
 “txId”: null,
 “lsn”: null,
 “xmin”: null
 },
 “op”: “r”,
 “ts_ms”: 1695631605204,
 “transaction”: null
 }
 }
 {
 “schema”: {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “int32”,
 “optional”: false,
 “default”: 0,
 “field”: “id”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “name”: “io.debezium.time.Date”,
 “version”: 1,
 “field”: “order_date”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “purchaser”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “quantity”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “product_id”
 }
 ],
 “optional”: true,
 “name”: “dbserver1.inventory.orders.Value”,
 “field”: “before”
 },
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “int32”,
 “optional”: false,
 “default”: 0,
 “field”: “id”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “name”: “io.debezium.time.Date”,
 “version”: 1,
 “field”: “order_date”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “purchaser”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “quantity”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “product_id”
 }
 ],
 “optional”: true,
 “name”: “dbserver1.inventory.orders.Value”,
 “field”: “after”
 },
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “string”,
 “optional”: false,
 “field”: “version”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “connector”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “name”
 },
 {
 “type”: “int64”,
 “optional”: false,
 “field”: “ts_ms”
 },
 {
 “type”: “string”,
 “optional”: true,
 “name”: “io.debezium.data.Enum”,
 “version”: 1,
 “parameters”: {
 “allowed”: “true,last,false,incremental”
 },
 “default”: “false”,
 “field”: “snapshot”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “db”
 },
 {
 “type”: “string”,
 “optional”: true,
 “field”: “sequence”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “schema”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “table”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “txId”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “lsn”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “xmin”
 }
 ],
 “optional”: false,
 “name”: “io.debezium.connector.postgresql.Source”,
 “field”: “source”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “op”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “ts_ms”
 },
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “string”,
 “optional”: false,
 “field”: “id”
 },
 {
 “type”: “int64”,
 “optional”: false,
 “field”: “total_order”
 },
 {
 “type”: “int64”,
 “optional”: false,
 “field”: “data_collection_order”
 }
 ],
 “optional”: true,
 “name”: “event.block”,
 “version”: 1,
 “field”: “transaction”
 }
 ],
 “optional”: false,
 “name”: “dbserver1.inventory.orders.Envelope”,
 “version”: 1
 },
 “payload”: {
 “before”: null,
 “after”: {
 “id”: 10002,
 “order_date”: 16817,
 “purchaser”: 1002,
 “quantity”: 2,
 “product_id”: 105
 },
 “source”: {
 “version”: “2.4.0-SNAPSHOT”,
 “connector”: “postgresql”,
 “name”: “dbserver1”,
 “ts_ms”: 1695631605204,
 “snapshot”: “incremental”,
 “db”: “postgres”,
 “sequence”: “[“34837776”,“34837776”]”,
 “schema”: “inventory”,
 “table”: “orders”,
 “txId”: null,
 “lsn”: null,
 “xmin”: null
 },
 “op”: “r”,
 “ts_ms”: 1695631605204,
 “transaction”: null
 }
 }
 {
 “schema”: {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “int32”,
 “optional”: false,
 “default”: 0,
 “field”: “id”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “name”: “io.debezium.time.Date”,
 “version”: 1,
 “field”: “order_date”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “purchaser”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “quantity”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “product_id”
 }
 ],
 “optional”: true,
 “name”: “dbserver1.inventory.orders.Value”,
 “field”: “before”
 },
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “int32”,
 “optional”: false,
 “default”: 0,
 “field”: “id”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “name”: “io.debezium.time.Date”,
 “version”: 1,
 “field”: “order_date”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “purchaser”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “quantity”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “product_id”
 }
 ],
 “optional”: true,
 “name”: “dbserver1.inventory.orders.Value”,
 “field”: “after”
 },
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “string”,
 “optional”: false,
 “field”: “version”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “connector”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “name”
 },
 {
 “type”: “int64”,
 “optional”: false,
 “field”: “ts_ms”
 },
 {
 “type”: “string”,
 “optional”: true,
 “name”: “io.debezium.data.Enum”,
 “version”: 1,
 “parameters”: {
 “allowed”: “true,last,false,incremental”
 },
 “default”: “false”,
 “field”: “snapshot”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “db”
 },
 {
 “type”: “string”,
 “optional”: true,
 “field”: “sequence”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “schema”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “table”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “txId”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “lsn”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “xmin”
 }
 ],
 “optional”: false,
 “name”: “io.debezium.connector.postgresql.Source”,
 “field”: “source”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “op”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “ts_ms”
 },
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “string”,
 “optional”: false,
 “field”: “id”
 },
 {
 “type”: “int64”,
 “optional”: false,
 “field”: “total_order”
 },
 {
 “type”: “int64”,
 “optional”: false,
 “field”: “data_collection_order”
 }
 ],
 “optional”: true,
 “name”: “event.block”,
 “version”: 1,
 “field”: “transaction”
 }
 ],
 “optional”: false,
 “name”: “dbserver1.inventory.orders.Envelope”,
 “version”: 1
 },
 “payload”: {
 “before”: null,
 “after”: {
 “id”: 10003,
 “order_date”: 16850,
 “purchaser”: 1002,
 “quantity”: 2,
 “product_id”: 106
 },
 “source”: {
 “version”: “2.4.0-SNAPSHOT”,
 “connector”: “postgresql”,
 “name”: “dbserver1”,
 “ts_ms”: 1695631605204,
 “snapshot”: “incremental”,
 “db”: “postgres”,
 “sequence”: “[“34837776”,“34837776”]”,
 “schema”: “inventory”,
 “table”: “orders”,
 “txId”: null,
 “lsn”: null,
 “xmin”: null
 },
 “op”: “r”,
 “ts_ms”: 1695631605204,
 “transaction”: null
 }
 }
 {
 “schema”: {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “int32”,
 “optional”: false,
 “default”: 0,
 “field”: “id”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “name”: “io.debezium.time.Date”,
 “version”: 1,
 “field”: “order_date”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “purchaser”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “quantity”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “product_id”
 }
 ],
 “optional”: true,
 “name”: “dbserver1.inventory.orders.Value”,
 “field”: “before”
 },
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “int32”,
 “optional”: false,
 “default”: 0,
 “field”: “id”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “name”: “io.debezium.time.Date”,
 “version”: 1,
 “field”: “order_date”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “purchaser”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “quantity”
 },
 {
 “type”: “int32”,
 “optional”: false,
 “field”: “product_id”
 }
 ],
 “optional”: true,
 “name”: “dbserver1.inventory.orders.Value”,
 “field”: “after”
 },
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “string”,
 “optional”: false,
 “field”: “version”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “connector”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “name”
 },
 {
 “type”: “int64”,
 “optional”: false,
 “field”: “ts_ms”
 },
 {
 “type”: “string”,
 “optional”: true,
 “name”: “io.debezium.data.Enum”,
 “version”: 1,
 “parameters”: {
 “allowed”: “true,last,false,incremental”
 },
 “default”: “false”,
 “field”: “snapshot”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “db”
 },
 {
 “type”: “string”,
 “optional”: true,
 “field”: “sequence”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “schema”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “table”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “txId”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “lsn”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “xmin”
 }
 ],
 “optional”: false,
 “name”: “io.debezium.connector.postgresql.Source”,
 “field”: “source”
 },
 {
 “type”: “string”,
 “optional”: false,
 “field”: “op”
 },
 {
 “type”: “int64”,
 “optional”: true,
 “field”: “ts_ms”
 },
 {
 “type”: “struct”,
 “fields”: [
 {
 “type”: “string”,
 “optional”: false,
 “field”: “id”
 },
 {
 “type”: “int64”,
 “optional”: false,
 “field”: “total_order”
 },
 {
 “type”: “int64”,
 “optional”: false,
 “field”: “data_collection_order”
 }
 ],
 “optional”: true,
 “name”: “event.block”,
 “version”: 1,
 “field”: “transaction”
 }
 ],
 “optional”: false,
 “name”: “dbserver1.inventory.orders.Envelope”,
 “version”: 1
 },
 “payload”: {
 “before”: null,
 “after”: {
 “id”: 10004,
 “order_date”: 16852,
 “purchaser”: 1003,
 “quantity”: 1,
 “product_id”: 107
 },
 “source”: {
 “version”: “2.4.0-SNAPSHOT”,
 “connector”: “postgresql”,
 “name”: “dbserver1”,
 “ts_ms”: 1695631605204,
 “snapshot”: “incremental”,
 “db”: “postgres”,
 “sequence”: “[“34837776”,“34837776”]”,
 “schema”: “inventory”,
 “table”: “orders”,
 “txId”: null,
 “lsn”: null,
 “xmin”: null
 },
 “op”: “r”,
 “ts_ms”: 1695631605204,
 “transaction”: null
 }
 }
 就这样!我们已经使用jmx通道发送了一个增量快照信号。
通过jmx通道监控增量快照进展
 由于我们已经执行了一个增量快照,现在我们可以通过JMDA通道读取Debezns生成的通知。
我们使用下列配置注册连接器
{
 “name”: “inventory-connector”,
 “config”: {
 “connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
 “tasks.max”: “1”,
 “database.hostname”: “postgres”,
 “database.port”: “5432”,
 “database.user”: “postgres”,
 “database.password”: “postgres”,
 “database.server.id”: “184054”,
 “database.dbname”: “postgres”,
 “topic.prefix”: “dbserver1”,
 “snapshot.mode”: “NEVER”,
 “schema.history.internal.kafka.bootstrap.servers”: “kafka:9092”,
 “schema.history.internal.kafka.topic”: “schema-changes.inventory”,
 “signal.enabled.channels”: “source,jmx”,
 “signal.data.collection”: “inventory.debezium_signal”,
 “notification.enabled.channels”: “jmx”
 }
 }
 这种配置使 Jmx 通知频道。
 要访问该通知,我们需要再次连接到JDB2服务器。就像我们为信号所做的那样,我们将使用jmxterm
java -jar jmxterm-1.0.4-uber.jar
open localhost:9012
beans -d debezium.postgres
get -b debezium.postgres:context=notifications,server=dbserver1,type=management Notifications
 帮我查一下客户
 打开一个连接到JDB2服务器
 搜寻豆下 德贝齐姆。 领域
 得到通知。
 你应该期待下面的输出
#mbean = debezium.postgres:context=notifications,server=dbserver1,type=management:
 Notifications = [ {
 additionalData = {
 ( connector_name ) = {
 key = connector_name;
 value = dbserver1;
 };
 };
 aggregateType = Initial Snapshot;
 id = b20bec8d-f21f-4d74-bb75-cdd7f4c7d933;
 type = SKIPPED;
 },
 {
 additionalData = {
 ( connector_name ) = {
 key = connector_name;
 value = dbserver1;
 };
 ( data_collections ) = {
 key = data_collections;
 value = inventory.orders;
 };
 };
 aggregateType = Incremental Snapshot;
 id = 12345;
 type = STARTED;
 },
 {
 additionalData = {
 ( current_collection_in_progress ) = {
 key = current_collection_in_progress;
 value = inventory.orders;
 };
 ( connector_name ) = {
 key = connector_name;
 value = dbserver1;
 };
 ( maximum_key ) = {
 key = maximum_key;
 value = 10004;
 };
 ( last_processed_key ) = {
 key = last_processed_key;
 value = 10004;
 };
 ( data_collections ) = {
 key = data_collections;
 value = inventory.orders;
 };
 };
 aggregateType = Incremental Snapshot;
 id = 12345;
 type = IN_PROGRESS;
 },
 {
 additionalData = {
 ( scanned_collection ) = {
 key = scanned_collection;
 value = inventory.orders;
 };
 ( connector_name ) = {
 key = connector_name;
 value = dbserver1;
 };
 ( total_rows_scanned ) = {
 key = total_rows_scanned;
 value = 4;
 };
 ( status ) = {
 key = status;
 value = SUCCEEDED;
 };
 ( data_collections ) = {
 key = data_collections;
 value = inventory.orders;
 };
 };
 aggregateType = Incremental Snapshot;
 id = 12345;
 type = TABLE_SCAN_COMPLETED;
 },
 {
 additionalData = {
 ( connector_name ) = {
 key = connector_name;
 value = dbserver1;
 };
 };
 aggregateType = Incremental Snapshot;
 id = 12345;
 type = COMPLETED;
 }
 ];
 这是一个来自初始快照的通知,带有状态SKIPPED 因为我们的连接器配置了"snapshot.mode": “NEVER”
 这是关于增量快照启动的通知
 这个通知告诉我们inventory.orders 快照正在进行中,提供了关于最后处理和最大键的有用信息。在这个例子中,我们只有一个in progress 通知,但取决于你的桌子尺寸snapshot.fetch.size ,你可以得到更多。
 此通知告知特定表的快照已经完成,并提供了所处理的全部行的信息。
 对于这个示例,这是我们有的最后一个通知,它告诉我们整个增量快照进展已经完成。
 jmx还提供了生成自己的通知的可能性。德贝唑也会产生这些通知。您可以订阅这些通知,因此您可以在不投票的情况下立即接收这些通知。 通知书 豆。
 利用亚洛基亚
 JLOLIA是一个功能强大的工具,可以让您与JDB2服务器进行交互,并通过REST来公开它。使用它,我们可以通过REST与Debezns进行交互,利用信号和通知的jmx豆。这样,您可以无缝地发送信号和接收通知,并使用更熟悉的RESTAPI。
要启用Joloya,我们需要启用它在我们卡夫卡连接容器上的代理。
这是我们示例中使用的码头组合文件
version: ‘2’
 services:
 zookeeper:
 image: quay.io/debezium/zookeeper:2.4
 ports:
 - 2181:2181
 - 2888:2888
 - 3888:3888
 kafka:
 image: quay.io/debezium/kafka:2.4
 ports:
 - 9092:9092
 links:
 - zookeeper
 environment:
 - ZOOKEEPER_CONNECT=zookeeper:2181
 postgres:
 image: quay.io/debezium/example-postgres:2.4
 ports:
 - 5432:5432
 environment:
 - POSTGRES_USER=postgres
 - POSTGRES_PASSWORD=postgres
 connect:
 image: quay.io/debezium/connect:2.4
 ports:
 - 8083:8083
 - 9012:9012
 - 8778:8778
 links:
 - kafka
 - postgres
 environment:
 - BOOTSTRAP_SERVERS=kafka:9092
 - GROUP_ID=1
 - CONFIG_STORAGE_TOPIC=my_connect_configs
 - OFFSET_STORAGE_TOPIC=my_connect_offsets
 - STATUS_STORAGE_TOPIC=my_connect_statuses
 - JMXPORT=9012
 - JMXHOST=0.0.0.0
 - ENABLE_JOLOKIA=true
 会暴露出恐怖分子使用的港口
 这将使在我们的测试图像中已经装运的Joloia代理。如果您想在您的安装上启用代理,请检查 正式文件
 通过约洛基亚发出信号
 若要通过JOLOKIA发送信号,我们可以向具有所需信号和参数的JOLOKIA端点发送一个HTTP邮件请求。
要继续使用增量快照示例,要触发它,您可以运行以下命令
curl -X POST ‘http://localhost:8778/jolokia/exec’ -d ‘{“type”:“EXEC”,“mbean”:“debezium.postgres:context=signals,server=dbserver1,type=management”,“operation”:“signal”,“arguments”:[“12345”,“execute-snapshot”,"{“data-collections”: [“inventory.products”], “type”: “INCREMENTAL”}"]}’ | jq
 它应该是
{
 “request”: {
 “mbean”: “debezium.postgres:context=signals,server=dbserver1,type=management”,
 “arguments”: [
 “12345”,
 “execute-snapshot”,
 “{“data-collections”: [“inventory.products”], “type”: “INCREMENTAL”}”
 ],
 “type”: “exec”,
 “operation”: “signal”
 },
 “value”: null,
 “timestamp”: 1695651387,
 “status”: 200
 }
 接收通知书
 还允许您使用httpGET请求从Debezum获取通知。
curl -X GET ‘http://localhost:8778/jolokia/read/debezium.postgres:context=notifications,server=dbserver1,type=management/Notifications’ | jq
 它应该是
{
 “request”: {
 “mbean”: “debezium.postgres:context=notifications,server=dbserver1,type=management”,
 “attribute”: “Notifications”,
 “type”: “read”
 },
 “value”: [
 {
 “additionalData”: {
 “connector_name”: “dbserver1”
 },
 “id”: “b20bec8d-f21f-4d74-bb75-cdd7f4c7d933”,
 “type”: “SKIPPED”,
 “aggregateType”: “Initial Snapshot”
 },
 {
 “additionalData”: {
 “connector_name”: “dbserver1”,
 “data_collections”: “inventory.orders”
 },
 “id”: “12345”,
 “type”: “STARTED”,
 “aggregateType”: “Incremental Snapshot”
 },
 {
 “additionalData”: {
 “last_processed_key”: “10004”,
 “current_collection_in_progress”: “inventory.orders”,
 “connector_name”: “dbserver1”,
 “maximum_key”: “10004”,
 “data_collections”: “inventory.orders”
 },
 “id”: “12345”,
 “type”: “IN_PROGRESS”,
 “aggregateType”: “Incremental Snapshot”
 },
 {
 “additionalData”: {
 “scanned_collection”: “inventory.orders”,
 “connector_name”: “dbserver1”,
 “total_rows_scanned”: “4”,
 “status”: “SUCCEEDED”,
 “data_collections”: “inventory.orders”
 },
 “id”: “12345”,
 “type”: “TABLE_SCAN_COMPLETED”,
 “aggregateType”: “Incremental Snapshot”
 },
 {
 “additionalData”: {
 “connector_name”: “dbserver1”
 },
 “id”: “12345”,
 “type”: “COMPLETED”,
 “aggregateType”: “Incremental Snapshot”
 },
 {
 “additionalData”: {
 “connector_name”: “dbserver1”,
 “data_collections”: “inventory.products”
 },
 “id”: “12345”,
 “type”: “STARTED”,
 “aggregateType”: “Incremental Snapshot”
 },
 {
 “additionalData”: {
 “last_processed_key”: “109”,
 “current_collection_in_progress”: “inventory.products”,
 “connector_name”: “dbserver1”,
 “maximum_key”: “109”,
 “data_collections”: “inventory.products”
 },
 “id”: “12345”,
 “type”: “IN_PROGRESS”,
 “aggregateType”: “Incremental Snapshot”
 },
 {
 “additionalData”: {
 “scanned_collection”: “inventory.products”,
 “connector_name”: “dbserver1”,
 “total_rows_scanned”: “9”,
 “status”: “SUCCEEDED”,
 “data_collections”: “inventory.products”
 },
 “id”: “12345”,
 “type”: “TABLE_SCAN_COMPLETED”,
 “aggregateType”: “Incremental Snapshot”
 },
 {
 “additionalData”: {
 “connector_name”: “dbserver1”
 },
 “id”: “12345”,
 “type”: “COMPLETED”,
 “aggregateType”: “Incremental Snapshot”
 }
 ],
 “timestamp”: 1695652278,
 “status”: 200
 }
 你也看到了我们也收到了inventory.products 我们通过RESTAPI发送的表增量快照
结论
 在我们系列的第三部分中,我们学习了如何启用和管理使用jmx和joloia的信令和通知。信号可以让你动态地控制Debezium的行为,而通知可以让你了解关键事件。通过利用这些功能和JOLOKIA,您可以有效地管理、监视和与您的数据流工作流交互,确保您始终控制Debezium。