新聞中心
這篇文章主要講解了“Flink CDC怎么監(jiān)聽MySQL表”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Flink CDC怎么監(jiān)聽MySQL表”吧!
創(chuàng)新互聯(lián)是一家專注于成都網(wǎng)站設計、網(wǎng)站制作與策劃設計,溫江網(wǎng)站建設哪家好?創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設十年,網(wǎng)設計領域的專業(yè)建站公司;建站業(yè)務涵蓋:溫江等地區(qū)。溫江做網(wǎng)站價格咨詢:18982081108
// 前景提要:開啟mysql binlog監(jiān)控。(目錄:C:\ProgramData\MySQL\MySQL Server 5.6\my.ini)ProgramData 為隱藏目錄。注意:binlog_format=ROW // 創(chuàng)建Blink Streaming的TableEnvironmentEnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);// 創(chuàng)建表,connector使用mysql-cdcbsTableEnv.executeSql("CREATE TABLE mysql_binlog " +"(id STRING, " +"times STRING, " +"temp STRING) " +"WITH " +"('connector' = 'mysql-cdc', " +" 'hostname' = '127.0.0.1', " +" 'port' = '3306', " +" 'username' = 'root', " +" 'password' = '123456', " +" 'database-name' = 'test', " +" 'table-name' = 'sersor_temp'" +")");// 打印控制臺bsTableEnv.executeSql("CREATE TABLE sink_table " +"(id STRING, " +"times STRING, " +"temp DOUBLE) " +"WITH " +"('connector' = 'print'" +")");// 將CDC數(shù)據(jù)源和下游數(shù)據(jù)表對接起來bsTableEnv.executeSql("INSERT INTO sink_table SELECT id, times, temp FROM mysql_binlog");bsTableEnv.executeSql("CREATE TABLE sink_kafka_table " +"(id STRING, " +"times STRING, " +"temp DOUBLE " +") WITH (" +" 'connector' = 'kafka'," +" 'topic' = 'test_mysql_binlog'," +" 'scan.startup.mode' = 'earliest-offset'," +" 'properties.group.id' = 'testGroup'," +" 'properties.bootstrap.servers' = 'node2:9092', " +" 'format' = 'canal-json' " +")");// 將CDC數(shù)據(jù)與 kafka表對接起來bsTableEnv.executeSql("INSERT INTO sink_kafka_table SELECT id, times, temp FROM mysql_binlog");bsTableEnv.executeSql("CREATE TABLE hTable (" +" id STRING," +" f ROW," +" PRIMARY KEY (id) NOT ENFORCED" +") WITH (" +" 'connector' = 'hbase-2.2'," +" 'table-name' = 'regional:binlog'," +" 'zookeeper.quorum' = 'node2:2181'" +")");// 將CDC數(shù)據(jù)存儲到 Hbase中bsTableEnv.executeSql("INSERT INTO hTable SELECT id, ROW(times, temp) FROM mysql_binlog");
-- ----------------------------
-- Table structure for sersor_temp
-- ----------------------------
DROP TABLE IF EXISTS `sersor_temp`;
CREATE TABLE `sersor_temp` (
`id` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`temp` decimal(10, 2) NOT NULL,
`times` varchar(10) CHARACTER SET latin1 COLLATE latin1_swedish_ci NOT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Compact;
-- ----------------------------
-- Records of sersor_temp
-- ----------------------------
INSERT INTO `sersor_temp` VALUES ('sensor_1', 22.20, '1547718527');
INSERT INTO `sersor_temp` VALUES ('sensor_2', 25.20, '1547718214');
INSERT INTO `sersor_temp` VALUES ('sensor_3', 46.40, '1547718520');
INSERT INTO `sersor_temp` VALUES ('sensor_5', 32.62, '1547718325');
注意:此處 表中 temp 字段為 decimal 類型,在SQL中使用 DECIMAL 、DOUBLE 類型 存儲到hbase中都會出現(xiàn)亂碼問題,遂 都換成 STRING
感謝各位的閱讀,以上就是“Flink CDC怎么監(jiān)聽MySQL表”的內容了,經(jīng)過本文的學習后,相信大家對Flink CDC怎么監(jiān)聽MySQL表這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關知識點的文章,歡迎關注!
文章標題:FlinkCDC怎么監(jiān)聽MySQL表
網(wǎng)站地址:http://www.dlmjj.cn/article/pscpie.html