日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線(xiàn)溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
ApacheFlink漫談系列(13)-TableAPI概述

一、什么是Table API

創(chuàng)新互聯(lián)專(zhuān)注于如皋企業(yè)網(wǎng)站建設(shè),響應(yīng)式網(wǎng)站開(kāi)發(fā),商城網(wǎng)站定制開(kāi)發(fā)。如皋網(wǎng)站建設(shè)公司,為如皋等地區(qū)提供建站服務(wù)。全流程按需網(wǎng)站制作,專(zhuān)業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,創(chuàng)新互聯(lián)專(zhuān)業(yè)和態(tài)度為您提供的服務(wù)

在《Apache Flink 漫談系列(08) - SQL概覽》中我們概要的向大家介紹了什么是好SQL,SQL和Table API是Apache Flink中的同一層次的API抽象,如下圖所示:

Apache Flink 針對(duì)不同的用戶(hù)場(chǎng)景提供了三層用戶(hù)API,最下層ProcessFunction API可以對(duì)State,Timer等復(fù)雜機(jī)制進(jìn)行有效的控制,但用戶(hù)使用的便捷性很弱,也就是說(shuō)即使很簡(jiǎn)單統(tǒng)計(jì)邏輯,也要較多的代碼開(kāi)發(fā)。第二層DataStream API對(duì)窗口,聚合等算子進(jìn)行了封裝,用戶(hù)的便捷性有所增強(qiáng)。最上層是SQL/Table API,Table API是Apache Flink中的聲明式,可被查詢(xún)優(yōu)化器優(yōu)化的高級(jí)分析API。

二、Table API的特點(diǎn)

Table API和SQL都是Apache Flink中最高層的分析API,SQL所具備的特點(diǎn)Table API也都具有,如下:

  • 聲明式 - 用戶(hù)只關(guān)心做什么,不用關(guān)心怎么做;
  • 高性能 - 支持查詢(xún)優(yōu)化,可以獲取最好的執(zhí)行性能;
  • 流批統(tǒng)一 - 相同的統(tǒng)計(jì)邏輯,既可以流模式運(yùn)行,也可以批模式運(yùn)行;
  • 標(biāo)準(zhǔn)穩(wěn)定 - 語(yǔ)義遵循SQL標(biāo)準(zhǔn),語(yǔ)法語(yǔ)義明確,不易變動(dòng)。

當(dāng)然除了SQL的特性,因?yàn)門(mén)able API是在Flink中專(zhuān)門(mén)設(shè)計(jì)的,所以Table API還具有自身的特點(diǎn):

  • 表達(dá)方式的擴(kuò)展性 - 在Flink中可以為T(mén)able API開(kāi)發(fā)很多便捷性功能,如:Row.flatten(), map/flatMap 等
  • 功能的擴(kuò)展性 - 在Flink中可以為T(mén)able API擴(kuò)展更多的功能,如:Iteration,flatAggregate 等新功能
  • 編譯檢查 - Table API支持java和scala語(yǔ)言開(kāi)發(fā),支持IDE中進(jìn)行編譯檢查。

說(shuō)明:上面說(shuō)的map/flatMap/flatAggregate都是Apache Flink 社區(qū) FLIP-29 中規(guī)劃的新功能。

三、HelloWorld

在介紹Table API所有算子之前我們先編寫(xiě)一個(gè)簡(jiǎn)單的HelloWorld來(lái)直觀了解如何進(jìn)行Table API的開(kāi)發(fā)。

1. Maven 依賴(lài)

在pom文件中增加如下配置,本篇以flink-1.7.0功能為準(zhǔn)進(jìn)行后續(xù)介紹。

 
 
 
 
  1. 1.7.0
  2. org.apache.flink
  3. flink-table_2.11
  4. ${table.version}
  5. org.apache.flink
  6. flink-scala_2.11
  7. ${table.version}
  8. org.apache.flink
  9. flink-streaming-scala_2.11
  10. ${table.version}
  11. org.apache.flink
  12. flink-streaming-java_2.11
  13. ${table.version}

2. 程序結(jié)構(gòu)

在編寫(xiě)第一Flink Table API job之前我們先簡(jiǎn)單了解一下Flink Table API job的結(jié)構(gòu),如下圖所示:

  • 外部數(shù)據(jù)源,比如Kafka, Rabbitmq, CSV 等等;
  • 查詢(xún)計(jì)算邏輯,比如最簡(jiǎn)單的數(shù)據(jù)導(dǎo)入select,雙流Join,Window Aggregate 等;
  • 外部結(jié)果存儲(chǔ),比如Kafka,Cassandra,CSV等。

說(shuō)明:1和3 在Apache Flink中統(tǒng)稱(chēng)為Connector。

3. 主程序

我們以一個(gè)統(tǒng)計(jì)單詞數(shù)量的業(yè)務(wù)場(chǎng)景,編寫(xiě)第一個(gè)HelloWorld程序。

根據(jù)上面Flink job基本結(jié)構(gòu)介紹,要Table API完成WordCount的計(jì)算需求,我們需要完成三部分代碼:

  • TableSoruce Code - 用于創(chuàng)建數(shù)據(jù)源的代碼
  • Table API Query - 用于進(jìn)行word count統(tǒng)計(jì)的Table API 查詢(xún)邏輯
  • TableSink Code - 用于保存word count計(jì)算結(jié)果的結(jié)果表代碼

(1) 運(yùn)行模式選擇

一個(gè)job我們要選擇是Stream方式運(yùn)行還是Batch模式運(yùn)行,所以任何統(tǒng)計(jì)job的第一步是進(jìn)行運(yùn)行模式選擇,如下我們選擇Stream方式運(yùn)行。

 
 
 
 
  1. // Stream運(yùn)行環(huán)境
  2. val env = StreamExecutionEnvironment.getExecutionEnvironment
  3. val tEnv = TableEnvironment.getTableEnvironment(env)

(2) 構(gòu)建測(cè)試Source

我們用最簡(jiǎn)單的構(gòu)建Source方式進(jìn)行本次測(cè)試,代碼如下:

 
 
 
 
  1. // 測(cè)試數(shù)據(jù)
  2. val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
  3. // 最簡(jiǎn)單的獲取Source方式
  4. val source = env.fromCollection(data).toTable(tEnv, 'word)

(3) WordCount 統(tǒng)計(jì)邏輯

WordCount核心統(tǒng)計(jì)邏輯就是按照單詞分組,然后計(jì)算每個(gè)單詞的數(shù)量,統(tǒng)計(jì)邏輯如下:

 
 
 
 
  1. // 單詞統(tǒng)計(jì)核心邏輯
  2. val result = source
  3. .groupBy('word) // 單詞分組
  4. .select('word, 'word.count) // 單詞統(tǒng)計(jì)

(4) 定義Sink

將WordCount的統(tǒng)計(jì)結(jié)果寫(xiě)入Sink中,代碼如下:

 
 
 
 
  1. // 自定義Sink
  2. val sink = new RetractSink // 自定義Sink(下面有完整代碼)
  3. // 計(jì)算結(jié)果寫(xiě)入sink
  4. result.toRetractStream[(String, Long)].addSink(sink)

(5) 完整的HelloWord代碼

為了方便大家運(yùn)行WordCount查詢(xún)統(tǒng)計(jì),將完整的代碼分享大家(基于flink-1.7.0),如下:

 
 
 
 
  1. import org.apache.flink.api.scala._
  2. import org.apache.flink.configuration.Configuration
  3. import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
  4. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  5. import org.apache.flink.table.api.TableEnvironment
  6. import org.apache.flink.table.api.scala._
  7. import scala.collection.mutable
  8. object HelloWord {
  9. def main(args: Array[String]): Unit = {
  10. // 測(cè)試數(shù)據(jù)
  11. val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
  12. // Stream運(yùn)行環(huán)境
  13. val env = StreamExecutionEnvironment.getExecutionEnvironment
  14. val tEnv = TableEnvironment.getTableEnvironment(env)
  15. // 最簡(jiǎn)單的獲取Source方式
  16. val source = env.fromCollection(data).toTable(tEnv, 'word)
  17. // 單詞統(tǒng)計(jì)核心邏輯
  18. val result = source
  19. .groupBy('word) // 單詞分組
  20. .select('word, 'word.count) // 單詞統(tǒng)計(jì)
  21. // 自定義Sink
  22. val sink = new RetractSink
  23. // 計(jì)算結(jié)果寫(xiě)入sink
  24. result.toRetractStream[(String, Long)].addSink(sink)
  25. env.execute
  26. }
  27. }
  28. class RetractSink extends RichSinkFunction[(Boolean, (String, Long))] {
  29. private var resultSet: mutable.Set[(String, Long)] = _
  30. override def open(parameters: Configuration): Unit = {
  31. // 初始化內(nèi)存存儲(chǔ)結(jié)構(gòu)
  32. resultSet = new mutable.HashSet[(String, Long)]
  33. }
  34. override def invoke(v: (Boolean, (String, Long)), context: SinkFunction.Context[_]): Unit = {
  35. if (v._1) {
  36. // 計(jì)算數(shù)據(jù)
  37. resultSet.add(v._2)
  38. }
  39. else {
  40. // 撤回?cái)?shù)據(jù)
  41. resultSet.remove(v._2)
  42. }
  43. }
  44. override def close(): Unit = {
  45. // 打印寫(xiě)入sink的結(jié)果數(shù)據(jù)
  46. resultSet.foreach(println)
  47. }
  48. }

運(yùn)行結(jié)果如下:

雖然上面用了較長(zhǎng)的紙墨介紹簡(jiǎn)單的WordCount統(tǒng)計(jì)邏輯,但source和sink部分都是可以在學(xué)習(xí)后面算子中被復(fù)用的。本例核心的統(tǒng)計(jì)邏輯只有一行代碼:

 
 
 
 
  1. source.groupBy('word).select('word, 'word.count)

所以Table API開(kāi)發(fā)技術(shù)任務(wù)非常的簡(jiǎn)潔高效。

四、Table API 算子

雖然Table API與SQL的算子語(yǔ)義一致,但在表達(dá)方式上面SQL以文本的方式展現(xiàn),Table API是以java或者scala語(yǔ)言的方式進(jìn)行開(kāi)發(fā)。為了大家方便閱讀,即便是在《Apache Flink 漫談系列(08) - SQL概覽》中介紹過(guò)的算子,在這里也會(huì)再次進(jìn)行介紹,當(dāng)然對(duì)于Table API和SQL不同的地方會(huì)進(jìn)行詳盡介紹。

1. 示例數(shù)據(jù)及測(cè)試類(lèi)

(1) 測(cè)試數(shù)據(jù)

  • customer_tab 表 - 客戶(hù)表保存客戶(hù)id,客戶(hù)姓名和客戶(hù)描述信息。字段及測(cè)試數(shù)據(jù)如下:
  • order_tab 表 - 訂單表保存客戶(hù)購(gòu)買(mǎi)的訂單信息,包括訂單id,訂單時(shí)間和訂單描述信息。 字段節(jié)測(cè)試數(shù)據(jù)如下:
  • Item_tab商品表, 攜帶商品id,商品類(lèi)型,出售時(shí)間,價(jià)格等信息,具體如下:
  • PageAccess_tab頁(yè)面訪(fǎng)問(wèn)表,包含用戶(hù)ID,訪(fǎng)問(wèn)時(shí)間,用戶(hù)所在地域信息,具體數(shù)據(jù)如下:
  • PageAccessCount_tab頁(yè)面訪(fǎng)問(wèn)表,訪(fǎng)問(wèn)量,訪(fǎng)問(wèn)時(shí)間,用戶(hù)所在地域信息,具體數(shù)據(jù)如下:
  • PageAccessSession_tab頁(yè)面訪(fǎng)問(wèn)表,訪(fǎng)問(wèn)量,訪(fǎng)問(wèn)時(shí)間,用戶(hù)所在地域信息,具體數(shù)據(jù)如下:

(2) 測(cè)試類(lèi)

我們創(chuàng)建一個(gè)TableAPIOverviewITCase.scala 用于接下來(lái)介紹Flink Table API算子的功能體驗(yàn)。代碼如下:

 
 
 
 
  1. import org.apache.flink.api.scala._
  2. import org.apache.flink.runtime.state.StateBackend
  3. import org.apache.flink.runtime.state.memory.MemoryStateBackend
  4. import org.apache.flink.streaming.api.TimeCharacteristic
  5. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
  6. import org.apache.flink.streaming.api.functions.source.SourceFunction
  7. import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
  8. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  9. import org.apache.flink.streaming.api.watermark.Watermark
  10. import org.apache.flink.table.api.{Table, TableEnvironment}
  11. import org.apache.flink.table.api.scala._
  12. import org.apache.flink.types.Row
  13. import org.junit.rules.TemporaryFolder
  14. import org.junit.{Rule, Test}
  15. import scala.collection.mutable
  16. import scala.collection.mutable.ArrayBuffer
  17. class Table APIOverviewITCase {
  18. // 客戶(hù)表數(shù)據(jù)
  19. val customer_data = new mutable.MutableList[(String, String, String)]
  20. customer_data.+=(("c_001", "Kevin", "from JinLin"))
  21. customer_data.+=(("c_002", "Sunny", "from JinLin"))
  22. customer_data.+=(("c_003", "JinCheng", "from HeBei"))
  23. // 訂單表數(shù)據(jù)
  24. val order_data = new mutable.MutableList[(String, String, String, String)]
  25. order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone"))
  26. order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad"))
  27. order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book"))
  28. // 商品銷(xiāo)售表數(shù)據(jù)
  29. val item_data = Seq(
  30. Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))),
  31. Right((1510365660000L)),
  32. Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))),
  33. Right((1510365720000L)),
  34. Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))),
  35. Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))),
  36. Right((1510365780000L)),
  37. Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))),
  38. Right((1510365900000L)),
  39. Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))),
  40. Right((1510365960000L)),
  41. Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))),
  42. Right((1510366020000L)),
  43. Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))),
  44. Right((151036608000L)))
  45. // 頁(yè)面訪(fǎng)問(wèn)表數(shù)據(jù)
  46. val pageAccess_data = Seq(
  47. Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))),
  48. Right((1510365660000L)),
  49. Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))),
  50. Right((1510365660000L)),
  51. Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))),
  52. Right((1510366200000L)),
  53. Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))),
  54. Right((1510366260000L)),
  55. Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))),
  56. Right((1510373400000L)))
  57. // 頁(yè)面訪(fǎng)問(wèn)量表數(shù)據(jù)2
  58. val pageAccessCount_data = Seq(
  59. Left((1510365660000L, (1510365660000L, "ShangHai", 100))),
  60. Right((1510365660000L)),
  61. Left((1510365660000L, (1510365660000L, "BeiJing", 86))),
  62. Right((1510365660000L)),
  63. Left((1510365960000L, (1510365960000L, "BeiJing", 210))),
  64. Right((1510366200000L)),
  65. Left((1510366200000L, (1510366200000L, "BeiJing", 33))),
  66. Right((1510366200000L)),
  67. Left((1510373400000L, (1510373400000L, "ShangHai", 129))),
  68. Right((1510373400000L)))
  69. // 頁(yè)面訪(fǎng)問(wèn)表數(shù)據(jù)3
  70. val pageAccessSession_data = Seq(
  71. Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))),
  72. Right((1510365660000L)),
  73. Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))),
  74. Right((1510365720000L)),
  75. Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))),
  76. Right((1510365720000L)),
  77. Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))),
  78. Right((1510365900000L)),
  79. Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))),
  80. Right((1510366200000L)),
  81. Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))),
  82. Right((1510366200000L)),
  83. Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))),
  84. Right((1510366260000L)),
  85. Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))),
  86. Right((1510373760000L)))
  87. val _tempFolder = new TemporaryFolder
  88. // Streaming 環(huán)境
  89. val env = StreamExecutionEnvironment.getExecutionEnvironment
  90. val tEnv = TableEnvironment.getTableEnvironment(env)
  91. env.setParallelism(1)
  92. env.setStateBackend(getStateBackend)
  93. def getProcTimeTables(): (Table, Table) = {
  94. // 將order_tab, customer_tab 注冊(cè)到catalog
  95. val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc)
  96. val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc)
  97. (customer, order)
  98. }
  99. def getEventTimeTables(): (Table, Table, Table, Table) = {
  100. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  101. // 將item_tab, pageAccess_tab 注冊(cè)到catalog
  102. val item =
  103. env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data))
  104. .toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime)
  105. val pageAccess =
  106. env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data))
  107. .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
  108. val pageAccessCount =
  109. env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data))
  110. .toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime)
  111. val pageAccessSession =
  112. env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data))
  113. .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)
  114. (item, pageAccess, pageAccessCount, pageAccessSession)
  115. }
  116. @Rule
  117. def tempFolder: TemporaryFolder = _tempFolder
  118. def getStateBackend: StateBackend = {
  119. new MemoryStateBackend()
  120. }
  121. def procTimePrint(result: Table): Unit = {
  122. val sink = new RetractingSink
  123. result.toRetractStream[Row].addSink(sink)
  124. env.execute()
  125. }
  126. def rowTimePrint(result: Table): Unit = {
  127. val sink = new RetractingSink
  128. result.toRetractStream[Row].addSink(sink)
  129. env.execute()
  130. }
  131. @Test
  132. def testProc(): Unit = {
  133. val (customer, order) = getProcTimeTables()
  134. val result = ...// 測(cè)試的查詢(xún)邏輯
  135. procTimePrint(result)
  136. }
  137. @Test
  138. def testEvent(): Unit = {
  139. val (item, pageAccess, pageAccessCount, pageAccessSession) = getEventTimeTables()
  140. val result = ...// 測(cè)試的查詢(xún)邏輯
  141. procTimePrint(result)
  142. }
  143. }
  144. // 自定義Sink
  145. final class RetractingSink extends RichSinkFunction[(Boolean, Row)] {
  146. var retractedResults: ArrayBuffer[String] = null
  147. override def open(parameters: Configuration): Unit = {
  148. super.open(parameters)
  149. retractedResults = mutable.ArrayBuffer.empty[String]
  150. }
  151. def invoke(v: (Boolean, Row)) {
  152. retractedResults.synchronized {
  153. val vvalue = v._2.toString
  154. if (v._1) {
  155. retractedResults += value
  156. } else {
  157. val idx = retractedResults.indexOf(value)
  158. if (idx >= 0) {
  159. retractedResults.remove(idx)
  160. } else {
  161. throw new RuntimeException("Tried to retract a value that wasn't added first. " +
  162. "This is probably an incorrectly implemented test. " +
  163. "Try to set the parallelism of the sink to 1.")
  164. }
  165. }
  166. }
  167. }
  168. override def close(): Unit = {
  169. super.close()
  170. retractedResults.sorted.foreach(println(_))
  171. }
  172. }
  173. // Water mark 生成器
  174. class EventTimeSourceFunction[T](
  175. dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
  176. override def run(ctx: SourceContext[T]): Unit = {
  177. dataWithTimestampList.foreach {
  178. case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
  179. case Right(w) => ctx.emitWatermark(new Watermark(w))
  180. }
  181. }
  182. override def cancel(): Unit = ???
  183. }

2. SELECT

SELECT 用于從數(shù)據(jù)集/流中選擇數(shù)據(jù),語(yǔ)義是關(guān)系代數(shù)中的投影(Projection),對(duì)關(guān)系進(jìn)行垂直分割,消去或增加某些列, 如下圖所示:

(1) Table API 示例

從customer_tab選擇用戶(hù)姓名,并用內(nèi)置的CONCAT函數(shù)拼接客戶(hù)信息,如下:

 
 
 
 
  1. val result = customer
  2. .select('c_name, concat_ws('c_name, " come ", 'c_desc))

(2) Result

(3) 特別說(shuō)明

大家看到在 SELECT 不僅可以使用普通的字段選擇,還可以使用ScalarFunction,當(dāng)然也包括User-Defined Function,同時(shí)還可以進(jìn)行字段的alias設(shè)置。其實(shí)SELECT可以結(jié)合聚合,在GROUPBY部分會(huì)進(jìn)行介紹,一個(gè)比較特殊的使用場(chǎng)景是去重的場(chǎng)景,示例如下:

  • Table API示例

在訂單表查詢(xún)所有的客戶(hù)id,消除重復(fù)客戶(hù)id, 如下:

 
 
 
 
  1. val result = order
  2. .groupBy('c_id)
  3. .select('c_id)
  • Result

3. WHERE

WHERE 用于從數(shù)據(jù)集/流中過(guò)濾數(shù)據(jù),與SELECT一起使用,語(yǔ)義是關(guān)系代數(shù)的Selection,根據(jù)某些條件對(duì)關(guān)系做水平分割,即選擇符合條件的記錄,如下所示:

(1) Table API 示例

在customer_tab查詢(xún)客戶(hù)id為c_001和c_003的客戶(hù)信息,如下:

 
 
 
 
  1. val result = customer
  2. .where("c_|| c_")
  3. .select( 'c_id, 'c_name, 'c_desc)

(2) Result

(3) 特別說(shuō)明

我們發(fā)現(xiàn)WHERE是對(duì)滿(mǎn)足一定條件的數(shù)據(jù)進(jìn)行過(guò)濾,WHERE支持=, <, >, <>, >=, <=以及&&, ||等表達(dá)式的組合,最終滿(mǎn)足過(guò)濾條件的數(shù)據(jù)會(huì)被選擇出來(lái)。 SQL中的IN和NOT IN在Table API里面用intersect 和 minus描述(flink-1.7.0版本)。

  • Intersect 示例

Intersect只在Batch模式下進(jìn)行支持,Stream模式下我們可以利用雙流JOIN來(lái)實(shí)現(xiàn),如:在customer_tab查詢(xún)已經(jīng)下過(guò)訂單的客戶(hù)信息,如下:

 
 
 
 
  1. // 計(jì)算客戶(hù)id,并去重
  2. val distinct_cids = order
  3. .groupBy('c_id) // 去重
  4. .select('c_id as 'o_c_id)
  5. val result = customer
  6. .join(distinct_cids, 'c_id === 'o_c_id)
  7. .select('c_id, 'c_name, 'c_desc)
  • Result

  • Minus 示例

Minus只在Batch模式下進(jìn)行支持,Stream模式下我們可以利用雙流JOIN來(lái)實(shí)現(xiàn),如:在customer_tab查詢(xún)沒(méi)有下過(guò)訂單的客戶(hù)信息,如下:

 
 
 
 
  1. // 查詢(xún)下過(guò)訂單的客戶(hù)id,并去重
  2. val distinct_cids = order
  3. .groupBy('c_id)
  4. .select('c_id as 'o_c_id)
  5. // 查詢(xún)沒(méi)有下過(guò)訂單的客戶(hù)信息
  6. val result = customer
  7. .leftOuterJoin(distinct_cids, 'c_id === 'o_c_id)
  8. .where('o_c_id isNull)
  9. .select('c_id, 'c_name, 'c_desc)

說(shuō)明上面實(shí)現(xiàn)邏輯比較復(fù)雜,我們后續(xù)考慮如何在流上支持更簡(jiǎn)潔的方式。

  • Result

  • Intersect/Minus與關(guān)系代數(shù)

如上介紹Intersect是關(guān)系代數(shù)中的Intersection, Minus是關(guān)系代數(shù)的Difference, 如下圖示意:

a. Intersect(Intersection):

b. Minus(Difference):

4. GROUP BY

GROUP BY 是對(duì)數(shù)據(jù)進(jìn)行分組的操作,比如我需要分別計(jì)算一下一個(gè)學(xué)生表里面女生和男生的人數(shù)分別是多少,如下:

(1) Table API 示例

將order_tab信息按c_id分組統(tǒng)計(jì)訂單數(shù)量,簡(jiǎn)單示例如下:

 
 
 
 
  1. val result = order
  2. .groupBy('c_id)
  3. .select('c_id, 'o_id.count)

(2) Result

(3) 特別說(shuō)明

在實(shí)際的業(yè)務(wù)場(chǎng)景中,GROUP BY除了按業(yè)務(wù)字段進(jìn)行分組外,很多時(shí)候用戶(hù)也可以用時(shí)間來(lái)進(jìn)行分組(相當(dāng)于劃分窗口),比如統(tǒng)計(jì)每分鐘的訂單數(shù)量:

  • Table API 示例

按時(shí)間進(jìn)行分組,查詢(xún)每分鐘的訂單數(shù)量,如下:

 
 
 
 
  1. ```
  2. val result = order
  3. .select('o_id, 'c_id, 'o_time.substring(1, 16) as 'o_time_min)
  4. .groupBy('o_time_min)
  5. .select('o_time_min, 'o_id.count)
  6. ```
  • Result

說(shuō)明:如果我們時(shí)間字段是timestamp類(lèi)型,建議使用內(nèi)置的 DATE_FORMAT 函數(shù)。

5. UNION ALL

UNION ALL 將兩個(gè)表合并起來(lái),要求兩個(gè)表的字段完全一致,包括字段類(lèi)型、字段順序,語(yǔ)義對(duì)應(yīng)關(guān)系代數(shù)的Union,只是關(guān)系代數(shù)是Set集合操作,會(huì)有去重復(fù)操作,UNION ALL 不進(jìn)行去重,如下所示:

(1) Table API 示例

我們簡(jiǎn)單的將customer_tab查詢(xún)2次,將查詢(xún)結(jié)果合并起來(lái),如下:

 
 
 
 
  1. val result = customer.unionAll(customer)

(2) Result

(3) 特別說(shuō)明

UNION ALL 對(duì)結(jié)果數(shù)據(jù)不進(jìn)行去重,如果想對(duì)結(jié)果數(shù)據(jù)進(jìn)行去重,傳統(tǒng)數(shù)據(jù)庫(kù)需要進(jìn)行UNION操作。

6. UNION

UNION 將兩個(gè)流給合并起來(lái),要求兩個(gè)流的字段完全一致,包括字段類(lèi)型、字段順序,并其UNION 不同于UNION ALL,UNION會(huì)對(duì)結(jié)果數(shù)據(jù)去重,與關(guān)系代數(shù)的Union語(yǔ)義一致,如下:

(1) Table API 示例

我們簡(jiǎn)單的將customer_tab查詢(xún)2次,將查詢(xún)結(jié)果合并起來(lái),如下:

 
 
 
 
  1. val result = customer.union(customer)

我們發(fā)現(xiàn)完全一樣的表數(shù)據(jù)進(jìn)行 UNION之后,數(shù)據(jù)是被去重的,UNION之后的數(shù)據(jù)并沒(méi)有增加。

(2) Result

(3) 特別說(shuō)明

UNION 對(duì)結(jié)果數(shù)據(jù)進(jìn)行去重,在實(shí)際的實(shí)現(xiàn)過(guò)程需要對(duì)數(shù)據(jù)進(jìn)行排序操作,所以非必要去重情況請(qǐng)使用UNION ALL操作。

7. JOIN

JOIN 用于把來(lái)自?xún)蓚€(gè)表的行聯(lián)合起來(lái)形成一個(gè)寬表,Apache Flink支持的JOIN類(lèi)型:

  • JOIN - INNER JOIN
  • LEFT JOIN - LEFT OUTER JOIN
  • RIGHT JOIN - RIGHT OUTER JOIN
  • FULL JOIN - FULL OUTER JOIN

JOIN與關(guān)系代數(shù)的Join語(yǔ)義相同,具體如下:

(1) Table API 示例 (JOIN)

INNER JOIN只選擇滿(mǎn)足ON條件的記錄,我們查詢(xún)customer_tab 和 order_tab表,將有訂單的客戶(hù)和訂單信息選擇出來(lái),如下:

 
 
 
 
  1. val result = customer
  2. .join(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)

(2)Result:

(3) Table API 示例 (LEFT JOIN)

LEFT JOIN與INNER JOIN的區(qū)別是當(dāng)右表沒(méi)有與左邊相JOIN的數(shù)據(jù)時(shí)候,右邊對(duì)應(yīng)的字段補(bǔ)NULL輸出,語(yǔ)義如下:

對(duì)應(yīng)的SQL語(yǔ)句如下(LEFT JOIN):

 
 
 
 
  1. SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ;

細(xì)心的讀者可能發(fā)現(xiàn)上面T2.ColC是添加了前綴T2了,這里需要說(shuō)明一下,當(dāng)兩張表有字段名字一樣的時(shí)候,我需要指定是從那個(gè)表里面投影的。

我們查詢(xún)customer_tab 和 order_tab表,將客戶(hù)和訂單信息選擇出來(lái)如下:

 
 
 
 
  1. val result = customer
  2. .leftOuterJoin(order.select('o_id, 'c_id as 'o_c_id, 'o_time, 'o_desc), 'c_id === 'o_c_id)

(4) Result

(5) 特別說(shuō)明

RIGHT JOIN 相當(dāng)于 LEFT JOIN 左右兩個(gè)表交互一下位置。FULL JOIN相當(dāng)于 RIGHT JOIN 和 LEFT JOIN 之后進(jìn)行UNION ALL操作。

8. Time-Interval JOIN

Time-Interval JOIN 相對(duì)于UnBounded的雙流JOIN來(lái)說(shuō)是Bounded JOIN。就是每條流的每一條數(shù)據(jù)會(huì)與另一條流上的不同時(shí)間區(qū)域的數(shù)據(jù)進(jìn)行JOIN。對(duì)應(yīng)Apache Flink官方文檔的 Time-windowed JOIN(release-1.7之前都叫Time-Windowed JOIN)。 Time-Interval JOIN的語(yǔ)義和實(shí)現(xiàn)原理詳見(jiàn)《Apache Flink 漫談系列(12) - Time Interval(Time-windowed) JOIN》。其Table API核心的語(yǔ)法示例,如下:

 
 
 
 
  1. ...
  2. val result = left
  3. .join(right)
  4. // 定義Time Interval
  5. .where('a === 'd && 'c >= 'f - 5.seconds && 'c < 'f + 6.seconds)
  6. ...

9. Lateral JOIN

Apache Flink Lateral JOIN 是左邊Table與一個(gè)UDTF進(jìn)行JOIN,詳細(xì)的語(yǔ)義和實(shí)現(xiàn)原理請(qǐng)參考《Apache Flink 漫談系列(10) - JOIN LATERAL》。其Table API核心的語(yǔ)法示例,如下:

 
 
 
 
  1. ...
  2. val udtf = new UDTF
  3. val result = source.join(udtf('c) as ('d, 'e))
  4. ...

10. Temporal Table JOIN

Temporal Table JOIN 是左邊表與右邊一個(gè)攜帶版本信息的表進(jìn)行JOIN,詳細(xì)的語(yǔ)法,語(yǔ)義和實(shí)現(xiàn)原理詳見(jiàn)《Apache Flink 漫談系列(11) - Temporal Table JOIN》,其Table API核心的語(yǔ)法示例,如下:

 
 
 
 
  1. ...
  2. val rates = tEnv.scan("versonedTable").createTemporalTableFunction('rowtime, 'r_currency)
  3. val result = left.join(rates('o_rowtime), 'r_currency === 'o_currency)...

11. Window

在Apache Flink中有2種類(lèi)型的Window,一種是OverWindow,即傳統(tǒng)數(shù)據(jù)庫(kù)的標(biāo)準(zhǔn)開(kāi)窗,每一個(gè)元素都對(duì)應(yīng)一個(gè)窗口。一種是GroupWindow,目前在SQL中GroupWindow都是基于時(shí)間進(jìn)行窗口劃分的。

(1) Over Window

Apache Flink中對(duì)OVER Window的定義遵循標(biāo)準(zhǔn)SQL的定義語(yǔ)法。

按ROWS和RANGE分類(lèi)是傳統(tǒng)數(shù)據(jù)庫(kù)的標(biāo)準(zhǔn)分類(lèi)方法,在Apache Flink中還可以根據(jù)時(shí)間類(lèi)型(ProcTime/EventTime)和窗口的有限和無(wú)限(Bounded/UnBounded)進(jìn)行分類(lèi),共計(jì)8種類(lèi)型。為了避免大家對(duì)過(guò)細(xì)分類(lèi)造成困擾,我們按照確定當(dāng)前行的不同方式將OVER Window分成兩大類(lèi)進(jìn)行介紹,如下:

  • ROWS OVER Window - 每一行元素都視為新的計(jì)算行,即,每一行都是一個(gè)新的窗口。
  • RANGE OVER Window - 具有相同時(shí)間值的所有元素行視為同一計(jì)算行,即,具有相同時(shí)間值的所有行都是同一個(gè)窗口。

(a) Bounded ROWS OVER Window

Bounded ROWS OVER Window 每一行元素都視為新的計(jì)算行,即,每一行都是一個(gè)新的窗口。

  • 語(yǔ)義

我們以3個(gè)元素(2 PRECEDING)的窗口為例,如下圖:

上圖所示窗口 user 1 的 w5和w6, user 2的 窗口 w2 和 w3,雖然有元素都是同一時(shí)刻到達(dá),但是他們?nèi)匀皇窃诓煌拇翱?,這一點(diǎn)有別于RANGE OVER Window。

  • Table API 示例

利用item_tab測(cè)試數(shù)據(jù),我們統(tǒng)計(jì)同類(lèi)商品中當(dāng)前和當(dāng)前商品之前2個(gè)商品中的最高價(jià)格。

 
 
 
 
  1. val result = item
  2. .window(Over partitionBy 'itemType orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
  3. .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)
  • Result

(b) Bounded RANGE OVER Window

Bounded RANGE OVER Window 具有相同時(shí)間值的所有元素行視為同一計(jì)算行,即,具有相同時(shí)間值的所有行都是同一個(gè)窗口。

  • 語(yǔ)義

我們以3秒中數(shù)據(jù)(INTERVAL '2' SECOND)的窗口為例,如下圖:

注意: 上圖所示窗口 user 1 的 w6, user 2的 窗口 w3,元素都是同一時(shí)刻到達(dá),他們是在同一個(gè)窗口,這一點(diǎn)有別于ROWS OVER Window。

  • Tabel API 示例

我們統(tǒng)計(jì)同類(lèi)商品中當(dāng)前和當(dāng)前商品之前2分鐘商品中的最高價(jià)格。

 
 
 
 
  1. val result = item
  2. .window(Over partitionBy 'itemType orderBy 'rowtime preceding 2.minute following CURRENT_RANGE as 'w)
  3. .select('itemID, 'itemType, 'onSellTime, 'price, 'price.max over 'w as 'maxPrice)

(c) Result(Bounded RANGE OVER Window)

  • 特別說(shuō)明

OverWindow最重要是要理解每一行數(shù)據(jù)都確定一個(gè)窗口,同時(shí)目前在Apache Flink中只支持按時(shí)間字段排序。并且OverWindow開(kāi)窗與GroupBy方式數(shù)據(jù)分組最大的不同在于,GroupBy數(shù)據(jù)分組統(tǒng)計(jì)時(shí)候,在SELECT中除了GROUP BY的key,不能直接選擇其他非key的字段,但是OverWindow沒(méi)有這個(gè)限制,SELECT可以選擇任何字段。比如一張表table(a,b,c,d)4個(gè)字段,如果按d分組求c的最大值,兩種寫(xiě)完如下:

  • GROUP BY - tab.groupBy('d).select(d, MAX(c))
  • OVER Window = tab.window(Over.. as 'w).select('a, 'b, 'c, 'd, c.max over 'w)

如上 OVER Window 雖然PARTITION BY d,但SELECT 中仍然可以選擇 a,b,c字段。但在GROUPBY中,SELECT 只能選擇 d 字段。

(2) Group Window

根據(jù)窗口數(shù)據(jù)劃分的不同,目前Apache Flink有如下3種Bounded Winodw:

  • Tumble - 滾動(dòng)窗口,窗口數(shù)據(jù)有固定的大小,窗口數(shù)據(jù)無(wú)疊加;
  • Hop - 滑動(dòng)窗口,窗口數(shù)據(jù)有固定大小,并且有固定的窗口重建頻率,窗口數(shù)據(jù)有疊加;
  • Session - 會(huì)話(huà)窗口,窗口數(shù)據(jù)沒(méi)有固定的大小,根據(jù)窗口數(shù)據(jù)活躍程度劃分窗口,窗口數(shù)據(jù)無(wú)疊加。

說(shuō)明: Aapche Flink 還支持UnBounded的 Group Window,也就是全局Window,流上所有數(shù)據(jù)都在一個(gè)窗口里面,語(yǔ)義非常簡(jiǎn)單,這里不做詳細(xì)介紹了。

(a) Tumble

  • 語(yǔ)義

Tumble 滾動(dòng)窗口有固定size,窗口數(shù)據(jù)不重疊,具體語(yǔ)義如下:

  • Table API 示例

利用pageAccess_tab測(cè)試數(shù)據(jù),我們需要按不同地域統(tǒng)計(jì)每2分鐘的淘寶首頁(yè)的訪(fǎng)問(wèn)量(PV)。

 
 
 
 
  1. val result = pageAccess
  2. .window(Tumble over 2.minute on 'rowtime as 'w)
  3. .groupBy('w, 'region)
  4. .select('region, 'w.start, 'w.end, 'region.count as 'pv)
  • Result

(b) Hop

Hop 滑動(dòng)窗口和滾動(dòng)窗口類(lèi)似,窗口有固定的size,與滾動(dòng)窗口不同的是滑動(dòng)窗口可以通過(guò)slide參數(shù)控制滑動(dòng)窗口的新建頻率。因此當(dāng)slide值小于窗口size的值的時(shí)候多個(gè)滑動(dòng)窗口會(huì)重疊。

  • 語(yǔ)義

Hop 滑動(dòng)窗口語(yǔ)義如下所示:

  • Table API 示例

利用pageAccessCount_tab測(cè)試數(shù)據(jù),我們需要每5分鐘統(tǒng)計(jì)近10分鐘的頁(yè)面訪(fǎng)問(wèn)量(PV).

 
 
 
 
  1. val result = pageAccessCount
  2. .window(Slide over 10.minute every 5.minute on 'rowtime as 'w)
  3. .groupBy('w)
  4. .select('w.start, 'w.end, 'accessCount.sum as 'accessCount)
  • Result

(c) Session

Seeeion 會(huì)話(huà)窗口 是沒(méi)有固定大小的窗口,通過(guò)session的活躍度分組元素。不同于滾動(dòng)窗口和滑動(dòng)窗口,會(huì)話(huà)窗口不重疊,也沒(méi)有固定的起止時(shí)間。一個(gè)會(huì)話(huà)窗口在一段時(shí)間內(nèi)沒(méi)有接收到元素時(shí),即當(dāng)出現(xiàn)非活躍間隙時(shí)關(guān)閉。一個(gè)會(huì)話(huà)窗口 分配器通過(guò)配置session gap來(lái)指定非活躍周期的時(shí)長(zhǎng).

  • 語(yǔ)義

Session 會(huì)話(huà)窗口語(yǔ)義如下所示:

 
 
 
 
  1. val result = pageAccessSession
  2. .window(Session withGap 3.minute on 'rowtime as 'w)
  3. .groupBy('w, 'region)
  4. .select('region, 'w.start, 'w.end, 'region.count as 'pv)
  • Result

(d) 嵌套Window

在Window之后再進(jìn)行Window劃分也是比較常見(jiàn)的統(tǒng)計(jì)需求,那么在一個(gè)Event-Time的Window之后,如何再寫(xiě)一個(gè)Event-Time的Window呢?一個(gè)Window之后再描述一個(gè)Event-Time的Window最重要的是Event-time屬性的傳遞,在Table API中我們可以利用'w.rowtime來(lái)傳遞時(shí)間屬性,比如:Tumble Window之后再接一個(gè)Session Window 示例如下:

 
 
 
 
  1. ...
  2. val result = pageAccess
  3. .window(Tumble over 2.minute on 'rowtime as 'w1)
  4. .groupBy('w1)
  5. .select('w1.rowtime as 'rowtime, 'col1.count as 'cnt)
  6. .window(Session withGap 3.minute on 'rowtime as 'w2)
  7. .groupBy('w2)
  8. .select('cnt.sum)
  9. ...

五、Source&Sink

上面我們介紹了Apache Flink Table API核心算子的語(yǔ)義和具體示例,這部分將選取Bounded EventTime Tumble Window為例為大家編寫(xiě)一個(gè)完整的包括Source和Sink定義的Apache Flink Table API Job。假設(shè)有一張?zhí)詫氻?yè)面訪(fǎng)問(wèn)表(PageAccess_tab),有地域,用戶(hù)ID和訪(fǎng)問(wèn)時(shí)間。我們需要按不同地域統(tǒng)計(jì)每2分鐘的淘寶首頁(yè)的訪(fǎng)問(wèn)量(PV)。具體數(shù)據(jù)如下:

1. Source 定義

自定義Apache Flink Stream Source需要實(shí)現(xiàn)StreamTableSource, StreamTableSource中通過(guò)StreamExecutionEnvironment 的addSource方法獲取DataStream, 所以我們需要自定義一個(gè) SourceFunction, 并且要支持產(chǎn)生WaterMark,也就是要實(shí)現(xiàn)DefinedRowtimeAttributes接口。

(1) Source Function定義

支持接收攜帶EventTime的數(shù)據(jù)集合,Either的數(shù)據(jù)結(jié)構(gòu),Right表示W(wǎng)aterMark和Left表示數(shù)據(jù):

 
 
 
 
  1. class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]])
  2. extends SourceFunction[T] {
  3. override def run(ctx: SourceContext[T]): Unit = {
  4. dataWithTimestampList.foreach {
  5. case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
  6. case Right(w) => ctx.emitWatermark(new Watermark(w))
  7. }
  8. }
  9. override def cancel(): Unit = ???}

(2) 定義 StreamTableSource

我們自定義的Source要攜帶我們測(cè)試的數(shù)據(jù),以及對(duì)應(yīng)的WaterMark數(shù)據(jù),具體如下:

 
 
 
 
  1. class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
  2. val fieldNames = Array("accessTime", "region", "userId")
  3. val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
  4. val rowType = new RowTypeInfo(
  5. Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
  6. fieldNames)
  7. // 頁(yè)面訪(fǎng)問(wèn)表數(shù)據(jù) rows with timestamps and watermarks
  8. val data = Seq(
  9. Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
  10. Right(1510365660000L),
  11. Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")),
  12. Right(1510365660000L),
  13. Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")),
  14. Right(1510366200000L),
  15. Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")),
  16. Right(1510366260000L),
  17. Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")),
  18. Right(1510373400000L)
  19. )
  20. override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
  21. Collections.singletonList(new RowtimeAttributeDescriptor(
  22. "accessTime",
  23. new ExistingField("accessTime"),
  24. PreserveWatermarks.INSTANCE))
  25. }
  26. override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
  27. execEnv.addSource(new MySourceFunction[Row](data)).returns(rowType).setParallelism(1)
  28. }
  29. override def getReturnType: TypeInformation[Row] = rowType
  30. override def getTableSchema: TableSchema = schema
  31. }

(3) Sink 定義

我們簡(jiǎn)單的將計(jì)算結(jié)果寫(xiě)入到Apache Flink內(nèi)置支持的CSVSink中,定義Sink如下:

 
 
 
 
  1. def getCsvTableSink: TableSink[Row] = {
  2. val tempFile = File.createTempFile("csv_sink_", "tem")
  3. // 打印sink的文件路徑,方便我們查看運(yùn)行結(jié)果
  4. println("Sink path : " + tempFile)
  5. if (tempFile.exists()) {
  6. tempFile.delete
    網(wǎng)頁(yè)名稱(chēng):ApacheFlink漫談系列(13)-TableAPI概述
    網(wǎng)址分享:http://www.dlmjj.cn/article/ccoscog.html