欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

Debezium 抽取oracle数据

发布时间:2024/3/13 49 豆豆
生活随笔 收集整理的这篇文章主要介绍了 Debezium 抽取oracle数据 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

1、环境介绍
操作系统:centos 7.9
jdk版本:11.0.12
kafka版本:2.8.0
Debezium版本:1.6(debezium-connector-oracle-1.6.1.Final-plugin.tar.gz)
oracle版本:19c

kafka connect 分布式部署
https://blog.csdn.net/zyj81092211/article/details/119647591
kafka connector 配置 Debezium
https://blog.csdn.net/zyj81092211/article/details/119840744

2、安装oracle
https://blog.csdn.net/zyj81092211/article/details/120082828

3、设置oracle
(1)创建目录

mkdir /u01/app/oracle/oradata/recovery_area

(2)连接oracle

sqlplus / as sysdba

(3)进行设置

alter system set db_recovery_file_dest_size = 10G; alter system set db_recovery_file_dest = '/u01/app/oracle/oradata/recovery_area' scope=spfile; shutdown immediate; startup mount; alter database archivelog; alter database open; archive log list;

(4)打开pdb

alter pluggable database orders open;


(5)、连接至pdb ORDERS

alter session set container=ORDERS;

(6)、创建用户

create user debezium identified by Smtgbk_123; grant dba to debezium;

(7)、重新链接数据库

sqlplus debezium/Smtgbk_123@localhost/orders

(8)、创建测试表

CREATE TABLE customers (id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,first_name VARCHAR2(255) NOT NULL,last_name VARCHAR2(255) NOT NULL,email VARCHAR2(255) NOT NULL UNIQUE );

(9)、开启补充日志
开启表级补充日志

ALTER TABLE customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

开启数据库级别日志补充,在CDB中,执行以下命令

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

(10)、为连接器创建用户
a、使用管理员重新连接数据库

sqlplus / as sysdba

b、创建根容器表空间

CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/CDB19C/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

c、创建PDB表空间
切换至PDB

alter session set container=ORDERS;

CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/CDB19C/ORDERS/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

d、创建连接器的 LogMiner 用户

sqlplus / as sysdba

CREATE USER c##dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL; GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL; GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL; GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL; GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL; GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL; GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL; GRANT LOGMINING TO c##dbzuser CONTAINER=ALL; GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL; GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL; GRANT ALTER ANY TABLE TO c##dbzuser CONTAINER=ALL; GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL; GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL; GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL; GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL; GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL; GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL; GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL; GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL; GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL; GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL; GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;

4、获取 Oracle JDBC 驱动程序
下载地址
https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html

将软件包中ojdbc8.jar的上传到kafka connector集群所有节点libs文件夹内


杀死kafka 进程重启

kafka-server-start.sh -daemon /data/kafka-connect/config/server.properties connect-distributed.sh -daemon /data/kafka-connect/config/connect-distributed.properties

5、Debezium Oracle 连接器配置
官方示例:

curl -H "Content-Type: application/json" -X POST -d '{"name": "source-oracle202", "config": {"connector.class" : "io.debezium.connector.oracle.OracleConnector","tasks.max" : "1","database.server.name" : "oracle202","database.hostname" : "10.99.99.202","database.port" : "1521","database.user" : "c##dbzuser","database.password" : "dbz","database.dbname" : "CDB19C","database.pdb.name" : "ORDERS","database.history.kafka.bootstrap.servers" : "kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092","database.history.kafka.topic": "schema-changes.orders"} }' http://kafkac01.wtown.com:8083/connectors/

查看状态正常

查看topic

6、测试
插入数据

INSERT INTO "DEBEZIUM"."CUSTOMERS" ("ID", "FIRST_NAME", "LAST_NAME", "EMAIL") VALUES ('1', 'zhang', 'san', 'zhagnsan@163.com');

查看topic中多了一个oracle202.DEBEZIUM.CUSTOMERS

消费oracle202.DEBEZIUM.CUSTOMERS

kafka-console-consumer.sh --bootstrap-server kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092 --topic oracle202.DEBEZIUM.CUSTOMERS --from-beginning

刚才插入的数据已经抓取到

注:创建non cdb数据库参考官方文档
官方文档:
https://debezium.io/documentation/reference/1.6/connectors/oracle.html

总结

以上是生活随笔为你收集整理的Debezium 抽取oracle数据的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。