Real-Time Data Synchronization with Debezium (Primarily Oracle)

Published: 2024/3/13

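Real-Time Data Synchronization with Debezium

  • Links to everything you need to download
  • 1. Download zookeeper-3.4.10
  • 2. Download kafka_2.13-2.8.0
  • 3. Download the Kafka connector: version 1.6 or later is recommended because it can capture DDL
  • 4. Install debezium-connector-oracle
    • 4.1 Download debezium-connector-oracle-1.6.0.Final-plugin.tar.gz and extract it on your server; my installation directory is /home/debezium/
    • 4.2 Copy all the jar files in the debezium-connector-oracle directory to ${KAFKA_HOME}/libs
    • 4.3 For Oracle, download the client and copy its jar files to ${KAFKA_HOME}/libs
  • 5. Modify the Kafka environment: the distributed (cluster) configuration is used, although Kafka itself is not deployed as a cluster
  • 6. Start zookeeper, kafka, and connect-distributed
    • 6.1 Go to the zookeeper directory
    • 6.2 Go to the kafka directory
    • 6.3 Start connect-distributed with the environment configured
  • 7. Submit the Oracle connector to monitor the Oracle database
  • 8. List the created kafka connectors
  • 9. Check the status of the created kafka connector
  • 10. View the configuration of the created kafka connector
  • 11. View the topics in kafka
  • 12. Create tables in the Flink SQL client and test
  • Appendix: enabling Oracle archiving
    • Enable Oracle archive logging
    • Create a new tablespace and dbzuser, and grant the required privileges
    • Not needed for now; the official documentation requires it, but I have not yet worked out what it is for
  • kafka commands for viewing topics and message contents
    • 1. List topics, from the kafka directory:
    • 2. View the contents of a topic: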

Links to everything you need to download

https://download.csdn.net/download/u010978399/21733452

1. Download zookeeper-3.4.10

https://blog.csdn.net/She_lock/article/details/80435176?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-8.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-8.control

2. Download kafka_2.13-2.8.0

Kafka installation reference:

https://blog.csdn.net/weixin_39984161/article/details/91971731?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522161959594516780262520102%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=161959594516780262520102&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-91971731.pc_search_result_before_js&utm_term=linux%E5%AE%89%E8%A3%85kafka

3. Download the Kafka connector: version 1.6 or later is recommended because it can capture DDL

debezium-connector-mysql-1.6.0.Final-plugin.tar.gz

https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.0.Final/debezium-connector-mysql-1.6.0.Final-plugin.tar.gz

debezium-connector-postgres-1.6.0.Final-plugin.tar.gz

https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.6.0.Final/debezium-connector-postgres-1.6.0.Final-plugin.tar.gz

debezium-connector-oracle-1.6.0.Final-plugin.tar.gz

https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/1.6.0.Final/debezium-connector-oracle-1.6.0.Final-plugin.tar.gz

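4. Install debezium-connector-oracle

4.1 Download debezium-connector-oracle-1.6.0.Final-plugin.tar.gz and extract it on your server; my installation directory is /home/debezium/

4.2 Copy all the jar files in the debezium-connector-oracle directory to ${KAFKA_HOME}/libs

4.3 For Oracle, download the client and copy its jar files to ${KAFKA_HOME}/libs

Client download link: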

https://download.oracle.com/otn_software/linux/instantclient/211000/instantclient-basic-linux.x64-21.1.0.0.0.zip
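Steps 4.1 to 4.3 come down to extracting an archive and copying jar files. The following is a minimal sketch of a helper for the copy step, not the author's own script. The function name copy_connector_jars is made up for illustration, and the Instant Client path in the last comment is a placeholder.

```shell
#!/bin/sh
# Copy every .jar found directly inside a directory into a Kafka libs directory.
# Prints the number of jars copied.
# Usage: copy_connector_jars <source_dir> <kafka_libs_dir>
# (copy_connector_jars is a hypothetical helper name, not part of Kafka or Debezium.)
copy_connector_jars() {
    src_dir=$1
    libs_dir=$2
    if [ ! -d "$src_dir" ]; then
        echo "source directory not found: $src_dir" >&2
        return 1
    fi
    mkdir -p "$libs_dir" || return 1
    copied=0
    for jar in "$src_dir"/*.jar; do
        # If nothing matches, the glob stays literal; skip it.
        [ -e "$jar" ] || continue
        cp "$jar" "$libs_dir"/ || return 1
        copied=$((copied + 1))
    done
    echo "$copied"
}

# Example with the paths from this article (adjust to your layout):
# tar -xzf debezium-connector-oracle-1.6.0.Final-plugin.tar.gz -C /home/debezium/
# copy_connector_jars /home/debezium/debezium-connector-oracle "${KAFKA_HOME}/libs"
# Placeholder path for the unzipped Oracle client:
# copy_connector_jars /path/to/instantclient "${KAFKA_HOME}/libs"
```

Printing the count makes it easy to spot an empty or mistyped source directory before starting Kafka Connect.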

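5. Modify the Kafka environment: the distributed (cluster) configuration is used, although Kafka itself is not deployed as a cluster

Kafka installation directory: /home/kafka/kafka_2.13-2.8.0/

For a standalone deployment, edit [connect-standalone.properties]
For a distributed (cluster) deployment, edit [connect-distributed.properties]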

bootstrap.servers=192.168.1.121:9092
plugin.path=/home/debezium/debezium-connector-oracle
group.id=amirdebezium

# Serialization format used when Kafka Connect stores its internal information in Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# The three topics Kafka Connect uses internally
config.storage.topic=amir-connect-configs
offset.storage.topic=amir-connect-offsets
status.storage.topic=amir-connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

offset.flush.interval.ms=10000
rest.advertised.host.name=192.168.1.121
cleanup.policy=compact
rest.host.name=192.168.1.121
rest.port=8085
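Before starting the worker, it can help to confirm that the properties file really contains the settings above, one per line. The sketch below is one way to do that, assuming a plain key=value file. The helper names get_prop and check_worker_config are invented for this example, and the list of required keys is only the subset this article relies on.

```shell
#!/bin/sh
# Print the value of a key from a key=value properties file (last match wins).
# Usage: get_prop <file> <key>
# (get_prop and check_worker_config are hypothetical helper names.)
get_prop() {
    awk -F= -v key="$2" '
        $1 == key { sub(/^[^=]*=/, ""); value = $0 }
        END { if (value != "") print value }
    ' "$1"
}

# Report any of the settings this article relies on that are missing or empty.
# Returns 0 when all are present, 1 otherwise.
check_worker_config() {
    cfg_file=$1
    missing=0
    for key in bootstrap.servers plugin.path group.id \
               config.storage.topic offset.storage.topic status.storage.topic; do
        if [ -z "$(get_prop "$cfg_file" "$key")" ]; then
            echo "missing: $key" >&2
            missing=1
        fi
    done
    return $missing
}

# Example:
# check_worker_config /home/kafka/kafka_2.13-2.8.0/config/connect-distributed.properties
```

Because the key is compared exactly, a line such as plugin.path fused with group.id on the same line shows up as a missing group.id.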

6. Start zookeeper, kafka, and connect-distributed

6.1 Go to the zookeeper directory

Start zookeeper

sh zkServer.sh start

Stop zookeeper

sh zkServer.sh stop

6.2 Go to the kafka directory

Start kafka

/home/kafka/kafka_2.13-2.8.0/bin/kafka-server-start.sh /home/kafka/kafka_2.13-2.8.0/config/server.properties &

Stop kafka

/home/kafka/kafka_2.13-2.8.0/bin/kafka-server-stop.sh

6.3 Start connect-distributed with the environment configured

Load the environment

export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/home/kafka/kafka_2.13-2.8.0/config/connect-log4j.properties

Start

./bin/connect-distributed.sh /home/kafka/kafka_2.13-2.8.0/config/connect-distributed.properties &

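Be sure to append & at the end so the process runs in the background; otherwise the service goes away as soon as the terminal session is closed. If it still stops when the session ends, starting it with nohup is a common remedy.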

7. Submit the Oracle connector to monitor the Oracle database

Paste this command directly into the Linux shell

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://172.16.50.22:8085/connectors/ -d '
{
  "name": "debezium-oracle",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "XE",
    "database.hostname": "172.16.50.239",
    "database.port": "1521",
    "database.user": "amir",
    "database.password": "amir",
    "database.dbname": "XE",
    "database.schema": "MSCDW",
    "database.connection.adapter": "logminer",
    "database.tablename.case.insensitive": "true",
    "table.include.list": "MSCDW.*",
    "snapshot.mode": "initial",
    "schema.include.list": "MSCDW",
    "database.history.kafka.bootstrap.servers": "172.16.50.22:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}'
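If the same registration has to be repeated for another schema or host, it may be easier to generate the payload than to edit a long one-liner. The following is only a sketch: the function name oracle_connector_json and the file name oracle-connector.json are made up, and every setting not passed as an argument is simply copied from the command above.

```shell
#!/bin/sh
# Print a connector registration payload for the Kafka Connect REST API.
# Usage: oracle_connector_json <connector_name> <db_host> <schema> <kafka_bootstrap>
# (oracle_connector_json is a hypothetical helper; fixed values are the ones
# used in this article, including the sample credentials.)
oracle_connector_json() {
    cat <<EOF
{
  "name": "$1",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "XE",
    "database.hostname": "$2",
    "database.port": "1521",
    "database.user": "amir",
    "database.password": "amir",
    "database.dbname": "XE",
    "database.schema": "$3",
    "database.connection.adapter": "logminer",
    "database.tablename.case.insensitive": "true",
    "table.include.list": "$3.*",
    "snapshot.mode": "initial",
    "schema.include.list": "$3",
    "database.history.kafka.bootstrap.servers": "$4",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
EOF
}

# Example: write the payload to a file, then post it with curl's --data @file form.
# oracle_connector_json debezium-oracle 172.16.50.239 MSCDW 172.16.50.22:9092 > oracle-connector.json
# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
#      http://172.16.50.22:8085/connectors/ --data @oracle-connector.json
```

Keeping the payload in a file also sidesteps shell quoting problems when the JSON is pasted into a terminal.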

8. List the created kafka connectors

URL:

172.16.50.22:8085/connectors

9. Check the status of the created kafka connector

URL:

172.16.50.22:8085/connectors/debezium-oracle/status

Here debezium-oracle is the connector name returned by the previous step.

10. View the configuration of the created kafka connector

URL:

172.16.50.22:8085/connectors/debezium-oracle/config
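The three URLs in steps 8 to 10 can also be queried from the command line instead of a browser. Here is a small sketch using curl. The helper names connector_endpoint and connector_get and the CONNECT_URL variable are assumptions introduced for this example; only the URL paths come from the steps above.

```shell
#!/bin/sh
# Base URL of the Kafka Connect REST API; the default is the address used in this article.
CONNECT_URL=${CONNECT_URL:-http://172.16.50.22:8085}

# Build the URL for the connector list, one connector, or one of its sub-resources.
# Usage: connector_endpoint [name] [status|config]
# (connector_endpoint and connector_get are hypothetical helper names.)
connector_endpoint() {
    if [ -z "${1:-}" ]; then
        echo "$CONNECT_URL/connectors"
    elif [ -z "${2:-}" ]; then
        echo "$CONNECT_URL/connectors/$1"
    else
        echo "$CONNECT_URL/connectors/$1/$2"
    fi
}

# Fetch the endpoint with curl; -s hides the progress meter.
connector_get() {
    curl -s "$(connector_endpoint "$@")"
}

# Examples:
# connector_get                          # step 8: list connectors
# connector_get debezium-oracle status   # step 9: connector status
# connector_get debezium-oracle config   # step 10: connector configuration
```

Setting CONNECT_URL in the environment points the same helpers at a different worker without editing the script.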

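11. View the topics in kafka

Once the environment is set up, a separate topic is created for each table by default. I used Kafka Tool to view them. Note that the topic shown there is XE.SCOTT.DEPT rather than XE.MSCDW.CONFIG. Following the steps above it should be MSCDW, but I forgot to include this part when first writing the document and added it later, at which point the configuration was listening for DDL on the SCOTT schema, and I did not bother to change it back.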
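The two topic names mentioned here follow the pattern server name, schema, table, joined by dots, where the server name is the database.server.name value from step 7. As a rough sketch, the helpers below build such a name and pick the per-table topics out of a topic listing. Both function names are invented for illustration.

```shell
#!/bin/sh
# Compose the topic name for one table: <server name>.<schema>.<table>.
# (debezium_topic and table_topics are hypothetical helper names.)
debezium_topic() {
    echo "$1.$2.$3"
}

# Read topic names on stdin and keep only those shaped like <server>.<schema>.<table>.
# Usage: some-listing-command | table_topics <server_name>
table_topics() {
    grep -E "^$1"'\.[^.]+\.[^.]+$'
}

# Examples:
# debezium_topic XE MSCDW CONFIG
# bin/kafka-topics.sh --list --zookeeper localhost:2181 | table_topics XE
```

The filter leaves out the internal Connect topics and the schema history topic, which do not start with the server name.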

12. Create tables in the Flink SQL client and test

CREATE TABLE sinkMysqlConfigTable (
  ID STRING,
  CRON STRING
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://IP:3306/admin',
  'connector.table' = 'config',
  'connector.username' = 'root',
  'connector.password' = 'dhcc@2020',
  'connector.write.flush.max-rows' = '1'
);

CREATE TABLE createOracleConfigTable (
  id STRING,
  cron STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'XE.MSCDW.CONFIG',
  'properties.bootstrap.servers' = '172.16.50.22:9092',
  'debezium-json.schema-include' = 'false',
  'properties.group.id' = 'amirdebezium',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);

Appendix: enabling Oracle archiving

# Adjust as required; otherwise an error is reported

alter system set db_recovery_file_dest_size=5G;

Enable Oracle archive logging

# Enable row mode (supplemental logging of all columns)

alter database add supplemental log data (all) columns;

Create a new tablespace and dbzuser, and grant the required privileges

CREATE TABLESPACE LOGMINER_TBS DATAFILE '/home/oracle/app/oracle/oradata/amir/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO dbzuser;
GRANT SELECT ON V_$DATABASE TO dbzuser;
GRANT FLASHBACK ANY TABLE TO dbzuser;
GRANT SELECT ANY TABLE TO dbzuser;
GRANT SELECT_CATALOG_ROLE TO dbzuser;
GRANT EXECUTE_CATALOG_ROLE TO dbzuser;
GRANT SELECT ANY TRANSACTION TO dbzuser;
GRANT SELECT ANY DICTIONARY TO dbzuser;
GRANT CREATE TABLE TO dbzuser;
GRANT ALTER ANY TABLE TO dbzuser;
GRANT LOCK ANY TABLE TO dbzuser;
GRANT CREATE SEQUENCE TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO dbzuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;
GRANT SELECT ON V_$LOGFILE TO dbzuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;

Not needed for now; the official documentation requires it, but I have not yet worked out what it is for

CREATE USER debezium IDENTIFIED BY dbz DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS;
GRANT CONNECT TO debezium;
GRANT CREATE SESSION TO debezium;
GRANT CREATE TABLE TO debezium;
GRANT CREATE SEQUENCE TO debezium;
ALTER USER debezium QUOTA 100M ON USERS;

kafka commands for viewing topics and message contents

1. List topics, from the kafka directory:

bin/kafka-topics.sh --list --zookeeper localhost:2181

2. View the contents of a topic:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --from-beginning
