日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
FlinkSQL知其所以然:Explain、Show、Load、Set子句

EXPLAIN 子句

大家好,我是老羊,今天我們來(lái)學(xué)習(xí) Flink SQL 中的的 Explain、Show、Load、Set 共 4 個(gè)子句。

成都創(chuàng)新互聯(lián)主營(yíng)長(zhǎng)清網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營(yíng)網(wǎng)站建設(shè)方案,重慶APP開發(fā)公司,長(zhǎng)清h5小程序開發(fā)搭建,長(zhǎng)清網(wǎng)站營(yíng)銷推廣歡迎長(zhǎng)清等地區(qū)企業(yè)咨詢

  1. 應(yīng)用場(chǎng)景:EXPLAIN 子句其實(shí)就是用于查看當(dāng)前這個(gè) sql 查詢的邏輯計(jì)劃以及優(yōu)化的執(zhí)行計(jì)劃。
  2. SQL 語(yǔ)法標(biāo)準(zhǔn):
EXPLAIN PLAN FOR 
  1. 實(shí)際案例:
public class Explain_Test {
public static void main(String[] args) throws Exception {
FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
flinkEnv.env().setParallelism(1);
String sql = "CREATE TABLE source_table (\n"
+ " user_id BIGINT COMMENT '用戶 id',\n"
+ " name STRING COMMENT '用戶姓名',\n"
+ " server_timestamp BIGINT COMMENT '用戶訪問(wèn)時(shí)間戳',\n"
+ " proctime AS PROCTIME()\n"
+ ") WITH (\n"
+ " 'connector' = 'datagen',\n"
+ " 'rows-per-second' = '1',\n"
+ " 'fields.name.length' = '1',\n"
+ " 'fields.user_id.min' = '1',\n"
+ " 'fields.user_id.max' = '10',\n"
+ " 'fields.server_timestamp.min' = '1',\n"
+ " 'fields.server_timestamp.max' = '100000'\n"
+ ");\n"
+ "\n"
+ "CREATE TABLE sink_table (\n"
+ " user_id BIGINT,\n"
+ " name STRING,\n"
+ " server_timestamp BIGINT\n"
+ ") WITH (\n"
+ " 'connector' = 'print'\n"
+ ");\n"
+ "\n"
+ "EXPLAIN PLAN FOR\n"
+ "INSERT INTO sink_table\n"
+ "select user_id,\n"
+ " name,\n"
+ " server_timestamp\n"
+ "from (\n"
+ " SELECT\n"
+ " user_id,\n"
+ " name,\n"
+ " server_timestamp,\n"
+ " row_number() over(partition by user_id order by proctime) as rn\n"
+ " FROM source_table\n"
+ ")\n"
+ "where rn = 1";
/**
* 算子 {@link org.apache.flink.streaming.api.operators.KeyedProcessOperator}
* -- {@link org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction}
*/
for (String innerSql : sql.split(";")) {
TableResult tableResult = flinkEnv.streamTEnv().executeSql(innerSql);
tableResult.print();
}
}
}

上述代碼執(zhí)行結(jié)果如下:

1. 抽象語(yǔ)法樹
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2])
+- LogicalFilter(condition=[=($3, 1)])
+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
+- LogicalTableScan(table=[[default_catalog, default_database, source_table]])

2. 優(yōu)化后的物理計(jì)劃
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- Calc(select=[user_id, name, server_timestamp])
+- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])
+- Exchange(distribution=[hash[user_id]])
+- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])
+- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])

3. 優(yōu)化后的執(zhí)行計(jì)劃
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- Calc(select=[user_id, name, server_timestamp])
+- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])
+- Exchange(distribution=[hash[user_id]])
+- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])
+- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])

USE 子句

  1. 應(yīng)用場(chǎng)景:如果熟悉 MySQL 的同學(xué)會(huì)非常熟悉這個(gè)子句,在 MySQL 中,USE 子句通常被用于切換庫(kù),那么在 Flink SQL 體系中,它的作用也是和 MySQL 中 USE 子句的功能基本一致,用于切換 Catalog,DataBase,使用 Module。
  2. SQL 語(yǔ)法標(biāo)準(zhǔn):
  • 切換 Catalog:
USE CATALOG catalog_name
  • 使用 Module:
USE MODULES module_name1[, module_name2, ...]
  • 切換 Database:
USE db名稱
  1. 實(shí)際案例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// create a catalog
tEnv.executeSql("CREATE CATALOG cat1 WITH (...)");
tEnv.executeSql("SHOW CATALOGS").print();
// +-----------------+
// | catalog name |
// +-----------------+
// | default_catalog |
// | cat1 |
// +-----------------+
// change default catalog
tEnv.executeSql("USE CATALOG cat1");
tEnv.executeSql("SHOW DATABASES").print();
// databases are empty
// +---------------+
// | database name |
// +---------------+
// +---------------+
// create a database
tEnv.executeSql("CREATE DATABASE db1 WITH (...)");
tEnv.executeSql("SHOW DATABASES").print();
// +---------------+
// | database name |
// +---------------+
// | db1 |
// +---------------+
// change default database
tEnv.executeSql("USE db1");
// change module resolution order and enabled status
tEnv.executeSql("USE MODULES hive");
tEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+-------+
// | module name | used |
// +-------------+-------+
// | hive | true |
// | core | false |
// +-------------+-------+

SHOW 子句

  1. 應(yīng)用場(chǎng)景:如果熟悉 MySQL 的同學(xué)會(huì)非常熟悉這個(gè)子句,在 MySQL 中,SHOW 子句常常用于查詢庫(kù)、表、函數(shù)等,在 Flink SQL 體系中也類似。Flink SQL 支持 SHOW 以下內(nèi)容。
  2. SQL 語(yǔ)法標(biāo)準(zhǔn):
SHOW CATALOGS:展示所有 Catalog
SHOW CURRENT CATALOG:展示當(dāng)前的 Catalog
SHOW DATABASES:展示當(dāng)前 Catalog 下所有 Database
SHOW CURRENT DATABASE:展示當(dāng)前的 Database
SHOW TABLES:展示當(dāng)前 Database 下所有表
SHOW VIEWS:展示所有視圖
SHOW FUNCTIONS:展示所有的函數(shù)
SHOW MODULES:展示所有的 Module(Module 是用于 UDF 擴(kuò)展)
  1. 實(shí)際案例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// show catalogs
tEnv.executeSql("SHOW CATALOGS").print();
// +-----------------+
// | catalog name |
// +-----------------+
// | default_catalog |
// +-----------------+
// show current catalog
tEnv.executeSql("SHOW CURRENT CATALOG").print();
// +----------------------+
// | current catalog name |
// +----------------------+
// | default_catalog |
// +----------------------+
// show databases
tEnv.executeSql("SHOW DATABASES").print();
// +------------------+
// | database name |
// +------------------+
// | default_database |
// +------------------+
// show current database
tEnv.executeSql("SHOW CURRENT DATABASE").print();
// +-----------------------+
// | current database name |
// +-----------------------+
// | default_database |
// +-----------------------+
// create a table
tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)");
// show tables
tEnv.executeSql("SHOW TABLES").print();
// +------------+
// | table name |
// +------------+
// | my_table |
// +------------+
// create a view
tEnv.executeSql("CREATE VIEW my_view AS ...");
// show views
tEnv.executeSql("SHOW VIEWS").print();
// +-----------+
// | view name |
// +-----------+
// | my_view |
// +-----------+
// show functions
tEnv.executeSql("SHOW FUNCTIONS").print();
// +---------------+
// | function name |
// +---------------+
// | mod |
// | sha256 |
// | ... |
// +---------------+
// create a user defined function
tEnv.executeSql("CREATE FUNCTION f1 AS ...");
// show user defined functions
tEnv.executeSql("SHOW USER FUNCTIONS").print();
// +---------------+
// | function name |
// +---------------+
// | f1 |
// | ... |
// +---------------+
// show modules
tEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// | core |
// +-------------+
// show full modules
tEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+-------+
// | module name | used |
// +-------------+-------+
// | core | true |
// | hive | false |
// +-------------+-------+

LOAD、UNLOAD 子句

  1. 應(yīng)用場(chǎng)景:我們可以使用 LOAD 子句去加載 Flink SQL 體系內(nèi)置的或者用戶自定義的 Module,UNLOAD 子句去卸載 Flink SQL 體系內(nèi)置的或者用戶自定義的 Module。
  2. SQL 語(yǔ)法標(biāo)準(zhǔn):
-- 加載
LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]
-- 卸載
UNLOAD MODULE module_name
  1. 實(shí)際案例:
  • LOAD 案例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 加載 Flink SQL 體系內(nèi)置的 Hive module
tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
tEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// | core |
// | hive |
// +-------------+
  • UNLOAD 案例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 卸載唯一的一個(gè) CoreModule
tEnv.executeSql("UNLOAD MODULE core");
tEnv.executeSql("SHOW MODULES").print();
// 結(jié)果啥 Moudle 都沒(méi)有了

SET、RESET 子句

  1. 應(yīng)用場(chǎng)景:SET 子句可以用于修改一些 Flink SQL 的環(huán)境配置,RESET 子句是可以將所有的環(huán)境配置恢復(fù)成默認(rèn)配置,但只能在 SQL CLI 中進(jìn)行使用,主要是為了讓用戶更純粹的使用 SQL 而不必使用其他方式或者切換系統(tǒng)環(huán)境。
  2. SQL 語(yǔ)法標(biāo)準(zhǔn):
SET (key = value)?
RESET (key)?
  1. 實(shí)際案例:

啟動(dòng)一個(gè) SQL CLI 之后,在 SQL CLI 中可以進(jìn)行以下 SET 設(shè)置:

Flink SQL> SET table.planner = blink;
[INFO] Session property has been set.
Flink SQL> SET;
table.planner=blink;
Flink SQL> RESET table.planner;
[INFO] Session property has been reset.
Flink SQL> RESET;
[INFO] All session properties have been set to their default values.

標(biāo)題名稱:FlinkSQL知其所以然:Explain、Show、Load、Set子句
分享URL:http://www.dlmjj.cn/article/djdeoie.html