FlinkSQL 实时同步 Oracle

news/2024/9/28 11:59:20

准备工作

  • Oracle 数据库(version: 11g

    • 开启归档日志

      sqlplus /nologSQL> conn /as sysdba;-- 立即关闭数据库
      SQL> shutdown immediate;-- 以mount模式启动数据库
      SQL> startup mount;-- 启用数据库归档日志模式
      SQL> alter database archivelog;-- 打开数据库,允许用户访问
      SQL> alter database open;-- 查看归档日志是否启用
      SQL> archive log list;
      

      备注:启用日志归档需要重启数据库。归档日志将占用大量的磁盘空间,因此需要定期清理过期的日志。

    • 开启增量日志

      sqlplus /nologSQL> conn /as sysdba;SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
      
    • 以 DBA 权限登录数据库,赋予用户操作权限

      sqlplus /nologSQL> conn /as sysdba;-- 创建一个名为"cdc"的用户,密码为"123456"
      SQL> CREATE USER cdc IDENTIFIED BY 123456;-- 允许"cdc"用户创建会话,即允许该用户连接到数据库。
      SQL> GRANT CREATE SESSION TO cdc;-- (不支持Oracle 11g)允许"cdc"用户在多租户数据库(CDB)中设置容器。
      -- SQL> GRANT SET CONTAINER TO cdc;-- 允许"cdc"用户查询V_$DATABASE视图,该视图包含有关数据库实例的信息。
      SQL> GRANT SELECT ON V_$DATABASE TO cdc;-- 允许"cdc"用户执行任何表的闪回操作。
      SQL> GRANT FLASHBACK ANY TABLE TO cdc;-- 允许"cdc"用户查询任何表的数据。
      SQL> GRANT SELECT ANY TABLE TO cdc;-- 允许"cdc"用户拥有SELECT_CATALOG_ROLE角色,该角色允许查询数据字典和元数据。
      SQL> GRANT SELECT_CATALOG_ROLE TO cdc;-- 允许"cdc"用户拥有EXECUTE_CATALOG_ROLE角色,该角色允许执行一些数据字典中的过程和函数。
      SQL> GRANT EXECUTE_CATALOG_ROLE TO cdc;-- 允许"cdc"用户查询任何事务。
      SQL> GRANT SELECT ANY TRANSACTION TO cdc;-- (不支持Oracle 11g)允许"cdc"用户进行数据变更追踪(LogMiner)。
      -- SQL> GRANT LOGMINING TO cdc;-- 允许"cdc"用户创建表。
      SQL> GRANT CREATE TABLE TO cdc;-- 允许"cdc"用户锁定任何表。
      SQL> GRANT LOCK ANY TABLE TO cdc;-- 允许"cdc"用户修改任何表。
      SQL> GRANT ALTER ANY TABLE TO cdc;-- 允许"cdc"用户创建序列。
      SQL> GRANT CREATE SEQUENCE TO cdc;-- 允许"cdc"用户执行DBMS_LOGMNR包中的过程。
      SQL> GRANT EXECUTE ON DBMS_LOGMNR TO cdc;-- 允许"cdc"用户执行DBMS_LOGMNR_D包中的过程。
      SQL> GRANT EXECUTE ON DBMS_LOGMNR_D TO cdc;-- 允许"cdc"用户查询V_$LOG视图,该视图包含有关数据库日志文件的信息。
      SQL> GRANT SELECT ON V_$LOG TO cdc;-- 允许"cdc"用户查询V_$LOG_HISTORY视图,该视图包含有关数据库历史日志文件的信息。
      SQL> GRANT SELECT ON V_$LOG_HISTORY TO cdc;-- 允许"cdc"用户查询V_$LOGMNR_LOGS视图,该视图包含有关LogMiner日志文件的信息。
      SQL> GRANT SELECT ON V_$LOGMNR_LOGS TO cdc;-- 允许"cdc"用户查询V_$LOGMNR_CONTENTS视图,该视图包含LogMiner日志文件的内容。
      SQL> GRANT SELECT ON V_$LOGMNR_CONTENTS TO cdc;-- 允许"cdc"用户查询V_$LOGMNR_PARAMETERS视图,该视图包含有关LogMiner的参数信息。
      SQL> GRANT SELECT ON V_$LOGMNR_PARAMETERS TO cdc;-- 允许"cdc"用户查询V_$LOGFILE视图,该视图包含有关数据库日志文件的信息。
      SQL> GRANT SELECT ON V_$LOGFILE TO cdc;-- 允许"cdc"用户查询V_$ARCHIVED_LOG视图,该视图包含已归档的数据库日志文件的信息。
      SQL> GRANT SELECT ON V_$ARCHIVED_LOG TO cdc;-- 允许"cdc"用户查询V_$ARCHIVE_DEST_STATUS视图,该视图包含有关归档目标状态的信息。
      SQL> GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO cdc;
      
  • Flink (version : 1.18.1),添加以下 jar 包到 flink/lib

    • flink-connector-jdbc-3.2.0-1.18.jar

    • flink-sql-connector-oracle-cdc-3.2-SNAPSHOT.jar

    • ojdbc8-19.3.0.0.jar

      注意,这里添加的 flink-sql-connector-oracle-cdc 的版本是 3.2-SHAPSHOT, 这是我基于源码自主构建的,因为目前最新的 3.1.0 版本在同步 Oracle 时有 bug,截至版本 3.1.0 发布,尚未对此问题进行修复,但是改动已经合并到了 Master 分支,我们只能基于源码自主构建。详见 flink cdc 源码构建过程

  • 准备待同步源端表和目标端表

    源端表:CDC_TEST.PLAYER_SOURCE

    CREATE TABLE "CDC_TEST"."PLAYER_SOURCE" ("ID" NUMBER(5,0) NOT NULL,"NAME" VARCHAR2(255 BYTE)
    );
    
  • 准备目标端表

    在目标端创建和源端同构的表:CDC_TEST.PLAYER_TARGET

    CREATE TABLE "CDC_TEST"."PLAYER_TARGET" ("ID" NUMBER(5,0) NOT NULL,"NAME" VARCHAR2(255 BYTE)
    );
    

SQL-Client 实现数据同步

使用 Flink 的 sql-client 实现数据同步

./bin/start-cluster.sh

启动 sql-client

sudo ./bin/sql-client.sh

注意,这里启动 sql-client 时需要加 sudo,不然可能会报错,参见 [启动 sql-client 报错](#启动 sql-client 报错)

数据同步任务创建

  • 创建源端表对应的逻辑表,参照 Oracle 字段类型和 FlinkSQL 字段类型的映射关系

    CREATE TABLE player_source(`ID` INT NOT NULL,`NAME` STRING,PRIMARY KEY(ID) NOT ENFORCED
    ) WITH ('connector' = 'oracle-cdc','hostname' = '10.4.45.206','port' = '1522','username' = 'username','password' = 'password','database-name' = 'cdc','schema-name' = 'CDC_TEST','table-name' = 'PLAYER_SOURCE','debezium.log.mining.strategy' = 'online_catalog','debezium.log.mining.continuous.mine' = 'true'
    );
    

    为了解决 Oracle 同步延迟的问题,添加了两个配置项:

    'debezium.log.mining.strategy'='online_catalog',
    'debezium.log.mining.continuous.mine'='true'
    
  • 创建目标端表对应的逻辑表

    CREATE TABLE player_target(`ID` INT NOT NULL,`NAME` STRING,PRIMARY KEY (`ID`) NOT ENFORCED
    ) WITH ('connector' = 'jdbc','url' = 'jdbc:oracle:thin:@//10.4.45.206:1522/cdc','username' = 'username','password' = 'password','table-name' = 'PLAYER_TARGET','sink.parallelism' = '1'
    );
    
  • 建立源端逻辑表和目标端逻辑表的连接

    INSERT INTO player_target SELECT ID, NAME FROM player_source;
    
  • 任务创建成功:

  • Flink Web 查看提交的任务:

  • 源端表中进行更新、删除操作,查看目标端表是否自动完成同步

TableAPI 实现数据同步

引入 maven 依赖:

<properties><flink.version>1.18.1</flink.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.2-1.18</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-oracle-cdc</artifactId><version>3.2-SNAPSHOT</version></dependency><dependency><groupId>cn.easyproject</groupId><artifactId>orai18n</artifactId><version>12.1.0.2.0</version></dependency>
<dependencies>

程序实现:

package test.legacy.oracle;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkSQL {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.setInteger("rest.port", 9091);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 创建源端逻辑表String createSourceTableSQL = "CREATE TABLE player_source (" +"`ID` INT NOT NULL, " +"`NAME` STRING, " +"PRIMARY KEY (`ID`) NOT ENFORCED" +") WITH (" +"'connector' = 'oracle-cdc', " +"'hostname' = '10.4.45.206', " +"'port' = '1522', " +"'username' = 'username', " +"'password' = 'password', " +"'database-name' = 'cdc', " +"'schema-name' = 'CDC_TEST', " +"'table-name' = 'PLAYER_SOURCE', " +"'scan.startup.mode' = 'latest-offset', " +"'debezium.log.mining.strategy' = 'online_catalog', " +"'debezium.log.mining.continuous.mine' = 'true'" +");";tableEnv.executeSql(createSourceTableSQL);// 创建目标端逻辑表String createSinkTableSQL = "CREATE TABLE player_target (" +"`ID` INT NOT NULL, " +"`NAME` STRING, " +"PRIMARY KEY (`ID`) NOT ENFORCED" +") WITH (" +"'connector' = 'jdbc', " +"'url' = 'jdbc:oracle:thin:@//10.4.45.206:1522/cdc', " +"'username' = 'username', " +"'password' = 'password', " +"'table-name' = 'PLAYER_TARGET', " +"'sink.parallelism' = '1'" +");";tableEnv.executeSql(createSinkTableSQL);// 建立源端逻辑表和目标端逻辑表的连接String insertSQL = "INSERT INTO player_target SELECT * FROM player_source;";StatementSet statementSet = tableEnv.createStatementSet();statementSet.addInsertSql(insertSQL);statementSet.execute();}
}

我们在 Configuration 中设置了 rest.port = 9091, 程序启动成功后,可以在浏览器打开 localhost:9091 看到提交运行的任务。

遇到的问题

启动 sql-client 报错

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Could not read from command line.at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:221)at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:179)at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:121)at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:114)at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:169)at org.apache.flink.table.client.SqlClient.start(SqlClient.java:118)at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:228)at org.apache.flink.table.client.SqlClient.main(SqlClient.java:179)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.table.client.config.SqlClientOptionsat org.apache.flink.table.client.cli.parser.SqlClientSyntaxHighlighter.highlight(SqlClientSyntaxHighlighter.java:59)at org.jline.reader.impl.LineReaderImpl.getHighlightedBuffer(LineReaderImpl.java:3633)at org.jline.reader.impl.LineReaderImpl.getDisplayedBufferWithPrompts(LineReaderImpl.java:3615)at org.jline.reader.impl.LineReaderImpl.redisplay(LineReaderImpl.java:3554)at org.jline.reader.impl.LineReaderImpl.doCleanup(LineReaderImpl.java:2340)at org.jline.reader.impl.LineReaderImpl.cleanup(LineReaderImpl.java:2332)at org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:626)at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:194)... 7 more

是权限问题,只要在启动时添加 sudo 就可以解决:

sudo ./bin/sql-client.sh

启动后还遇到报错:

Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME.

需要在 flink-conf.yaml 中添加:

env.java.home: /opt/java-1.8.0

table or view does not exist

Caused by: java.sql.BatchUpdateException: ORA-00942: table or view does not existat oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9711)at oracle.jdbc.driver.T4CPreparedStatement.executeLargeBatch(T4CPreparedStatement.java:1447)at oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9487)at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:237)at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:98)at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:202)at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:172)... 8 more

这是 Oracle 11g 对表名大小写敏感的问题。

如果 源端 表名存在小写字母,需要新增以下配置:

'debezium.database.tablename.case.insensitive' = 'true'

如果 目标端 表名存在小写字母,需要用双引号修正表名的大小写:

file is not a valid filed name

Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field nameat com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261)at com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct.getString(Struct.java:158)at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeRecordValue(JdbcSourceEventDispatcher.java:193)at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(JdbcSourceEventDispatcher.java:223)at io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:122)at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:147)at com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher.dispatchSchemaChangeEvent(JdbcSourceEventDispatcher.java:62)at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.dispatchSchemaChangeEventAndGetTableForNewCapturedTable(AbstractLogMinerEventProcessor.java:1018)at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.getTableForDataEvent(AbstractLogMinerEventProcessor.java:913)at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.handleDataEvent(AbstractLogMinerEventProcessor.java:835)at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processRow(AbstractLogMinerEventProcessor.java:321)at com.ververica.cdc.connectors.oracle.source.reader.fetch.EventProcessorFactory$CDCMemoryLogMinerEventProcessor.processRow(EventProcessorFactory.java:151)at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processResults(AbstractLogMinerEventProcessor.java:262)at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.process(AbstractLogMinerEventProcessor.java:198)at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:249)... 8 more

这个是 flink-cdc 的 bug,参见 Github 关于此问题的讨论和最新进展:

https://github.com/apache/flink-cdc/pull/2315

https://github.com/apache/flink-cdc/issues/1792

截至目前,3.1 发布版本中尚未对此问题进行修复,但是改动已经合并到了 Master 分支,我们只能基于源码自主构建:

git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connector
mvn clean install -DskipTests

打包完成后,将 flink-sql-connector-oracle-cdc-3.2-SNAPSHOT.jar 放到 flink/lib 目录下,重启 flink 和 sql-client。

Read the redoLog offset error

Caused by: org.apache.flink.util.FlinkRuntimeException: Read the redoLog offset errorat org.apache.flink.cdc.connectors.oracle.source.OracleDialect.displayCurrentOffset(OracleDialect.java:70)at org.apache.flink.cdc.connectors.oracle.source.OracleDialect.displayCurrentOffset(OracleDialect.java:52)at org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:60)at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99)... 3 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot read the redo log position via 'SELECT CURRENT_SCN FROM V$DATABASE'. Make sure your server is correctly configuredat org.apache.flink.cdc.connectors.oracle.source.utils.OracleConnectionUtils.currentRedoLogOffset(OracleConnectionUtils.java:82)at org.apache.flink.cdc.connectors.oracle.source.OracleDialect.displayCurrentOffset(OracleDialect.java:68)... 6 more

可能是用户权限问题,没有查看 V$DATABASE 的权限。需要 DBA 给用户赋权。

Supplemental logging not configured

Caused by: io.debezium.DebeziumException: Supplemental logging not properly configured. Use: ALTER DATABASE ADD SUPPLEMENTAL LOG DATAat io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.checkDatabaseAndTableState(LogMinerStreamingChangeEventSource.java:860)at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:173)... 7 more

提示增量日志未开启,切换至 SYS 用户,以 DBA 的权限登录数据库,为数据库启用增量日志:

sqlplus /nolog
SQL> conn /as sysdba;
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.ryyt.cn/news/45566.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

dotnet 将本地的 Phi-3 模型与 SemanticKernel 进行对接

在本地完成 Phi-3 模型的部署之后,即可在本地拥有一个小语言模型。本文将告诉大家如何将本地的 Phi-3 模型与 SemanticKernel 进行对接,让 SemanticKernel 使用本地小语言模型提供的能力在我大部分的博客里面,都是使用 AzureAI 和 SemanticKernel 对接,所有的数据都需要发送…

读AI新生:破解人机共存密码笔记06人工智能生态系统

读AI新生:破解人机共存密码笔记06人工智能生态系统1. 深蓝 1.1. “深蓝”的胜利虽然令人印象深刻,但它只是延续了几十年来显而易见的趋势 1.2. 国际象棋算法的基本设计是由克劳德香农在1950年提出的 1.2.1. 这一基本设计在20世纪60年代初实现了重大改进 1.2.2. 最优秀的国际象…

Ventoy工具制作启动U盘

Ventoy是一个制作可启动 U 盘的开源工具,相比于软碟通使用Ventoy你的U盘不在局限于绑定某个PE系统,你只需要把 ISO/IMG/EFI 等类型的文件拷贝到U盘里面就可以启动了,无需其他操作。你可以一次性拷贝很多个不同类型的镜像文件,Ventoy 会在启动时显示一个菜单来供你进行选择。…

在System身份运行的.NET程序中以指定的用户身份启动可交互式进程

今天在技术群里,石头哥向大家提了个问题:"如何在一个以System身份运行的.NET程序(Windows Services)中,以其它活动的用户身份启动可交互式进程(桌面应用程序、控制台程序、等带有UI和交互式体验的程序)"? 我以前有过类似的需求,是在GitLab流水线中运行带有U…

CKEditor5 自定义构建富文本编辑器!

前言 CKEditor5的编辑是一个非常好的编辑器,但其英文文档比较绕眼睛,所以特地记录一下,如何使用自定义构建。 1、Online Bulider、Source Building 此为官方提供的,不适合我等现代构建方式。 自定义构建文档 2、自定义构建,在项目直接创建一个全新的0开始的编辑器。 此次,…

验证码识别

import ddddocrocr = ddddocr.DdddOcr()with open(img/验证码3.png, rb) as f:img_bytes = f.read()result = ocr.classification(img_bytes) print(result) 运行结果:

VIP视频解析

效果图 新建窗口import tkinter as tk# 创建一个窗口 root = tk.Tk()# 设置窗口大小 root.geometry(700x250+200+200)# 设置标题 root.title(在线观看电影软件)# 让窗口持续展现 root.mainloop() 设置背景图片# 设置读取一张图片 img = tk.PhotoImage(file=img\\封面.png)# 布局…

源代码安全漏洞扫描

构建一个应用程序,并始终确保应用程序其安全性的话,事实上构建应用程序的时候需要花大量的工作,一个步骤没有检查就可能导致整个系统或者产品都处于受黑客攻击的危险之中,谁不希望在产品发布初期就发现安全漏洞并且修复漏洞,那何乐而不为呢! 源代码安全漏洞扫描工具 可以…