日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
FlinkSQL知其所以然:SQLDDL!

SQL 語法篇

一、DDL:Create 子句

大家好,我是老羊,今天來學一波 Flink SQL 中的 DDL。

創(chuàng)新互聯(lián)專注于高縣網(wǎng)站建設(shè)服務(wù)及定制,我們擁有豐富的企業(yè)做網(wǎng)站經(jīng)驗。 熱誠為您提供高縣營銷型網(wǎng)站建設(shè),高縣網(wǎng)站制作、高縣網(wǎng)頁設(shè)計、高縣網(wǎng)站官網(wǎng)定制、小程序開發(fā)服務(wù),打造高縣網(wǎng)絡(luò)公司原創(chuàng)品牌,更為您提供高縣網(wǎng)站排名全網(wǎng)營銷落地服務(wù)。

CREATE 語句用于向當前或指定的 Catalog 中注冊庫、表、視圖或函數(shù)。注冊后的庫、表、視圖和函數(shù)可以在 SQL 查詢中使用。

目前 Flink SQL 支持下列 CREATE 語句:

  1. CREATE TABLE。
  2. CREATE DATABASE。
  3. CREATE VIEW。
  4. CREATE FUNCTION。

此節(jié)重點介紹建表,建數(shù)據(jù)庫、視圖和 UDF 會在后面的擴展章節(jié)進行介紹。

1、建表語句

下面的 SQL 語句就是建表語句的定義,根據(jù)指定的表名創(chuàng)建一個表,如果同名表已經(jīng)在 catalog 中存在了,則無法注冊。

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ | | }[ , ...n]
[ ]
[ ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( )] ]
:
column_name column_type [ ] [COMMENT column_comment]
:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
:
column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
:
column_name AS computed_column_expression [COMMENT column_comment]
:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
:
[catalog_name.][db_name.]table_name
:
{
{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]

2、表中的列

  • 常規(guī)列(即物理列)

物理列是數(shù)據(jù)庫中所說的常規(guī)列。其定義了物理介質(zhì)中存儲的數(shù)據(jù)中字段的名稱、類型和順序。

其他類型的列可以在物理列之間聲明,但不會影響最終的物理列的讀取。

舉一個僅包含常規(guī)列的表的案例:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING
) WITH (
...
);
  • 元數(shù)據(jù)列

元數(shù)據(jù)列是 SQL 標準的擴展,允許訪問數(shù)據(jù)源本身具有的一些元數(shù)據(jù)。元數(shù)據(jù)列由 METADATA 關(guān)鍵字標識。

例如,我們可以使用元數(shù)據(jù)列從 Kafka 數(shù)據(jù)中讀取 Kafka 數(shù)據(jù)自帶的時間戳(這個時間戳不是數(shù)據(jù)中的某個時間戳字段,而是數(shù)據(jù)寫入 Kafka 時,Kafka 引擎給這條數(shù)據(jù)打上的時間戳標記),然后我們可以在 Flink SQL 中使用這個時間戳,比如進行基于時間的窗口操作。

舉例:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 讀取 kafka 本身自帶的時間戳
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka'
...
);

元數(shù)據(jù)列可以用于后續(xù)數(shù)據(jù)的處理,或者寫入到目標表中。

舉例:

INSERT INTO MyTable 
SELECT
user_id
, name
, record_time + INTERVAL '1' SECOND
FROM MyTable;

如果自定義的列名稱和 Connector 中定義 metadata 字段的名稱一樣的話,F(xiàn)ROM xxx 子句是可以被省略的。

舉例:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 讀取 kafka 本身自帶的時間戳
`timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH (
'connector' = 'kafka'
...
);

關(guān)于 Flink SQL 的每種 Connector 都提供了哪些 metadata 字段,詳細可見官網(wǎng)文檔 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/。

如果自定義列的數(shù)據(jù)類型和 Connector 中定義的 metadata 字段的數(shù)據(jù)類型不一致的話,程序運行時會自動 cast 強轉(zhuǎn)。但是這要求兩種數(shù)據(jù)類型是可以強轉(zhuǎn)的。舉例如下:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 將時間戳強轉(zhuǎn)為 BIGINT
`timestamp` BIGINT METADATA
) WITH (
'connector' = 'kafka'
...
);

默認情況下,F(xiàn)link SQL planner 認為 metadata 列是可以 讀取 也可以 寫入 的。但是有些外部存儲系統(tǒng)的元數(shù)據(jù)信息是只能用于讀取,不能寫入的。

那么在往一個表寫入的場景下,我們就可以使用 VIRTUAL 關(guān)鍵字來標識某個元數(shù)據(jù)列不寫入到外部存儲中(不持久化)。

以 Kafka 舉例:

CREATE TABLE MyTable (
-- sink 時會寫入
`timestamp` BIGINT METADATA,
-- sink 時不寫入
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`name` STRING,
) WITH (
'connector' = 'kafka'
...
);

在上面這個案例中,Kafka 引擎的 offset 是只讀的。所以我們在把 MyTable 作為數(shù)據(jù)源(輸入)表時,schema 中是包含 offset 的。在把 MyTable 作為數(shù)據(jù)匯(輸出)表時,schema 中是不包含 offset 的。如下:

-- 當做數(shù)據(jù)源(輸入)的 schema
MyTable(`timestamp` BIGINT, `offset` BIGINT, `user_id` BIGINT, `name` STRING)
-- 當做數(shù)據(jù)匯(輸出)的 schema
MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)

所以這里在寫入時需要注意,不要在 SQL 的 INSERT INTO 語句中寫入 offset 列,否則 Flink SQL 任務(wù)會直接報錯。

  • 計算列

計算列其實就是在寫建表的 DDL 時,可以拿已有的一些列經(jīng)過一些自定義的運算生成的新列。這些列本身是沒有以物理形式存儲到數(shù)據(jù)源中的。

舉例:

CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
-- cost 就是使用 price 和 quanitity 生成的計算列,計算方式為 price * quanitity
`cost` AS price * quanitity,
) WITH (
'connector' = 'kafka'
...
);

注意!!!

計算列可以包含其他列、常量或者函數(shù),但是不能寫一個子查詢進去。

小伙伴萌這時會問到一個問題,既然只能包含列、常量或者函數(shù)計算,我就直接在 DML query 代碼中寫就完事了唄,為啥還要專門在 DDL 中定義呢?

結(jié)論:沒錯,如果只是簡單的四則運算的話直接寫在 DML 中就可以,但是計算列一般是用于定義時間屬性的(因為在 SQL 任務(wù)中時間屬性只能在 DDL 中定義,不能在 DML 語句中定義)。比如要把輸入數(shù)據(jù)的時間格式標準化。處理時間、事件時間分別舉例如下:

  • 處理時間:使用 PROCTIME() 函數(shù)來定義處理時間列
  • 事件時間:事件時間的時間戳可以在聲明 Watermark 之前進行預(yù)處理。比如如果字段不是 TIMESTAMP(3) 類型或者時間戳是嵌套在 JSON 字符串中的,則可以使用計算列進行預(yù)處理。

注意!!!和虛擬 metadata 列是類似的,計算列也是只能讀不能寫的。

也就是說,我們在把 MyTable 作為數(shù)據(jù)源(輸入)表時,schema 中是包含 cost 的。

在把 MyTable 作為數(shù)據(jù)匯(輸出)表時,schema 中是不包含 cost 的。舉例:

-- 當做數(shù)據(jù)源(輸入)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` DOUBLE)
-- 當做數(shù)據(jù)匯(輸出)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE)

3、定義 Watermark

Watermark 是在 Create Table 中進行定義的。具體 SQL 語法標準是 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression。

其中:

  1. rowtime_column_name:表的事件時間屬性字段。該列必須是 TIMESTAMP(3)、TIMESTAMP_LTZ(3) 類,這個時間可以是一個計算列。
  2. watermark_strategy_expression:定義 Watermark 的生成策略。Watermark 的一般都是由 rowtime_column_name 列減掉一段固定時間間隔。SQL 中 Watermark 的生產(chǎn)策略是:當前 Watermark 大于上次發(fā)出的 Watermark 時發(fā)出當前 Watermark。

注意:

如果你使用的是事件時間語義,那么必須要設(shè)設(shè)置事件時間屬性和 WATERMARK 生成策略。

Watermark 的發(fā)出頻率:Watermark 發(fā)出一般是間隔一定時間的,Watermark 的發(fā)出間隔時間可以由 pipeline.auto-watermark-interval 進行配置,如果設(shè)置為 200ms 則每 200ms 會計算一次 Watermark,然如果比之前發(fā)出的 Watermark 大,則發(fā)出。如果間隔設(shè)為 0ms,則 Watermark 只要滿足觸發(fā)條件就會發(fā)出,不會受到間隔時間控制。

Flink SQL 提供了幾種 WATERMARK 生產(chǎn)策略:

  1. 有界無序:設(shè)置方式為 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit。此類策略就可以用于設(shè)置最大亂序時間,假如設(shè)置為 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND,則生成的是運行 5s 延遲的 Watermark。一般都用這種 Watermark 生成策略,此類 Watermark 生成策略通常用于有數(shù)據(jù)亂序的場景中,而對應(yīng)到實際的場景中,數(shù)據(jù)都是會存在亂序的,所以基本都使用此類策略。
  2. 嚴格升序:設(shè)置方式為 WATERMARK FOR rowtime_column AS rowtime_column。一般基本不用這種方式。如果你能保證你的數(shù)據(jù)源的時間戳是嚴格升序的,那就可以使用這種方式。嚴格升序代表 Flink 任務(wù)認為時間戳只會越來越大,也不存在相等的情況,只要相等或者小于之前的,就認為是遲到的數(shù)據(jù)。
  3. 遞增:設(shè)置方式為 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND。一般基本不用這種方式。如果設(shè)置此類,則允許有相同的時間戳出現(xiàn)。

4、Create Table With 子句

先看一個案例:

CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)

可以看到 DDL 中 With 子句就是在建表時,描述數(shù)據(jù)源、數(shù)據(jù)匯的具體外部存儲的元數(shù)據(jù)信息的。

一般 With 中的配置項由 Flink SQL 的 Connector(鏈接外部存儲的連接器) 來定義,每種 Connector 提供的 With 配置項都是不同的。

注意:

Flink SQL 中 Connector 其實就是 Flink 用于鏈接外部數(shù)據(jù)源的接口。舉一個類似的例子,在 Java 中想連接到 MySQL,需要使用 mysql-connector-java 包提供的 Java API 去鏈接。映射到 Flink SQL 中,在 Flink SQL 中要連接到 Kafka,需要使用 kafka connector。

Flink SQL 已經(jīng)提供了一系列的內(nèi)置 Connector,具體可見 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/。

回到上述案例中,With 聲明了以下幾項信息:

  1. 'connector' = 'kafka':聲明外部存儲是 Kafka。
  2. 'topic' = 'user_behavior':聲明 Flink SQL 任務(wù)要連接的 Kafka 表的 topic 是 user_behavior。
  3. 'properties.bootstrap.servers' = 'localhost:9092':聲明 Kafka 的 server ip 是 localhost:9092。
  4. 'properties.group.id' = 'testGroup':聲明 Flink SQL 任務(wù)消費這個 Kafka topic,會使用 testGroup 的 group id 去消費。
  5. 'scan.startup.mode' = 'earliest-offset':聲明 Flink SQL 任務(wù)消費這個 Kafka topic 會從最早位點開始消費。
  6. 'format' = 'csv':聲明 Flink SQL 任務(wù)讀入或者寫出時對于 Kafka 消息的序列化方式是 csv 格式。

從這里也可以看出來 With 中具體要配置哪些配置項都是和每種 Connector 決定的。

5、Create Table Like 子句

Like 子句是 Create Table 子句的一個延伸。舉例:

下面定義了一張 Orders 表:

CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);

但是忘記定義 Watermark 了,那如果想加上 Watermark,就可以用 Like 子句定義一張帶 Watermark 的新表:

CREATE TABLE Orders_with_watermark (
-- 1. 添加了 WATERMARK 定義
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- 2. 覆蓋了原 Orders 表中 scan.startup.mode 參數(shù)
'scan.startup.mode' = 'latest-offset'
)
-- 3. Like 子句聲明是在原來的 Orders 表的基礎(chǔ)上定義 Orders_with_watermark 表
LIKE Orders;

上面這個語句的效果就等同于:

CREATE TABLE Orders_with_watermark (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);

不過這種不常使用。就不過多介紹了。如果小伙伴萌感興趣,直接去官網(wǎng)參考具體注意事項:

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#like。


分享文章:FlinkSQL知其所以然:SQLDDL!
本文鏈接:http://www.dlmjj.cn/article/dpoeess.html