欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 运维知识 > 数据库 >内容正文

数据库

Debezium的基本使用(以MySQL为例)

发布时间:2024/3/13 数据库 69 豆豆
生活随笔 收集整理的这篇文章主要介绍了 Debezium的基本使用(以MySQL为例) 小编觉得挺不错的,现在分享给大家,帮大家做个参考.
  • GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。
  • GreatSQL是MySQL的国产分支版本,使用上与MySQL一致。

    一、Debezium介绍

    摘自官网:

    Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

简单理解就是Debezium可以捕获数据库中所有行级的数据变化并包装成事件流顺序输出。

二、基本使用

下面以MySQL为例介绍Debezium的基本使用。

1. MySQL的准备工作

  • 准备一个MySQL用户并且拥有相应权限,像这样:CREATE USER 'dbz'@'%' IDENTIFIED BY 'dbzpwd';
  • GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'dbz' IDENTIFIED BY 'dbzpwd';

    2. 检查MySQL是否开启`log-bin` ```sql SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';-- If the following error occurs: The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled... -- please execute the given SQL again after execute this SQL: set global show_compatibility_56=on;

    如果是OFF则需要修改MySQL配置文件,类似下面这样:

    server-id = 223344 #必须有 log_bin = mysql-bin #log_bin的值是binlog文件序列的基本名称 binlog_format = ROW #必须是ROW binlog_row_image = FULL #必须是FULL expire_logs_days = 10 #依据实际情况而定
  • 准备数据库&表create database inventory; create table inventory.a (id bigint primary key auto_increment, name varchar(32)); insert into inventory.a values (null, 'n1'),(null, 'n2'),(null, 'n3');
  • 2. 编写程序

    2.1. 工程依赖(Maven)

    pom.xml

    <dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>${version.debezium}</version> </dependency> <dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>${version.debezium}</version> </dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>${version.debezium}</version> </dependency>

    目前Debezium最新稳定版本为:1.9.5.Final

    2.2. 准备数据库&表

    create database inventory; create table inventory.a (id bigint primary key, name varchar(32)); insert into inventory.a values (1, 'n1'),(2, 'n2'),(3, 'n3');

    2.3. 代码编写

    package com.greatdb.dbzdemo;import java.io.IOException; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.format.Json;/*** @author wang.jianwen* @version 1.0* @date 2022/07/29*/ public class DebeziumTest {private static DebeziumEngine<ChangeEvent<String, String>> engine;public static void main(String[] args) throws Exception {final Properties props = new Properties();props.setProperty("name", "dbz-engine");props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");//offset config begin - 使用文件来存储已处理的binlog偏移量props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat");props.setProperty("offset.flush.interval.ms", "0");//offset config endprops.setProperty("database.server.name", "mysql-connector");props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt");props.setProperty("database.server.id", "122112"); //需要与MySQL的server-id不同props.setProperty("database.hostname", "tmg");props.setProperty("database.port", "3306");props.setProperty("database.user", "mysqluser");props.setProperty("database.password", "mysqlpw");props.setProperty("database.include.list", "inventory");//要捕获的数据库名props.setProperty("table.include.list", "inventory.a");//要捕获的数据表props.setProperty("snapshot.mode", "initial");//全量+增量// 使用上述配置创建Debezium引擎,输出样式为Json字符串格式engine = DebeziumEngine.create(Json.class).using(props).notifying(record -> {System.out.println(record);//输出到控制台}).using((success, message, error) -> {if (error != null) {// 报错回调System.out.println("------------error, message:" + message + "exception:" + error);}closeEngine(engine);}).build();ExecutorService executor = Executors.newSingleThreadExecutor();executor.execute(engine);addShutdownHook(engine);awaitTermination(executor);System.out.println("------------main finished.");}private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {try {engine.close();} catch (IOException ignored) {}}private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));}private static void awaitTermination(ExecutorService executor) {if (executor != null) {try {executor.shutdown();while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}} }

    3. 测试

    程序跑起来后,可以看到控制台输出:

    ...(省略) EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}] EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}] EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}] ...(省略)

    可以看到全量的数据已经输出,关键的数据如下:

    ..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"... ..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"... ..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...
    • 接下来新增一条数据:

      insert into inventory.a values (4, 'n4');

      控制台输出:

      ..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...
    • 修改一条数据:

      update inventory.a set name = 'n4-upd' where id = 4;

      控制台输出:

      ..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...
    • 删除一条数据:

      delete from inventory.a where id = 1;

      控制台输出:

      ..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...

      三、总结

      本文以MySQL为例介绍了Debezium在代码中基本使用流程,对MySQL的数据进行常见的增删改操作,Debezium将捕获这些数据行的变化,并记录了数据行变化前后的数据,并对外提供事件流,外部可以获取并对事件进行相应处理。

    参考:https://debezium.io/documentation/reference/1.8/index.html

    总结

    以上是生活随笔为你收集整理的Debezium的基本使用(以MySQL为例)的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。