Debezium日常分享系列之:Debezium 通知

Debezium日常分享系列之:Debezium 通知

  • 一、概论
  • 二、Debezium通知格式
  • 三、Debezium 有关初始快照状态的通知
  • 四、Debezium 有关增量快照进度的通知
  • 五、启用 Debezium 通知
  • 六、访问 Debezium JMX 通知
  • 七、自定义通知渠道
  • 八、应用案例

一、概论

Debezium 通知提供了一种获取有关连接器状态信息的机制。通知可以发送到以下渠道:

  • 接收器通知通道:通过 Connect API 将通知发送到配置的主题。
  • 日志通知通道:通知会附加到日志中。
  • JmxNotificationChannel:通知作为 JMX bean 中的属性公开。
  • 定制:通知将发送到您实施的自定义渠道。

二、Debezium通知格式

通知消息包含以下信息:

属性描述
id分配给通知的唯一标识符。对于增量快照通知,id 与使用执行快照信号发送的相同。
aggregate_type快照类型
type提供有关在aggregate_type 字段中指定的事件的状态信息。
additional_data包含有关通知的详细信息的 Map<String,String>。
timestamp创建通知的时间。 Epoch unix 时间戳(以毫秒为单位)

三、Debezium 有关初始快照状态的通知

以下示例显示了提供初始快照状态的典型通知:

{
    "id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
    "aggregate_type": "Initial Snapshot",
    "type": "COMPLETED", 
    "additional_data" : {
        "connector_name": "myConnector"
    },
    "timestamp": "1695817046353"
}

类型字段可以包含以下值之一:

  • STARTED
  • IN_PROGRESS
  • TABLE_SCAN_COMPLETED
  • COMPLETED
  • ABORTED
  • SKIPPED

下表显示了报告初始快照状态的通知中可能存在的不同负载的示例:

  • STARTED
{
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"STARTED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}
  • IN_PROGRESS
{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Initial Snapshot",
   "type":"IN_PROGRESS",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2",
      "current_collection_in_progress":"table1"
   },
   "timestamp": "1695817046353"
}

Mongo 连接器当前不支持字段 data_collection

  • TABLE_SCAN_COMPLETED
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Initial Snapshot",
   "type":"TABLE_SCAN_COMPLETED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collection":"table1, table2",
      "scanned_collection":"table1",
      "total_rows_scanned":"100",
      "status":"SUCCEEDED"
   },
   "timestamp": "1695817046353"
}

在前面的示例中,additional_data.status 字段可以包含以下值之一:

SQL_异常:执行快照时发生 SQL 异常。

成功了:快照成功完成。

Mongo 连接器当前不支持字段total_rows_scanned 和data_collection

  • COMPLETED
{
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"COMPLETED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}
  • ABORTED
{
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"ABORTED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}
  • SKIPPED
{
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"SKIPPED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}

四、Debezium 有关增量快照进度的通知

下表显示了报告增量快照状态的通知中可能存在的不同负载的示例:

  • Start
{
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Incremental Snapshot",
      "type":"STARTED",
      "additional_data":{
         "connector_name":"my-connector",
         "data_collections":"table1, table2"
      },
      "timestamp": "1695817046353"
}
  • Paused
{
      "id":"068d07a5-d16b-4c4a-b95f-8ad061a69d51",
      "aggregate_type":"Incremental Snapshot",
      "type":"PAUSED",
      "additional_data":{
         "connector_name":"my-connector",
         "data_collections":"table1, table2"
      },
      "timestamp": "1695817046353"
}
  • Resumed
{
   "id":"a9468204-769d-430f-96d2-b0933d4839f3",
   "aggregate_type":"Incremental Snapshot",
   "type":"RESUMED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2"
   },
   "timestamp": "1695817046353"
}
  • Stopped
{
   "id":"83fb3d6c-190b-4e40-96eb-f8f427bf482c",
   "aggregate_type":"Incremental Snapshot",
   "type":"ABORTED",
   "additional_data":{
      "connector_name":"my-connector"
   },
   "timestamp": "1695817046353"
}
  • Processing chunk
{
   "id":"d02047d6-377f-4a21-a4e9-cb6e817cf744",
   "aggregate_type":"Incremental Snapshot",
   "type":"IN_PROGRESS",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2",
      "current_collection_in_progress":"table1",
      "maximum_key":"100",
      "last_processed_key":"50"
   },
   "timestamp": "1695817046353"
}
  • Snapshot completed for a table
{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Incremental Snapshot",
   "type":"TABLE_SCAN_COMPLETED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collection":"table1, table2",
      "scanned_collection":"table1",
      "total_rows_scanned":"100",
      "status":"SUCCEEDED"
   },
   "timestamp": "1695817046353"
}

在前面的示例中,additional_data.status 字段可以包含以下值之一:

EMPTY:该表不包含任何值。

NO_PRIMARY_KEY:无法完成快照;表没有主键。

SKIPPED:无法完成此类表的快照。有关详细信息,请参阅日志。

SQL_EXCEPTION:执行快照时发生 SQL 异常。

SUCCEEDED:快照成功完成。

UNKNOWN_SCHEMA:找不到该表的架构。检查日志中已知表的列表。

  • Completed
{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Incremental Snapshot",
   "type":"COMPLETED",
   "additional_data":{
      "connector_name":"my-connector"
   },
   "timestamp": "1695817046353"
}

五、启用 Debezium 通知

要使 Debezium 能够发出通知,请通过设置 notification.enabled.channels 配置属性来指定通知通道列表。默认情况下,以下通知渠道可用:

  • sink
  • log
  • jmx

重要的:要使用接收器通知通道,还必须将 notification.sink.topic.name 配置属性设置为希望 Debezium 发送通知的主题的名称。

六、访问 Debezium JMX 通知

要使 Debezium 能够报告通过 JMX beans 公开的事件,请完成以下配置步骤:

  • 启用 JMX MBean 服务器以公开通知 bean。
  • 将 jmx 添加到连接器配置中的 notification.enabled.channels 属性中。
  • 将首选的 JMX 客户端连接到 MBean 服务器。

通知通过名称为 debezium..management.notifications. 的 bean 的“Notifications”属性公开。

下图显示了报告增量快照开始的通知:

在这里插入图片描述
要放弃通知,请对 bean 调用重置操作。

通知还公开为 debezium.notification 类型的 JMX 通知。要使应用程序能够侦听 MBean 发出的 JMX 通知,请为应用程序订阅通知。

七、自定义通知渠道

通知机制被设计为可扩展的。可以根据需要实施渠道,以最适合的环境的方式传递通知。添加通知通道涉及几个步骤:

  • 为通道创建一个Java项目来实现通道,并添加Debezium Core作为依赖项。
  • 部署通知通道。
  • 通过修改连接器配置,使连接器能够使用自定义通知通道。

配置自定义通知渠道

自定义通知通道是实现 io.debezium.pipeline.notification.channels.NotificationChannel 服务提供者接口 (SPI) 的 Java 类。例如:

public interface NotificationChannel {

    String name(); 1

    void init(CommonConnectorConfig config); 2

    void send(Notification notification); 3

    void close(); 4
}
  • 频道的名称。要使 Debezium 能够使用该通道,请在连接器的 notification.enabled.channels 属性中指定此名称。
  • 初始化通道所需的特定配置、变量或连接。
  • 在频道上发送通知。 Debezium 调用此方法来报告其状态。
  • 关闭所有分配的资源。 Debezium 在连接器停止时调用此方法。

Debezium 核心模块依赖项

自定义通知通道 Java 项目具有对 Debezium 核心模块的编译依赖项。必须将这些编译依赖项包含在项目的 pom.xml 文件中,如以下示例所示:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version> 
</dependency>

${version.debezium} 表示 Debezium 连接器的版本。

在 META-INF/services/io.debezium.pipeline.notification.channels.NotificationChannel 文件中声明您的实现。

部署自定义通知渠道

先决条件:有一个自定义通知通道 Java 程序。

程序:要将通知通道与 Debezium 连接器结合使用,请将 Java 项目导出到 JAR 文件,然后将该文件复制到包含要与其一起使用的每个 Debezium 连接器的 JAR 文件的目录。

例如,在典型部署中,Debezium 连接器文件存储在 Kafka Connect 目录 (/kafka/connect) 的子目录中,每个连接器 JAR 位于其自己的子目录中 (/kafka/connect/debezium-connector-db2、/kafka /connect/debezium-connector-mysql 等)。要将信号通道与连接器一起使用,请将转换器 JAR 文件添加到连接器的子目录中。

注意:要将自定义通知通道与多个连接器一起使用,必须将通知通道 JAR 文件的副本放置在每个连接器子目录中。

配置连接器以使用自定义通知通道:在连接器配置中,将自定义通知通道的名称添加到 notification.enabled.channels 属性中。

八、应用案例

  • Debezium系列之:实现增量快照incremental技术的详细步骤
  • Debezium系列之:基于数据库信号表和Kafka信号Topic两种技术方案实现增量快照incremental技术的详细步骤
  • Debezium系列之:深入理解临时阻塞快照

更多Debezium实战应用可以参考博主Debezium专栏:

  • Debezium专栏,Debezium实战应用详细总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/284129.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MySql——1146 - Table‘mysql.proc‘doesn‘t exit是这个

项目场景&#xff1a; 做自己的小项目需要连接mysql数据库 问题描述 点击数据库时报错 1146 - Table’mysql.proc’doesn’t exit 原因分析&#xff1a; 误删原生的mysql数据库 解决方案&#xff1a; 重新安装装部署mysql就好了 注意不要轻易删除原生的东西

JMeter(十六)-JMeter断言

十六、JMeter断言 1.简介 断言组件用来对服务器的响应数据做验证&#xff0c;常用的断言是响应断言&#xff0c;其支持正则表达式。虽然我们的通过响应断言能够完成绝大多数的结果验证工作&#xff0c;但是JMeter还是为我们提供了适合多个场景的断言元件&#xff0c;辅助我们来…

[Vulnhub靶机] DriftingBlues: 1

[Vulnhub靶机] DriftingBlues: 1靶机渗透思路及方法&#xff08;个人分享&#xff09; 靶机下载地址&#xff1a; https://download.vulnhub.com/driftingblues/driftingblues.ova 靶机地址&#xff1a;192.168.67.20 攻击机地址&#xff1a;192.168.67.3 一、信息收集 1.使…

采用环形首尾互联互控的雪崩效应极好的Hash算法/杂凑函数RING-512设计原理详解

RING-512密码杂凑算法 黄金龙&#xff08;QQ1435271638&#xff09; 什么是Hash算法&#xff1f; Hash算法&#xff0c;又称为哈希算法、杂凑函数、散列函数、消息摘要算法。它可以将相当长&#xff08;一般不大于2^64Bit&#xff09;的输入数据经过计算生成固定长度的Hash值…

保护Word或Excel的几种方法,总有一种满足你的需求

你已经在Microsoft Word或Excel中创建了一个重要或机密文件,你希望将其保密或至少保持安全。也许你想确保只有你和某些人可以阅读或编辑它。也许你想限制某人可以对文件进行的修改类型。你甚至可以向读者保证这是最终版本。如果你知道在Word和Excel中使用哪些工具以及它们是如…

openGauss学习笔记-182 openGauss 数据库运维-升级-升级前准备与检查

文章目录 openGauss学习笔记-182 openGauss 数据库运维-升级-升级前准备与检查182.1 升级前准备与检查清单182.2 收集节点信息182.3 备份数据182.4 获取升级包182.5 健康检查182.5.1 前提条件182.5.2 操作步骤 182.6 检查数据库节点磁盘使用率182.7 检查数据库状态182.7.1 验证…

JavaWeb——前端之AjaxVue

6. 前后端交互 6.1 Ajax&#xff08;原生的&#xff09; 概念&#xff1a; Asynchronous JavaScript And XML&#xff08;异步的JavaScript和XML&#xff09; 作用&#xff1a; 数据交互&#xff1a;通过Ajax可以给服务器发送请求&#xff0c;并获取服务器响应的数据异步交…

用 print 太慢了!强烈推荐这款Python Debug工具~

作为程序员&#xff0c;我们都深知调试&#xff08;Debug&#xff09;在编程过程中的重要性。 然而&#xff0c;使用传统的"print"语句进行调试可能效率较低&#xff0c;今天&#xff0c;笔者将推荐一款独具一格的Python调试工具——Reloadium。 Reloadium为IDE添加…

Java技术栈 —— Redis的雪崩、穿透与击穿

Java技术栈 —— Redis的雪崩、穿透与击穿 〇、实验的先导条件&#xff08;NginxJmeter&#xff09;一、Redis缓存雪崩、缓存穿透、缓存击穿1.1 雪崩1.2 穿透1.3 击穿 二、Redis应用场景——高并发2.1 单机部署的高并发问题与解决&#xff08;JVM级别锁&#xff09;2.2 集群部署…

数据结构:基于数组的环形队列(循环队列)实现

1 前言 队列是一种先进先出的线性表&#xff0c;简称为FIFO。它只允许在队尾插入成员&#xff0c;在队头删除成员&#xff0c;就像现实生活中排队上车一样。 队列的实现可以通过链表或数组完成&#xff0c;一般来说都推荐使用链表来实现队列&#xff0c;使用数组实现队列时每次…

SpreadJS 集成使用案例

SpreadJS 集成案例 介绍&#xff1a; SpreadJS 基于 HTML5 标准&#xff0c;支持跨平台开发和集成&#xff0c;支持所有主流浏览器&#xff0c;无需预装任何插件或第三方组件&#xff0c;以原生的方式嵌入各类应用&#xff0c;可以与各类后端技术框架相结合。SpreadJS 以 纯前…

Java日期和时间(二)

新增的日期和时间 为什么要学习新增的日期和时间 1、代替Calendar LocalDate&#xff1a;年、月、日 LocalTime&#xff1a;时、分、秒 LocalDateTime&#xff1a;年、月、日、时、分、秒 ZoneId&#xff1a;时区 ZoneldDatetime&#xff1a;带时区的时间 2、代替Date Instan…

使用flutter开发一个简单的轮播图带指示器的组件

使用PageView开发一个带指示器的轮播图组件&#xff0c;当轮播图切换的时候&#xff0c;指示器也会跟着切换&#xff0c;切换到当前轮播图所在的索引时&#xff0c;指示器的背景色会变成蓝色&#xff0c;否则是灰色。使用了一个curIndex变量来记录当前激活的轮播图索引。并使用…

HarmonyOS资源分类与访问

资源分类与访问 应用开发过程中&#xff0c;经常需要用到颜色、字体、间距、图片等资源&#xff0c;在不同的设备或配置中&#xff0c;这些资源的值可能不同。 应用资源&#xff1a;借助资源文件能力&#xff0c;开发者在应用中自定义资源&#xff0c;自行管理这些资源在不同…

【INTEL(ALTERA)】如何使用quartus设计助理Design Assistant提高结果质量,很好的资料一定要分享!!!

大家在用quartus的时候一定遇到过超级多的警告 warning&#xff0c;甚至异常 error&#xff0c;还有无从下手的timing 。 多扇出&#xff0c;布线拥堵&#xff0c;时序违例是不是让你头疼不已&#xff1f;那你一定要看看这篇文章分享的文档和资料。 优化设计的源代码通常是提高…

CMake入门教程【基础篇】什么是CMakeLists.txt

文章目录 什么是CMakeLists.txtCMakeLists.txt的核心作用CMakeLists.txt的基本结构总结 什么是CMakeLists.txt CMakeLists.txt是一个由CMake&#xff08;一个跨平台的自动化构建系统&#xff09;使用的配置文件。这个文件用于定义软件构建的过程&#xff0c;包括编译源代码、链…

wait 和 notify 这个为什么要在synchronized 代码块中

文章目录 wait 和 notify 这个为什么要在synchronized 代码块中&#xff1f; wait 和 notify 这个为什么要在synchronized 代码块中&#xff1f; wait 和 notify 用来实现多线程之间的协调&#xff0c;wait 表示让线程进入到阻塞状态&#xff0c;notify 表示让阻塞的线程唤醒。…

Vue3+Echarts(柱状图):点击不同的按钮可以切换不同年份的数据

一、需求 在Vue3项目中&#xff0c;绘制一个柱状图&#xff1a; 柱状图会展示某一年里四个季度的销售额提供2个按钮选项&#xff0c;点击不同的按钮可以切换到不同年份的销售额&#xff0c;这里的年份指2022年以及2023年目标效果如下&#xff1a; 默认展示的是2023年的数据&a…

spring 之 事务

1、JdbcTemplate Spring 框架对 JDBC 进行封装&#xff0c;使用 JdbcTemplate 方便实现对数据库操作 1.1 准备工作 ①搭建子模块 搭建子模块&#xff1a;spring-jdbc-tx ②加入依赖 <dependencies><!--spring jdbc Spring 持久化层支持jar包--><dependency&…

病情聊天机器人,利用Neo4j图数据库和Elasticsearch全文搜索引擎相结合

项目设计目的&#xff1a; 本项目旨在开发一个病情聊天机器人&#xff0c;利用Neo4j图数据库和Elasticsearch全文搜索引擎相结合&#xff0c;实现对病情相关数据的存储、查询和自动回答。通过与用户的交互&#xff0c;机器人可以根据用户提供的症状描述&#xff0c;给出初步的可…