新聞中心
一、什么是Table API

創(chuàng)新互聯(lián)專(zhuān)注于如皋企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站開(kāi)發(fā),商城網(wǎng)站定制開(kāi)發(fā)。如皋網(wǎng)站建設(shè)公司,為如皋等地區(qū)提供建站服務(wù)。全流程按需網(wǎng)站制作,專(zhuān)業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,創(chuàng)新互聯(lián)專(zhuān)業(yè)和態(tài)度為您提供的服務(wù)
在《Apache Flink 漫談系列(08) - SQL概覽》中我們概要的向大家介紹了什么是好SQL,SQL和Table API是Apache Flink中的同一層次的API抽象,如下圖所示:
Apache Flink 針對(duì)不同的用戶(hù)場(chǎng)景提供了三層用戶(hù)API,最下層ProcessFunction API可以對(duì)State,Timer等復(fù)雜機(jī)制進(jìn)行有效的控制,但用戶(hù)使用的便捷性很弱,也就是說(shuō)即使很簡(jiǎn)單統(tǒng)計(jì)邏輯,也要較多的代碼開(kāi)發(fā)。第二層DataStream API對(duì)窗口,聚合等算子進(jìn)行了封裝,用戶(hù)的便捷性有所增強(qiáng)。最上層是SQL/Table API,Table API是Apache Flink中的聲明式,可被查詢(xún)優(yōu)化器優(yōu)化的高級(jí)分析API。
二、Table API的特點(diǎn)
Table API和SQL都是Apache Flink中最高層的分析API,SQL所具備的特點(diǎn)Table API也都具有,如下:
- 聲明式 - 用戶(hù)只關(guān)心做什么,不用關(guān)心怎么做;
- 高性能 - 支持查詢(xún)優(yōu)化,可以獲取最好的執(zhí)行性能;
- 流批統(tǒng)一 - 相同的統(tǒng)計(jì)邏輯,既可以流模式運(yùn)行,也可以批模式運(yùn)行;
- 標(biāo)準(zhǔn)穩(wěn)定 - 語(yǔ)義遵循SQL標(biāo)準(zhǔn),語(yǔ)法語(yǔ)義明確,不易變動(dòng)。
當(dāng)然除了SQL的特性,因?yàn)門(mén)able API是在Flink中專(zhuān)門(mén)設(shè)計(jì)的,所以Table API還具有自身的特點(diǎn):
- 表達(dá)方式的擴(kuò)展性 - 在Flink中可以為T(mén)able API開(kāi)發(fā)很多便捷性功能,如:Row.flatten(), map/flatMap 等
- 功能的擴(kuò)展性 - 在Flink中可以為T(mén)able API擴(kuò)展更多的功能,如:Iteration,flatAggregate 等新功能
- 編譯檢查 - Table API支持java和scala語(yǔ)言開(kāi)發(fā),支持IDE中進(jìn)行編譯檢查。
說(shuō)明:上面說(shuō)的map/flatMap/flatAggregate都是Apache Flink 社區(qū) FLIP-29 中規(guī)劃的新功能。
三、HelloWorld
在介紹Table API所有算子之前我們先編寫(xiě)一個(gè)簡(jiǎn)單的HelloWorld來(lái)直觀了解如何進(jìn)行Table API的開(kāi)發(fā)。
1. Maven 依賴(lài)
在pom文件中增加如下配置,本篇以flink-1.7.0功能為準(zhǔn)進(jìn)行后續(xù)介紹。
1.7.0 org.apache.flink flink-table_2.11 ${table.version} org.apache.flink flink-scala_2.11 ${table.version} org.apache.flink flink-streaming-scala_2.11 ${table.version} org.apache.flink flink-streaming-java_2.11 ${table.version}
2. 程序結(jié)構(gòu)
在編寫(xiě)第一Flink Table API job之前我們先簡(jiǎn)單了解一下Flink Table API job的結(jié)構(gòu),如下圖所示:
- 外部數(shù)據(jù)源,比如Kafka, Rabbitmq, CSV 等等;
- 查詢(xún)計(jì)算邏輯,比如最簡(jiǎn)單的數(shù)據(jù)導(dǎo)入select,雙流Join,Window Aggregate 等;
- 外部結(jié)果存儲(chǔ),比如Kafka,Cassandra,CSV等。
說(shuō)明:1和3 在Apache Flink中統(tǒng)稱(chēng)為Connector。
3. 主程序
我們以一個(gè)統(tǒng)計(jì)單詞數(shù)量的業(yè)務(wù)場(chǎng)景,編寫(xiě)第一個(gè)HelloWorld程序。
根據(jù)上面Flink job基本結(jié)構(gòu)介紹,要Table API完成WordCount的計(jì)算需求,我們需要完成三部分代碼:
- TableSoruce Code - 用于創(chuàng)建數(shù)據(jù)源的代碼
- Table API Query - 用于進(jìn)行word count統(tǒng)計(jì)的Table API 查詢(xún)邏輯
- TableSink Code - 用于保存word count計(jì)算結(jié)果的結(jié)果表代碼
(1) 運(yùn)行模式選擇
一個(gè)job我們要選擇是Stream方式運(yùn)行還是Batch模式運(yùn)行,所以任何統(tǒng)計(jì)job的第一步是進(jìn)行運(yùn)行模式選擇,如下我們選擇Stream方式運(yùn)行。
- // Stream運(yùn)行環(huán)境
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
(2) 構(gòu)建測(cè)試Source
我們用最簡(jiǎn)單的構(gòu)建Source方式進(jìn)行本次測(cè)試,代碼如下:
- // 測(cè)試數(shù)據(jù)
- val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
- // 最簡(jiǎn)單的獲取Source方式
- val source = env.fromCollection(data).toTable(tEnv, 'word)
(3) WordCount 統(tǒng)計(jì)邏輯
WordCount核心統(tǒng)計(jì)邏輯就是按照單詞分組,然后計(jì)算每個(gè)單詞的數(shù)量,統(tǒng)計(jì)邏輯如下:
- // 單詞統(tǒng)計(jì)核心邏輯
- val result = source
- .groupBy('word) // 單詞分組
- .select('word, 'word.count) // 單詞統(tǒng)計(jì)
(4) 定義Sink
將WordCount的統(tǒng)計(jì)結(jié)果寫(xiě)入Sink中,代碼如下:
- // 自定義Sink
- val sink = new RetractSink // 自定義Sink(下面有完整代碼)
- // 計(jì)算結(jié)果寫(xiě)入sink
- result.toRetractStream[(String, Long)].addSink(sink)
(5) 完整的HelloWord代碼
為了方便大家運(yùn)行WordCount查詢(xún)統(tǒng)計(jì),將完整的代碼分享大家(基于flink-1.7.0),如下:
- import org.apache.flink.api.scala._
- import org.apache.flink.configuration.Configuration
- import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.table.api.TableEnvironment
- import org.apache.flink.table.api.scala._
- import scala.collection.mutable
- object HelloWord {
- def main(args: Array[String]): Unit = {
- // 測(cè)試數(shù)據(jù)
- val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
- // Stream運(yùn)行環(huán)境
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- // 最簡(jiǎn)單的獲取Source方式
- val source = env.fromCollection(data).toTable(tEnv, 'word)
- // 單詞統(tǒng)計(jì)核心邏輯
- val result = source
- .groupBy('word) // 單詞分組
- .select('word, 'word.count) // 單詞統(tǒng)計(jì)
- // 自定義Sink
- val sink = new RetractSink
- // 計(jì)算結(jié)果寫(xiě)入sink
- result.toRetractStream[(String, Long)].addSink(sink)
- env.execute
- }
- }
- class RetractSink extends RichSinkFunction[(Boolean, (String, Long))] {
- private var resultSet: mutable.Set[(String, Long)] = _
- override def open(parameters: Configuration): Unit = {
- // 初始化內(nèi)存存儲(chǔ)結(jié)構(gòu)
- resultSet = new mutable.HashSet[(String, Long)]
- }
- override def invoke(v: (Boolean, (String, Long)), context: SinkFunction.Context[_]): Unit = {
- if (v._1) {
- // 計(jì)算數(shù)據(jù)
- resultSet.add(v._2)
- }
- else {
- // 撤回?cái)?shù)據(jù)
- resultSet.remove(v._2)
- }
- }
- override def close(): Unit = {
- // 打印寫(xiě)入sink的結(jié)果數(shù)據(jù)
- resultSet.foreach(println)
- }
- }
運(yùn)行結(jié)果如下:
雖然上面用了較長(zhǎng)的紙墨介紹簡(jiǎn)單的WordCount統(tǒng)計(jì)邏輯,但source和sink部分都是可以在學(xué)習(xí)后面算子中被復(fù)用的。本例核心的統(tǒng)計(jì)邏輯只有一行代碼:
- source.groupBy('word).select('word, 'word.count)
所以Table API開(kāi)發(fā)技術(shù)任務(wù)非常的簡(jiǎn)潔高效。
四、Table API 算子
雖然Table API與SQL的算子語(yǔ)義一致,但在表達(dá)方式上面SQL以文本的方式展現(xiàn),Table API是以java或者scala語(yǔ)言的方式進(jìn)行開(kāi)發(fā)。為了大家方便閱讀,即便是在《Apache Flink 漫談系列(08) - SQL概覽》中介紹過(guò)的算子,在這里也會(huì)再次進(jìn)行介紹,當(dāng)然對(duì)于Table API和SQL不同的地方會(huì)進(jìn)行詳盡介紹。
1. 示例數(shù)據(jù)及測(cè)試類(lèi)
(1) 測(cè)試數(shù)據(jù)
- customer_tab 表 - 客戶(hù)表保存客戶(hù)id,客戶(hù)姓名和客戶(hù)描述信息。字段及測(cè)試數(shù)據(jù)如下:
- order_tab 表 - 訂單表保存客戶(hù)購(gòu)買(mǎi)的訂單信息,包括訂單id,訂單時(shí)間和訂單描述信息。 字段節(jié)測(cè)試數(shù)據(jù)如下:
- Item_tab商品表, 攜帶商品id,商品類(lèi)型,出售時(shí)間,價(jià)格等信息,具體如下:
- PageAccess_tab頁(yè)面訪(fǎng)問(wèn)表,包含用戶(hù)ID,訪(fǎng)問(wèn)時(shí)間,用戶(hù)所在地域信息,具體數(shù)據(jù)如下:
- PageAccessCount_tab頁(yè)面訪(fǎng)問(wèn)表,訪(fǎng)問(wèn)量,訪(fǎng)問(wèn)時(shí)間,用戶(hù)所在地域信息,具體數(shù)據(jù)如下:
- PageAccessSession_tab頁(yè)面訪(fǎng)問(wèn)表,訪(fǎng)問(wèn)量,訪(fǎng)問(wèn)時(shí)間,用戶(hù)所在地域信息,具體數(shù)據(jù)如下:
(2) 測(cè)試類(lèi)
我們創(chuàng)建一個(gè)TableAPIOverviewITCase.scala 用于接下來(lái)介紹Flink Table API算子的功能體驗(yàn)。代碼如下:
- import org.apache.flink.api.scala._
- import org.apache.flink.runtime.state.StateBackend
- import org.apache.flink.runtime.state.memory.MemoryStateBackend
- import org.apache.flink.streaming.api.TimeCharacteristic
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
- import org.apache.flink.streaming.api.functions.source.SourceFunction
- import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.streaming.api.watermark.Watermark
- import org.apache.flink.table.api.{Table, TableEnvironment}
- import org.apache.flink.table.api.scala._
- import org.apache.flink.types.Row
- import org.junit.rules.TemporaryFolder
- import org.junit.{Rule, Test}
- import scala.collection.mutable
- import scala.collection.mutable.ArrayBuffer
- class Table APIOverviewITCase {
- // 客戶(hù)表數(shù)據(jù)
- val customer_data = new mutable.MutableList[(String, String, String)]
- customer_data.+=(("c_001", "Kevin", "from JinLin"))
- customer_data.+=(("c_002", "Sunny", "from JinLin"))
- customer_data.+=(("c_003", "JinCheng", "from HeBei"))
- // 訂單表數(shù)據(jù)
- val order_data = new mutable.MutableList[(String, String, String, String)]
- order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone"))
- order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad"))
- order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book"))
- // 商品銷(xiāo)售表數(shù)據(jù)
- val item_data = Seq(
- Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))),
- Right((1510365660000L)),
- Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))),
- Right((1510365720000L)),
- Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))),
- Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))),
- Right((1510365780000L)),
- Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))),
- Right((1510365900000L)),
- Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))),
- Right((1510365960000L)),
- Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))),
- Right((1510366020000L)),
- Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))),
- Right((151036608000L)))
- // 頁(yè)面訪(fǎng)問(wèn)表數(shù)據(jù)
- val pageAccess_data = Seq(
- Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))),
- Right((1510365660000L)),
- Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))),
- Right((1510365660000L)),
- Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))),
- Right((1510366200000L)),
- Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))),
- Right((1510366260000L)),
- Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))),
- Right((1510373400000L)))
- // 頁(yè)面訪(fǎng)問(wèn)量表數(shù)據(jù)2
- val pageAccessCount_data = Seq(
- Left((1510365660000L, (1510365660000L, "ShangHai", 100))),
- Right((1510365660000L)),
- Left((1510365660000L, (1510365660000L, "BeiJing", 86))),
- Right((1510365660000L)),
- Left((1510365960000L, (1510365960000L, "BeiJing", 210))),
- Right((1510366200000L)),
- Left((1510366200000L, (1510366200000L, "BeiJing", 33))),
- Right((1510366200000L)),
- Left((1510373400000L, (1510373400000L, "ShangHai", 129))),
- Right((1510373400000L)))
- // 頁(yè)面訪(fǎng)問(wèn)表數(shù)據(jù)3
- val pageAccessSession_data = Seq(
- Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))),
- Right((1510365660000L)),
- Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))),
- Right((1510365720000L)),
- Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))),
- Right((1510365720000L)),
- Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))),
- Right((1510365900000L)),
- Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))),
- Right((1510366200000L)),
- Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))),
- Right((1510366200000L)),
- Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))),
- Right((1510366260000L)),
- Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))),
- Right((1510373760000L)))
- val _tempFolder = new TemporaryFolder
- // Streaming 環(huán)境
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setParallelism(1)
- env.setStateBackend(getStateBackend)
- def getProcTimeTables(): (Table, Table) = {
- // 將order_tab, customer_tab 注冊(cè)到catalog
- val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc)
- val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc)
- (customer, order)
- }
- def getEventTimeTables(): (Table, Table, Table, Table) = {
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- // 將item_tab, pageAccess_tab 注冊(cè)到catalog
- val item =
- env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data))
- .toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime)
- val pageAccess =
- env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data))
- .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
- val pageAccessCount =
- env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data))
- .toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime)
- val pageAccessSession =
- env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data))
- .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
- (item, pageAccess, pageAccessCount, pageAccessSession)
- }
- @Rule
- def tempFolder: TemporaryFolder = _tempFolder
- def getStateBackend: StateBackend = {
- new MemoryStateBackend()
- }
- def procTimePrint(result: Table): Unit = {
- val sink = new RetractingSink
- result.toRetractStream[Row].addSink(sink)
- env.execute()
- }
- def rowTimePrint(result: Table): Unit = {
- val sink = new RetractingSink
- result.toRetractStream[Row].addSink(sink)
- env.execute()
- }
- @Test
- def testProc(): Unit = {
- val (customer, order) = getProcTimeTables()
- val result = ...// 測(cè)試的查詢(xún)邏輯
- procTimePrint(result)
- }
- @Test
- def testEvent(): Unit = {
- val (item, pageAccess, pageAccessCount, pageAccessSession) = getEventTimeTables()
- val result = ...// 測(cè)試的查詢(xún)邏輯
- procTimePrint(result)
- }
- }
- // 自定義Sink
- final class RetractingSink extends RichSinkFunction[(Boolean, Row)] {
- var retractedResults: ArrayBuffer[String] = null
- override def open(parameters: Configuration): Unit = {
- super.open(parameters)
- retractedResults = mutable.ArrayBuffer.empty[String]
- }
- def invoke(v: (Boolean, Row)) {
- retractedResults.synchronized {
- val vvalue = v._2.toString
- if (v._1) {
- retractedResults += value
- } else {
- val idx = retractedResults.indexOf(value)
- if (idx >= 0) {
- retractedResults.remove(idx)
- } else {
- throw new RuntimeException("Tried to retract a value that wasn't added first. " +
- "This is probably an incorrectly implemented test. " +
- "Try to set the parallelism of the sink to 1.")
- }
- }
- }
- }
- override def close(): Unit = {
- super.close()
- retractedResults.sorted.foreach(println(_))
- }
- }
- // Water mark 生成器
- class EventTimeSourceFunction[T](
- dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
- override def run(ctx: SourceContext[T]): Unit = {
- dataWithTimestampList.foreach {
- case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
- case Right(w) => ctx.emitWatermark(new Watermark(w))
- }
- }
- override def cancel(): Unit = ???
- }
2. SELECT
SELECT 用于從數(shù)據(jù)集/流中選擇數(shù)據(jù),語(yǔ)義是關(guān)系代數(shù)中的投影(Projection),對(duì)關(guān)系進(jìn)行垂直分割,消去或增加某些列, 如下圖所示:
(1) Table API 示例
從customer_tab選擇用戶(hù)姓名,并用內(nèi)置的CONCAT函數(shù)拼接客戶(hù)信息,如下:
- val result = customer
- .select('c_name, concat_ws('c_name, " come ", 'c_desc))
(2) Result
(3) 特別說(shuō)明
大家看到在 SELECT 不僅可以使用普通的字段選擇,還可以使用ScalarFunction,當(dāng)然也包括User-Defined Function,同時(shí)還可以進(jìn)行字段的alias設(shè)置。其實(shí)SELECT可以結(jié)合聚合,在GROUPBY部分會(huì)進(jìn)行介紹,一個(gè)比較特殊的使用場(chǎng)景是去重的場(chǎng)景,示例如下:
- Table API示例
在訂單表查詢(xún)所有的客戶(hù)id,消除重復(fù)客戶(hù)id, 如下:
- val result = order
- .groupBy('c_id)
- .select('c_id)
- Result
3. WHERE
WHERE 用于從數(shù)據(jù)集/流中過(guò)濾數(shù)據(jù),與SELECT一起使用,語(yǔ)義是關(guān)系代數(shù)的Selection,根據(jù)某些條件對(duì)關(guān)系做水平分割,即選擇符合條件的記錄,如下所示:
(1) Table API 示例
在customer_tab查詢(xún)客戶(hù)id為c_001和c_003的客戶(hù)信息,如下:
- val result = customer
- .where("c_|| c_")
- .select( 'c_id, 'c_name, 'c_desc)
(2) Result
(3) 特別說(shuō)明
我們發(fā)現(xiàn)WHERE是對(duì)滿(mǎn)足一定條件的數(shù)據(jù)進(jìn)行過(guò)濾,WHERE支持=, <, >, <>, >=, <=以及&&, ||等表達(dá)式的組合,最終滿(mǎn)足過(guò)濾條件的數(shù)據(jù)會(huì)被選擇出來(lái)。 SQL中的IN和NOT IN在Table API里面用intersect 和 minus描述(flink-1.7.0版本)。
- Intersect 示例
Intersect只在Batch模式下進(jìn)行支持,Stream模式下我們可以利用雙流JOIN來(lái)實(shí)現(xiàn),如:在customer_tab查詢(xún)已經(jīng)下過(guò)訂單的客戶(hù)信息,如下:
- // 計(jì)算客戶(hù)id,并去重
- val distinct_cids = order
- .groupBy('c_id) // 去重
- .select('c_id as 'o_c_id)
- val result = customer
- .join(distinct_cids, 'c_id === 'o_c_id)
- .select('c_id, 'c_name, 'c_desc)
- Result
- Minus 示例
Minus只在Batch模式下進(jìn)行支持,Stream模式下我們可以利用雙流JOIN來(lái)實(shí)現(xiàn),如:在customer_tab查詢(xún)沒(méi)有下過(guò)訂單的客戶(hù)信息,如下:
- // 查詢(xún)下過(guò)訂單的客戶(hù)id,并去重
- val distinct_cids = order
- .groupBy('c_id)
- .select('c_id as 'o_c_id)
- // 查詢(xún)沒(méi)有下過(guò)訂單的客戶(hù)信息
- val result = customer
- .leftOuterJoin(distinct_cids, 'c_id === 'o_c_id)
- .where('o_c_id isNull)
- .select('c_id, 'c_name, 'c_desc)
說(shuō)明上面實(shí)現(xiàn)邏輯比較復(fù)雜,我們后續(xù)考慮如何在流上支持更簡(jiǎn)潔的方式。
- Result
- Intersect/Minus與關(guān)系代數(shù)
如上介紹Intersect是關(guān)系代數(shù)中的Intersection, Minus是關(guān)系代數(shù)的Difference, 如下圖示意:
a. Intersect(Intersection):
b. Minus(Difference):
4. GROUP BY
GROUP BY 是對(duì)數(shù)據(jù)進(jìn)行分組的操作,比如我需要分別計(jì)算一下一個(gè)學(xué)生表里面女生和男生的人數(shù)分別是多少,如下:
(1) Table API 示例
將order_tab信息按c_id分組統(tǒng)計(jì)訂單數(shù)量,簡(jiǎn)單示例如下:
- val result = order
- .groupBy('c_id)
- .select('c_id, 'o_id.count)
(2) Result
(3) 特別說(shuō)明
在實(shí)際的業(yè)務(wù)場(chǎng)景中,GROUP BY除了按業(yè)務(wù)字段進(jìn)行分組外,很多時(shí)候用戶(hù)也可以用時(shí)間來(lái)進(jìn)行分組(相當(dāng)于劃分窗口),比如統(tǒng)計(jì)每分鐘的訂單數(shù)量:
- Table API 示例
按時(shí)間進(jìn)行分組,查詢(xún)每分鐘的訂單數(shù)量,如下:
- ```
- val result = order
- .select('o_id, 'c_id, 'o_time.substring(1, 16) as 'o_time_min)
- .groupBy('o_time_min)
- .select('o_time_min, 'o_id.count)
- ```
- Result
說(shuō)明:如果我們時(shí)間字段是timestamp類(lèi)型,建議使用內(nèi)置的 DATE_FORMAT 函數(shù)。
5. UNION ALL
UNION ALL 將兩個(gè)表合并起來(lái),要求兩個(gè)表的字段完全一致,包括字段類(lèi)型、字段順序,語(yǔ)義對(duì)應(yīng)關(guān)系代數(shù)的Union,只是關(guān)系代數(shù)是Set集合操作,會(huì)有去重復(fù)操作,UNION ALL 不進(jìn)行去重,如下所示:
(1) Table API 示例
我們簡(jiǎn)單的將customer_tab查詢(xún)2次,將查詢(xún)結(jié)果合并起來(lái),如下:
- val result = customer.unionAll(customer)
(2) Result
(3) 特別說(shuō)明
UNION ALL 對(duì)結(jié)果數(shù)據(jù)不進(jìn)行去重,如果想對(duì)結(jié)果數(shù)據(jù)進(jìn)行去重,傳統(tǒng)數(shù)據(jù)庫(kù)需要進(jìn)行UNION操作。
6. UNION
UNION 將兩個(gè)流給合并起來(lái),要求兩個(gè)流的字段完全一致,包括字段類(lèi)型、字段順序,并其UNION 不同于UNION ALL,UNION會(huì)對(duì)結(jié)果數(shù)據(jù)去重,與關(guān)系代數(shù)的Union語(yǔ)義一致,如下:
(1) Table API 示例
我們簡(jiǎn)單的將customer_tab查詢(xún)2次,將查詢(xún)結(jié)果合并起來(lái),如下:
- val result = customer.union(customer)
我們發(fā)現(xiàn)完全一樣的表數(shù)據(jù)進(jìn)行 UNION之后,數(shù)據(jù)是被去重的,UNION之后的數(shù)據(jù)并沒(méi)有增加。
(2) Result
(3) 特別說(shuō)明
UNION 對(duì)結(jié)果數(shù)據(jù)進(jìn)行去重,在實(shí)際的實(shí)現(xiàn)過(guò)程需要對(duì)數(shù)據(jù)進(jìn)行排序操作,所以非必要去重情況請(qǐng)使用UNION ALL操作。
7. JOIN
JOIN 用于把來(lái)自?xún)蓚€(gè)表的行聯(lián)合起來(lái)形成一個(gè)寬表,Apache Flink支持的JOIN類(lèi)型:
- JOIN - INNER JOIN
- LEFT JOIN - LEFT OUTER JOIN
- RIGHT JOIN - RIGHT OUTER JOIN
- FULL JOIN - FULL OUTER JOIN
JOIN與關(guān)系代數(shù)的Join語(yǔ)義相同,具體如下:
(1) Table API 示例 (JOIN)
INNER JOIN只選擇滿(mǎn)足ON條件的記錄,我們查詢(xún)customer_tab 和 order_tab表,將有訂單的客戶(hù)和訂單信息選擇出來(lái),如下:
- val result = customer
- .join(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)
(2)Result:
(3) Table API 示例 (LEFT JOIN)
LEFT JOIN與INNER JOIN的區(qū)別是當(dāng)右表沒(méi)有與左邊相JOIN的數(shù)據(jù)時(shí)候,右邊對(duì)應(yīng)的字段補(bǔ)NULL輸出,語(yǔ)義如下:
對(duì)應(yīng)的SQL語(yǔ)句如下(LEFT JOIN):
- SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ;
細(xì)心的讀者可能發(fā)現(xiàn)上面T2.ColC是添加了前綴T2了,這里需要說(shuō)明一下,當(dāng)兩張表有字段名字一樣的時(shí)候,我需要指定是從那個(gè)表里面投影的。
我們查詢(xún)customer_tab 和 order_tab表,將客戶(hù)和訂單信息選擇出來(lái)如下:
- val result = customer
- .leftOuterJoin(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)
(4) Result
(5) 特別說(shuō)明
RIGHT JOIN 相當(dāng)于 LEFT JOIN 左右兩個(gè)表交互一下位置。FULL JOIN相當(dāng)于 RIGHT JOIN 和 LEFT JOIN 之后進(jìn)行UNION ALL操作。
8. Time-Interval JOIN
Time-Interval JOIN 相對(duì)于UnBounded的雙流JOIN來(lái)說(shuō)是Bounded JOIN。就是每條流的每一條數(shù)據(jù)會(huì)與另一條流上的不同時(shí)間區(qū)域的數(shù)據(jù)進(jìn)行JOIN。對(duì)應(yīng)Apache Flink官方文檔的 Time-windowed JOIN(release-1.7之前都叫Time-Windowed JOIN)。 Time-Interval JOIN的語(yǔ)義和實(shí)現(xiàn)原理詳見(jiàn)《Apache Flink 漫談系列(12) - Time Interval(Time-windowed) JOIN》。其Table API核心的語(yǔ)法示例,如下:
- ...
- val result = left
- .join(right)
- // 定義Time Interval
- .where('a === 'd && 'c >= 'f - 5.seconds && 'c < 'f + 6.seconds)
- ...
9. Lateral JOIN
Apache Flink Lateral JOIN 是左邊Table與一個(gè)UDTF進(jìn)行JOIN,詳細(xì)的語(yǔ)義和實(shí)現(xiàn)原理請(qǐng)參考《Apache Flink 漫談系列(10) - JOIN LATERAL》。其Table API核心的語(yǔ)法示例,如下:
- ...
- val udtf = new UDTF
- val result = source.join(udtf('c) as ('d, 'e))
- ...
10. Temporal Table JOIN
Temporal Table JOIN 是左邊表與右邊一個(gè)攜帶版本信息的表進(jìn)行JOIN,詳細(xì)的語(yǔ)法,語(yǔ)義和實(shí)現(xiàn)原理詳見(jiàn)《Apache Flink 漫談系列(11) - Temporal Table JOIN》,其Table API核心的語(yǔ)法示例,如下:
- ...
- val rates = tEnv.scan("versonedTable").createTemporalTableFunction('rowtime, 'r_currency)
- val result = left.join(rates('o_rowtime), 'r_currency === 'o_currency)...
11. Window
在Apache Flink中有2種類(lèi)型的Window,一種是OverWindow,即傳統(tǒng)數(shù)據(jù)庫(kù)的標(biāo)準(zhǔn)開(kāi)窗,每一個(gè)元素都對(duì)應(yīng)一個(gè)窗口。一種是GroupWindow,目前在SQL中GroupWindow都是基于時(shí)間進(jìn)行窗口劃分的。
(1) Over Window
Apache Flink中對(duì)OVER Window的定義遵循標(biāo)準(zhǔn)SQL的定義語(yǔ)法。
按ROWS和RANGE分類(lèi)是傳統(tǒng)數(shù)據(jù)庫(kù)的標(biāo)準(zhǔn)分類(lèi)方法,在Apache Flink中還可以根據(jù)時(shí)間類(lèi)型(ProcTime/EventTime)和窗口的有限和無(wú)限(Bounded/UnBounded)進(jìn)行分類(lèi),共計(jì)8種類(lèi)型。為了避免大家對(duì)過(guò)細(xì)分類(lèi)造成困擾,我們按照確定當(dāng)前行的不同方式將OVER Window分成兩大類(lèi)進(jìn)行介紹,如下:
- ROWS OVER Window - 每一行元素都視為新的計(jì)算行,即,每一行都是一個(gè)新的窗口。
- RANGE OVER Window - 具有相同時(shí)間值的所有元素行視為同一計(jì)算行,即,具有相同時(shí)間值的所有行都是同一個(gè)窗口。
(a) Bounded ROWS OVER Window
Bounded ROWS OVER Window 每一行元素都視為新的計(jì)算行,即,每一行都是一個(gè)新的窗口。
- 語(yǔ)義
我們以3個(gè)元素(2 PRECEDING)的窗口為例,如下圖:
上圖所示窗口 user 1 的 w5和w6, user 2的 窗口 w2 和 w3,雖然有元素都是同一時(shí)刻到達(dá),但是他們?nèi)匀皇窃诓煌拇翱?,這一點(diǎn)有別于RANGE OVER Window。
- Table API 示例
利用item_tab測(cè)試數(shù)據(jù),我們統(tǒng)計(jì)同類(lèi)商品中當(dāng)前和當(dāng)前商品之前2個(gè)商品中的最高價(jià)格。
- val result = item
- .window(Over partitionBy 'itemType orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
- .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)
- Result
(b) Bounded RANGE OVER Window
Bounded RANGE OVER Window 具有相同時(shí)間值的所有元素行視為同一計(jì)算行,即,具有相同時(shí)間值的所有行都是同一個(gè)窗口。
- 語(yǔ)義
我們以3秒中數(shù)據(jù)(INTERVAL '2' SECOND)的窗口為例,如下圖:
注意: 上圖所示窗口 user 1 的 w6, user 2的 窗口 w3,元素都是同一時(shí)刻到達(dá),他們是在同一個(gè)窗口,這一點(diǎn)有別于ROWS OVER Window。
- Tabel API 示例
我們統(tǒng)計(jì)同類(lèi)商品中當(dāng)前和當(dāng)前商品之前2分鐘商品中的最高價(jià)格。
- val result = item
- .window(Over partitionBy 'itemType orderBy 'rowtime preceding 2.minute following CURRENT_RANGE as 'w)
- .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)
(c) Result(Bounded RANGE OVER Window)
- 特別說(shuō)明
OverWindow最重要是要理解每一行數(shù)據(jù)都確定一個(gè)窗口,同時(shí)目前在Apache Flink中只支持按時(shí)間字段排序。并且OverWindow開(kāi)窗與GroupBy方式數(shù)據(jù)分組最大的不同在于,GroupBy數(shù)據(jù)分組統(tǒng)計(jì)時(shí)候,在SELECT中除了GROUP BY的key,不能直接選擇其他非key的字段,但是OverWindow沒(méi)有這個(gè)限制,SELECT可以選擇任何字段。比如一張表table(a,b,c,d)4個(gè)字段,如果按d分組求c的最大值,兩種寫(xiě)完如下:
- GROUP BY - tab.groupBy('d).select(d, MAX(c))
- OVER Window = tab.window(Over.. as 'w).select('a, 'b, 'c, 'd, c.max over 'w)
如上 OVER Window 雖然PARTITION BY d,但SELECT 中仍然可以選擇 a,b,c字段。但在GROUPBY中,SELECT 只能選擇 d 字段。
(2) Group Window
根據(jù)窗口數(shù)據(jù)劃分的不同,目前Apache Flink有如下3種Bounded Winodw:
- Tumble - 滾動(dòng)窗口,窗口數(shù)據(jù)有固定的大小,窗口數(shù)據(jù)無(wú)疊加;
- Hop - 滑動(dòng)窗口,窗口數(shù)據(jù)有固定大小,并且有固定的窗口重建頻率,窗口數(shù)據(jù)有疊加;
- Session - 會(huì)話(huà)窗口,窗口數(shù)據(jù)沒(méi)有固定的大小,根據(jù)窗口數(shù)據(jù)活躍程度劃分窗口,窗口數(shù)據(jù)無(wú)疊加。
說(shuō)明: Aapche Flink 還支持UnBounded的 Group Window,也就是全局Window,流上所有數(shù)據(jù)都在一個(gè)窗口里面,語(yǔ)義非常簡(jiǎn)單,這里不做詳細(xì)介紹了。
(a) Tumble
- 語(yǔ)義
Tumble 滾動(dòng)窗口有固定size,窗口數(shù)據(jù)不重疊,具體語(yǔ)義如下:
- Table API 示例
利用pageAccess_tab測(cè)試數(shù)據(jù),我們需要按不同地域統(tǒng)計(jì)每2分鐘的淘寶首頁(yè)的訪(fǎng)問(wèn)量(PV)。
- val result = pageAccess
- .window(Tumble over 2.minute on 'rowtime as 'w)
- .groupBy('w, 'region)
- .select('region, 'w.start, 'w.end, 'region.count as 'pv)
- Result
(b) Hop
Hop 滑動(dòng)窗口和滾動(dòng)窗口類(lèi)似,窗口有固定的size,與滾動(dòng)窗口不同的是滑動(dòng)窗口可以通過(guò)slide參數(shù)控制滑動(dòng)窗口的新建頻率。因此當(dāng)slide值小于窗口size的值的時(shí)候多個(gè)滑動(dòng)窗口會(huì)重疊。
- 語(yǔ)義
Hop 滑動(dòng)窗口語(yǔ)義如下所示:
- Table API 示例
利用pageAccessCount_tab測(cè)試數(shù)據(jù),我們需要每5分鐘統(tǒng)計(jì)近10分鐘的頁(yè)面訪(fǎng)問(wèn)量(PV).
- val result = pageAccessCount
- .window(Slide over 10.minute every 5.minute on 'rowtime as 'w)
- .groupBy('w)
- .select('w.start, 'w.end, 'accessCount.sum as 'accessCount)
- Result
(c) Session
Seeeion 會(huì)話(huà)窗口 是沒(méi)有固定大小的窗口,通過(guò)session的活躍度分組元素。不同于滾動(dòng)窗口和滑動(dòng)窗口,會(huì)話(huà)窗口不重疊,也沒(méi)有固定的起止時(shí)間。一個(gè)會(huì)話(huà)窗口在一段時(shí)間內(nèi)沒(méi)有接收到元素時(shí),即當(dāng)出現(xiàn)非活躍間隙時(shí)關(guān)閉。一個(gè)會(huì)話(huà)窗口 分配器通過(guò)配置session gap來(lái)指定非活躍周期的時(shí)長(zhǎng).
- 語(yǔ)義
Session 會(huì)話(huà)窗口語(yǔ)義如下所示:
- val result = pageAccessSession
- .window(Session withGap 3.minute on 'rowtime as 'w)
- .groupBy('w, 'region)
- .select('region, 'w.start, 'w.end, 'region.count as 'pv)
- Result
(d) 嵌套Window
在Window之后再進(jìn)行Window劃分也是比較常見(jiàn)的統(tǒng)計(jì)需求,那么在一個(gè)Event-Time的Window之后,如何再寫(xiě)一個(gè)Event-Time的Window呢?一個(gè)Window之后再描述一個(gè)Event-Time的Window最重要的是Event-time屬性的傳遞,在Table API中我們可以利用'w.rowtime來(lái)傳遞時(shí)間屬性,比如:Tumble Window之后再接一個(gè)Session Window 示例如下:
- ...
- val result = pageAccess
- .window(Tumble over 2.minute on 'rowtime as 'w1)
- .groupBy('w1)
- .select('w1.rowtime as 'rowtime, 'col1.count as 'cnt)
- .window(Session withGap 3.minute on 'rowtime as 'w2)
- .groupBy('w2)
- .select('cnt.sum)
- ...
五、Source&Sink
上面我們介紹了Apache Flink Table API核心算子的語(yǔ)義和具體示例,這部分將選取Bounded EventTime Tumble Window為例為大家編寫(xiě)一個(gè)完整的包括Source和Sink定義的Apache Flink Table API Job。假設(shè)有一張?zhí)詫氻?yè)面訪(fǎng)問(wèn)表(PageAccess_tab),有地域,用戶(hù)ID和訪(fǎng)問(wèn)時(shí)間。我們需要按不同地域統(tǒng)計(jì)每2分鐘的淘寶首頁(yè)的訪(fǎng)問(wèn)量(PV)。具體數(shù)據(jù)如下:
1. Source 定義
自定義Apache Flink Stream Source需要實(shí)現(xiàn)StreamTableSource, StreamTableSource中通過(guò)StreamExecutionEnvironment 的addSource方法獲取DataStream, 所以我們需要自定義一個(gè) SourceFunction, 并且要支持產(chǎn)生WaterMark,也就是要實(shí)現(xiàn)DefinedRowtimeAttributes接口。
(1) Source Function定義
支持接收攜帶EventTime的數(shù)據(jù)集合,Either的數(shù)據(jù)結(jié)構(gòu),Right表示W(wǎng)aterMark和Left表示數(shù)據(jù):
- class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]])
- extends SourceFunction[T] {
- override def run(ctx: SourceContext[T]): Unit = {
- dataWithTimestampList.foreach {
- case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
- case Right(w) => ctx.emitWatermark(new Watermark(w))
- }
- }
- override def cancel(): Unit = ???}
(2) 定義 StreamTableSource
我們自定義的Source要攜帶我們測(cè)試的數(shù)據(jù),以及對(duì)應(yīng)的WaterMark數(shù)據(jù),具體如下:
- class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
- val fieldNames = Array("accessTime", "region", "userId")
- val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
- val rowType = new RowTypeInfo(
- Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
- fieldNames)
- // 頁(yè)面訪(fǎng)問(wèn)表數(shù)據(jù) rows with timestamps and watermarks
- val data = Seq(
- Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
- Right(1510365660000L),
- Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")),
- Right(1510365660000L),
- Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")),
- Right(1510366200000L),
- Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")),
- Right(1510366260000L),
- Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")),
- Right(1510373400000L)
- )
- override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
- Collections.singletonList(new RowtimeAttributeDescriptor(
- "accessTime",
- new ExistingField("accessTime"),
- PreserveWatermarks.INSTANCE))
- }
- override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
- execEnv.addSource(new MySourceFunction[Row](data)).returns(rowType).setParallelism(1)
- }
- override def getReturnType: TypeInformation[Row] = rowType
- override def getTableSchema: TableSchema = schema
- }
(3) Sink 定義
我們簡(jiǎn)單的將計(jì)算結(jié)果寫(xiě)入到Apache Flink內(nèi)置支持的CSVSink中,定義Sink如下:
- def getCsvTableSink: TableSink[Row] = {
- val tempFile = File.createTempFile("csv_sink_", "tem")
- // 打印sink的文件路徑,方便我們查看運(yùn)行結(jié)果
- println("Sink path : " + tempFile)
- if (tempFile.exists()) {
- tempFile.delete
網(wǎng)頁(yè)名稱(chēng):ApacheFlink漫談系列(13)-TableAPI概述
網(wǎng)址分享:http://www.dlmjj.cn/article/ccoscog.html


咨詢(xún)
建站咨詢(xún)
