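Has anyone successfully synchronized Oracle with Flink CDC?
Yes, people have used Flink CDC to synchronize Oracle databases successfully. Flink CDC is a data synchronization tool built on streaming change data capture (CDC): it captures data changes in the source database in real time and synchronizes them to a target database.
Detailed steps for synchronizing an Oracle database with Flink CDC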
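To make the idea of change data capture concrete, here is a minimal sketch in plain Java, with no Flink involved. The ChangeEvent class, the Op enum, and the in-memory map standing in for the target table are all hypothetical names chosen for illustration; a real CDC pipeline carries richer events, but the replay logic has the same shape.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangeEventDemo {
    /** Kind of row-level change, as a CDC tool would report it. */
    enum Op { INSERT, UPDATE, DELETE }

    /** One captured change: operation, primary key, and new value (null for deletes). */
    static final class ChangeEvent {
        final Op op;
        final int id;
        final String name;

        ChangeEvent(Op op, int id, String name) {
            this.op = op;
            this.id = id;
            this.name = name;
        }
    }

    /** Replays the events in order against an in-memory "target table" keyed by primary key. */
    static Map<Integer, String> apply(List<ChangeEvent> events) {
        Map<Integer, String> target = new LinkedHashMap<>();
        for (ChangeEvent e : events) {
            switch (e.op) {
                case INSERT:
                case UPDATE:
                    target.put(e.id, e.name); // upsert by primary key
                    break;
                case DELETE:
                    target.remove(e.id);
                    break;
            }
        }
        return target;
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = Arrays.asList(
            new ChangeEvent(Op.INSERT, 1, "alice"),
            new ChangeEvent(Op.INSERT, 2, "bob"),
            new ChangeEvent(Op.UPDATE, 1, "alicia"),
            new ChangeEvent(Op.DELETE, 2, null));
        System.out.println(apply(events)); // prints {1=alicia}
    }
}
```

Replaying events in commit order is what keeps the target consistent with the source, which is why a CDC source needs to track its read position reliably.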

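Environment preparation
1. Install and configure the Oracle database, and make sure you have access to it.
2. Install Apache Flink, version 1.13 or later.
3. Download the Flink CDC Oracle connector (flink-connector-oracle-cdc) and add it to the project as a dependency.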
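"Configuring" Oracle for CDC usually means more than granting access: the Debezium-based Oracle connector reads changes through LogMiner, which generally requires the database to run in archive log mode with supplemental logging enabled. The sketch below only generates the supplemental logging statements a DBA might run; the class name, method name, schema, and table names are hypothetical, and the exact statements for your environment should be checked against the connector documentation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OracleCdcPrep {
    /**
     * Builds the supplemental logging statements for the given tables.
     * Schema and table names are caller-supplied placeholders (assumptions).
     */
    static List<String> supplementalLoggingStatements(String schema, List<String> tables) {
        List<String> sql = new ArrayList<>();
        // Database-level supplemental logging
        sql.add("ALTER DATABASE ADD SUPPLEMENTAL LOG DATA");
        // Table-level logging of all columns, so change records carry full row images
        for (String table : tables) {
            sql.add("ALTER TABLE " + schema + "." + table
                + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
        }
        return sql;
    }

    public static void main(String[] args) {
        for (String s : supplementalLoggingStatements("YOUR_SCHEMA", Arrays.asList("YOUR_TABLE"))) {
            System.out.println(s + ";");
        }
    }
}
```

The statements are printed rather than executed so that they can be reviewed before being run with DBA privileges.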
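Creating the Flink streaming program
1. Add the dependency. For Flink CDC 2.x the Oracle connector is published under the com.ververica group; set the version property to a release compatible with your Flink version (the original example targeted Flink 1.13.2).
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-oracle-cdc</artifactId>
  <version>${flink-cdc.version}</version>
</dependency>
2. Write the Flink streaming program.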
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkCDCOracle {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing lets the CDC source persist its read position in Flink state
        env.enableCheckpointing(3000);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Register the Oracle source table.
        // Option names follow the Flink CDC 2.x 'oracle-cdc' connector; all values are placeholders.
        // Column names must match the Oracle table (unquoted Oracle identifiers are stored in uppercase).
        String sourceDDL = "CREATE TABLE oracle_source (" +
            " id INT NOT NULL," +
            " name STRING," +
            " age INT," +
            " PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            " 'connector' = 'oracle-cdc'," +
            " 'hostname' = 'your_host'," +
            " 'port' = '1521'," +
            " 'username' = 'your_username'," +
            " 'password' = 'your_password'," +
            " 'database-name' = 'your_database'," +
            " 'schema-name' = 'your_schema'," +
            " 'table-name' = 'your_table'" +
            // Debezium properties can be passed through with the 'debezium.' prefix,
            // for example 'debezium.log.mining.strategy' = 'online_catalog'.
            ")";
        tableEnv.executeSql(sourceDDL);

        // Query the source table and print the change stream to the console
        tableEnv.executeSql("SELECT * FROM oracle_source").print();
    }
}
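Hard-coding credentials in the DDL string is convenient for a demo but awkward in practice. As a sketch, the DDL could be assembled from parameters instead. This assumes the oracle-cdc connector from Flink CDC 2.x and its option names (hostname, port, database-name, schema-name, table-name); the class and method names here are hypothetical helpers, not part of any library.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class OracleCdcDdl {
    /** Renders connector options as a Flink SQL WITH clause, doubling single quotes in values. */
    static String withClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(", ", "WITH (", ")");
        for (Map.Entry<String, String> e : options.entrySet()) {
            joiner.add("'" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'");
        }
        return joiner.toString();
    }

    /** Builds the source table DDL; the column list mirrors the example table (id, name, age). */
    static String sourceDdl(String host, int port, String user, String password,
                            String database, String schema, String table) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "oracle-cdc"); // assumed: Flink CDC 2.x connector identifier
        options.put("hostname", host);
        options.put("port", String.valueOf(port));
        options.put("username", user);
        options.put("password", password);
        options.put("database-name", database);
        options.put("schema-name", schema);
        options.put("table-name", table);
        return "CREATE TABLE oracle_source ("
            + "id INT NOT NULL, name STRING, age INT, PRIMARY KEY (id) NOT ENFORCED) "
            + withClause(options);
    }

    public static void main(String[] args) {
        // Placeholder values; in a real job these would come from configuration or a secret store.
        System.out.println(sourceDdl("your_host", 1521, "your_username", "your_password",
            "your_database", "your_schema", "your_table"));
    }
}
```

The resulting string can be passed to tableEnv.executeSql in place of the literal DDL, which keeps connection details out of the source code.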
Running the Flink streaming program
1. Compile and package the project.
2. Run the Flink job from the command line. The example below submits the job to YARN in detached mode; the JAR path is a placeholder.
$ flink run -m yarn-cluster -d -c FlinkCDCOracle /path/to/your-job.jar
Article URL: http://www.dlmjj.cn/article/dpiggcc.html


