日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
基于Flink的典型ETL場景是怎么實現(xiàn)

本篇文章為大家展示了基于Flink的典型ETL場景是怎么實現(xiàn),內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

目前累計服務客戶上千余家,積累了豐富的產品開發(fā)及服務經(jīng)驗。以網(wǎng)站設計水平和技術實力,樹立企業(yè)形象,為客戶提供做網(wǎng)站、網(wǎng)站制作、網(wǎng)站策劃、網(wǎng)頁設計、網(wǎng)絡營銷、VI設計、網(wǎng)站改版、漏洞修補等服務。創(chuàng)新互聯(lián)公司始終以務實、誠信為根本,不斷創(chuàng)新和提高建站品質,通過對領先技術的掌握、對創(chuàng)意設計的研究、對客戶形象的視覺傳遞、對應用系統(tǒng)的結合,為客戶提供更好的一站式互聯(lián)網(wǎng)解決方案,攜手廣大客戶,共同發(fā)展進步。

下面將從數(shù)倉誕生的背景、數(shù)倉架構、離線與實時數(shù)倉的對比著手,綜述數(shù)倉發(fā)展演進,然后分享基于 Flink 實現(xiàn)典型 ETL 場景的幾個方案。

1.實時數(shù)倉的相關概述

1.1 實時數(shù)倉產生背景

我們先來回顧一下數(shù)據(jù)倉庫的概念。

基于Flink的典型ETL場景是怎么實現(xiàn)

數(shù)據(jù)倉庫的概念是于90年代由 Bill Inmon 提出, 當時的背景是傳統(tǒng)的 OLTP 數(shù)據(jù)庫無法很好的支持長周期分析決策場景,所以數(shù)據(jù)倉庫概念的4個核心點,我們要結合著 OLTP 數(shù)據(jù)庫當時的狀態(tài)來對比理解。

  1. 面向主題的:數(shù)據(jù)倉庫的數(shù)據(jù)組織方式與 OLTP 面向事務處理不同。因為數(shù)據(jù)倉庫是面向分析決策的,所以數(shù)據(jù)經(jīng)常按分析場景或者是分析對象等主題形式來組織。

  2. 集成的:對于數(shù)據(jù)倉庫來說,經(jīng)常需要去集合多個分散的、異構的數(shù)據(jù)源,做一些數(shù)據(jù)清洗等 ETL 處理,整合成一塊數(shù)據(jù)倉庫,OLTP 則不需要做類似的集成操作。

  3. 相對穩(wěn)定的:OLTP 數(shù)據(jù)庫一般都是面向業(yè)務的,它主要的作用是把當前的業(yè)務狀態(tài)精準的反映出來,所以 OLTP 數(shù)據(jù)庫需要支持大量的增、刪、改的操作。但是對于數(shù)據(jù)倉庫來說,只要是入倉存下來的數(shù)據(jù),一般使用場景都是查詢,因此數(shù)據(jù)是相對穩(wěn)定的。

  4. 反映歷史變化:數(shù)據(jù)倉庫是反映歷史變化的數(shù)據(jù)集合,可以理解成它會將歷史的一些數(shù)據(jù)的快照存下來。而對于 OLTP 數(shù)據(jù)庫來說,只要反映當時的最新的狀態(tài)就可以了。

以上這4個點是數(shù)據(jù)倉庫的一個核心的定義。我們也可以看出對于實時數(shù)據(jù)倉庫來說,傳統(tǒng)數(shù)據(jù)倉庫也就是離線數(shù)據(jù)倉庫中的一些定義會被弱化,比如說在反映歷史變化這一點。介紹完數(shù)據(jù)倉庫的基本概念,簡單說下數(shù)據(jù)倉庫建模這塊會用到一些經(jīng)典的建模方法,主要有范式建模、維度建模和 Data Vault。在互聯(lián)網(wǎng)大數(shù)據(jù)場景下,用的最多的是維度建模方法。

然后先看一下離線數(shù)倉的經(jīng)典架構。

這個數(shù)倉架構主要是偏向互聯(lián)網(wǎng)大數(shù)據(jù)的場景方案,由上圖可以看出有三個核心環(huán)節(jié)。

1.第一個環(huán)節(jié)是數(shù)據(jù)源部分,一般互聯(lián)網(wǎng)公司的數(shù)據(jù)源主要有兩類:

  • 第1類是通過在客戶端埋點上報,收集用戶的行為日志,以及一些后端日志的日志類型數(shù)據(jù)源。對于埋點行為日志來說,一般會經(jīng)過一個這樣的流程,首先數(shù)據(jù)會上報到 Nginx 然后經(jīng)過 Flume 收集,然后存儲到 Kafka 這樣的消息隊列,然后再由實時或者離線的一些拉取的任務,拉取到我們的離線數(shù)據(jù)倉庫 HDFS。

  • 第2類數(shù)據(jù)源是業(yè)務數(shù)據(jù)庫,而對于業(yè)務數(shù)據(jù)庫的話,一般會經(jīng)過 Canal 收集它的 binlog,然后也是收集到消息隊列中,最終再由 Camus 拉取到 HDFS。

這兩部分數(shù)據(jù)源最終都會落地到 HDFS 中的 ODS 層,也叫貼源數(shù)據(jù)層,這層數(shù)據(jù)和原始數(shù)據(jù)源是保持一致的。

2.第二個環(huán)節(jié)是離線數(shù)據(jù)倉庫,是圖中藍色的框展示的部分??梢钥吹剿且粋€分層的結構,其中的模型設計是依據(jù)維度建模思路。

  • 最底層是 ODS 層,這一層將數(shù)據(jù)保持無信息損失的存放在 HDFS,基本保持原始的日志數(shù)據(jù)不變。

  • 在 ODS 層之上,一般會進行統(tǒng)一的數(shù)據(jù)清洗、歸一,就得到了 DWD 明細數(shù)據(jù)層。這一層也包含統(tǒng)一的維度數(shù)據(jù)。

  • 然后基于 DWD 明細數(shù)據(jù)層,我們會按照一些分析場景、分析實體等去組織我們的數(shù)據(jù),組織成一些分主題的匯總數(shù)據(jù)層 DWS。

  • 在 DWS 之上,我們會面向應用場景去做一些更貼近應用的 APP 應用數(shù)據(jù)層,這些數(shù)據(jù)應該是高度匯總的,并且能夠直接導入到我們的應用服務去使用。

在中間的離線數(shù)據(jù)倉庫的生產環(huán)節(jié),一般都是采用一些離線生產的架構引擎,比如說 MapReduce、Hive、Spark 等等,數(shù)據(jù)一般是存在 HDFS 上。

3.經(jīng)過前兩個環(huán)節(jié)后,我們的一些應用層的數(shù)據(jù)會存儲到數(shù)據(jù)服務里,比如說 HBase 、redis、Kylin 這樣的一些 KV 的存儲。并且會針對存在這些數(shù)據(jù)存儲上的一些數(shù)據(jù),封裝對應的服務接口,對外提供服務。在最外層我們會去產出一些面向業(yè)務的報表、面向分析的數(shù)據(jù)產品,以及會支持線上的一些業(yè)務產品等等。這一層的話,稱之為更貼近業(yè)務端的數(shù)據(jù)應用部分。

以上是一個基本的離線數(shù)倉經(jīng)典架構的介紹。

大家都了解到現(xiàn)在隨著移動設備的普及,我們逐漸的由制造業(yè)時代過渡到了互聯(lián)網(wǎng)時代。在制造業(yè)的時代,傳統(tǒng)的數(shù)倉,主要是為了去支持以前的一些傳統(tǒng)行業(yè)的企業(yè)的業(yè)務決策者、管理者,去做一些業(yè)務決策。那個時代的業(yè)務決策周期是比較長的,同時當時的數(shù)據(jù)量較小,Oracle、DB2 這一類數(shù)據(jù)庫就已經(jīng)足夠存了。

但隨著分布式計算技術的發(fā)展、智能化技術發(fā)展、以及整體算力的提升、互聯(lián)網(wǎng)的發(fā)展等等因素,我們現(xiàn)在在互聯(lián)網(wǎng)上收集的數(shù)據(jù)量,已經(jīng)呈指數(shù)級的增長。并且業(yè)務不再只依賴人做決策,做決策的主體很大部分已轉變?yōu)橛嬎銠C算法,比如一些智能推薦場景等等。所以這個時候決策的周期,就由原來的天級要求提升到秒級,決策時間是非常短的。在場景上的話,也會面對更多的需要實時數(shù)據(jù)處理的場景,例如實時的個性化推薦、廣告的場景、甚至一些傳統(tǒng)企業(yè)已經(jīng)開始實時監(jiān)控加工的產品是否有質量問題,以及金融行業(yè)重度依賴的反作弊等等。因此在這樣的一個背景下,實時數(shù)倉就必須被提出來了。

1.2 實時數(shù)倉架構

首先跟大家介紹一下實時數(shù)倉經(jīng)典架構 - Lambda 架構:

基于Flink的典型ETL場景是怎么實現(xiàn)

這個架構是 Storm 的作者提出來的,其實 Lambda 架構的主要思路是在原來離線數(shù)倉架構的基礎上疊加上實時數(shù)倉的部分,然后將離線的存量數(shù)據(jù)與我們 t+0 的實時的數(shù)據(jù)做一個 merge,就可以產生數(shù)據(jù)狀態(tài)實時更新的結果。

  • 和上述1.1離線數(shù)據(jù)倉庫架構圖比較可以明顯的看到,實時數(shù)倉增加的部分是上圖黃色的這塊區(qū)域。我們一般會把實時數(shù)倉數(shù)據(jù)放在 Kafka 這樣的消息隊列上,也會有維度建模的一些分層,但是在匯總數(shù)據(jù)的部分,我們不會將 APP 層的一些數(shù)據(jù)放在實時數(shù)倉,而是更多的會移到數(shù)據(jù)服務側去做一些計算。

  • 然后在實時計算的部分,我們經(jīng)常會使用 Flink、Spark-streaming 和 Storm 這樣的計算引擎,時效性上,由原來的天級、小時級可以提升到秒級、分鐘級。

大家也可以看到這個架構圖中,中間數(shù)據(jù)倉庫環(huán)節(jié)有兩個部分,一個是離線的數(shù)據(jù)倉庫,一個是實時的數(shù)據(jù)倉庫。我們必須要運維兩套(實時計算和離線計算)引擎,并且在代碼層面,我們也需要去實現(xiàn)實時和離線的業(yè)務代碼。然后在合并的時候,我們需要保證實施和離線的數(shù)據(jù)一致性,所以但凡我們的代碼做變更,我們也需要去做大量的這種實時離線數(shù)據(jù)的對比和校驗。其實這對于不管是資源還是運維成本來說都是比較高的。這是 Lamda 架構上比較明顯和突出的一個問題。因此就產生了 Kappa 結構。

Kappa 架構的一個主要的思路就是在數(shù)倉部分移除了離線數(shù)倉,數(shù)倉的生產全部采用實時數(shù)倉。從上圖可以看到剛才中間的部分,離線數(shù)倉模塊已經(jīng)沒有了。

關于 Kappa 架構,熟悉實時數(shù)倉生產的同學,可能會有一個疑問。因為我們經(jīng)常會面臨業(yè)務變更,所以很多業(yè)務邏輯是需要去迭代的。之前產出的一些數(shù)據(jù),如果口徑變更了,就需要重算,甚至重刷歷史數(shù)據(jù)。對于實時數(shù)倉來說,怎么去解決數(shù)據(jù)重算問題?

Kappa 架構在這一塊的思路是:首先要準備好一個能夠存儲歷史數(shù)據(jù)的消息隊列,比如 Kafka,并且這個消息對列是可以支持你從某個歷史的節(jié)點重新開始消費的。 接著需要新起一個任務,從原來比較早的一個時間節(jié)點去消費 Kafka 上的數(shù)據(jù),然后當這個新的任務運行的進度已經(jīng)能夠和現(xiàn)在的正在跑的任務齊平的時候,你就可以把現(xiàn)在任務的下游切換到新的任務上面,舊的任務就可以停掉,并且原來產出的結果表也可以被刪掉。

隨著我們現(xiàn)在實時 OLAP 技術的一些提升,有一個新的實時架構被提了出來,這里暫且稱為實時 OLAP 變體。

基于Flink的典型ETL場景是怎么實現(xiàn)

這個思路是把大量的聚合、分析、計算由實時 OLAP 引擎來承擔。在實時數(shù)倉計算的部分,我們不需要做的特別重,尤其是聚合相關的一些邏輯,然后這樣就可以保障我們在數(shù)據(jù)應用層能靈活的面對各種業(yè)務分析的需求變更,整個架構更加靈活。

最后我們來整體對比一下,實時數(shù)倉的這幾種架構

這是整體三個關于實時數(shù)倉架構的一個對比:

  • 從計算引擎角度:Lamda 架構它需要去維護批流兩套計算引擎,Kappa 架構和實時 OLAP 變體只需要維護流計算引擎就好了。

  • 開發(fā)成本:對 Lamda 架構來說,因為它需要維護實時離線兩套代碼,所以它的開發(fā)成本會高一些。 Kappa 架構和實時 OLAP 變體只用維護一套代碼就可以了。

  • 分析靈活性:實時 OLAP 變體是相對最靈活的。

  • 在實時 OLAP 引擎依賴上:實時 OLAP 變體是強依賴實時 OLAP 變體引擎的能力的,前兩者則不強依賴。

  • 計算資源:Lamda 架構需要批流兩套計算資源,Kappa 架構只需要流計算資源,實時 OLAP 變體需要額外的 OLAP 資源。

  • 邏輯變更重算:Lamda 架構是通過批處理來重算的,Kappa 架構需要按照前面介紹的方式去重新消費消息隊列重算,實時 OLAP 變體也需要重新消費消息隊列,并且這個數(shù)據(jù)還要重新導入到 OLAP 引擎里,去做計算。

1.3 傳統(tǒng)數(shù)倉 vs 實時數(shù)倉

然后我們來看一下傳統(tǒng)數(shù)倉和實時數(shù)倉整體的差異。

基于Flink的典型ETL場景是怎么實現(xiàn)

  1. 首先從時效性來看:離線數(shù)倉是支持小時級和天級的,實時數(shù)倉到秒級分鐘級,所以實時數(shù)倉時效性是非常高的。

  2. 數(shù)據(jù)存儲方式來看:離線數(shù)倉它需要存在HDFS和RDS上面,實時數(shù)倉一般是存在消息隊列,還有一些kv存儲,像維度數(shù)據(jù)的話會更多的存在kv存儲上。

  3. 生產加工過程方面,離線數(shù)倉需要依賴離線計算引擎以及離線的調度。 但對于實時數(shù)倉來說,主要是依賴實時計算引擎。

2.基于 Flink 實現(xiàn)典型的 ETL 場景

這里我們主要介紹兩大實時 ETL 場景:維表 join 和雙流 join。

  • 維表 join

    • 預加載維表

    • 熱存儲關聯(lián)

    • 廣播維表

    • Temporal table function join

  • 雙流 join

    • 離線join vs. 實時join

    • Regular join

    • Interval join

    • Window join

2.1 維表 join

2.1.1 預加載維表

方案1:

將維表全量預加載到內存里去做關聯(lián),具體的實現(xiàn)方式就是我們定義一個類,去實現(xiàn) RichFlatMapFunction,然后在 open 函數(shù)中讀取維度數(shù)據(jù)庫,再將數(shù)據(jù)全量的加載到內存,然后在 probe 流上使用算子 ,運行時與內存維度數(shù)據(jù)做關聯(lián)。

這個方案的優(yōu)點就是實現(xiàn)起來比較簡單,缺點也比較明顯,因為我們要把每個維度數(shù)據(jù)都加載到內存里面,所以它只支持少量的維度數(shù)據(jù)。同時如果我們要去更新維表的話,還需要重啟作業(yè),所以它在維度數(shù)據(jù)的更新方面代價是有點高的,而且會造成一段時間的延遲。對于預加載維表來說,它適用的場景就是小維表,變更頻率訴求不是很高,且對于變更的及時性的要求也比較低的這種場景。

這里定義了一個 DimFlatMapFunction 來實現(xiàn) RichFlatMapFunction。其中有一個 Map 類型的 dim,其實就是為了之后在讀取 DB 的維度數(shù)據(jù)以后,可以用于存放我們的維度數(shù)據(jù),然后在 open 函數(shù)里面我們需要去連接我們的 DB,進而獲取 DB 里的數(shù)據(jù)。然后在下面代碼可以看到我們的場景是從一個商品表里面去取出商品的 ID、商品的名字。然后我們在獲取到 DB 里面的維度數(shù)據(jù)以后會把它存放到 dim 里面。

接下來在 flatMap 函數(shù)里面我們就會使用到 dim,我們在獲取了 probe 流的數(shù)據(jù)以后,我們會去 dim 里面比較。是否含有同樣的商品 ID 的數(shù)據(jù),如果有的話就把相關的商品名稱 append 到數(shù)據(jù)元組,然后做一個輸出。這就是一個基本的流程。

其實這是一個基本最初版的方案實現(xiàn)。但這個方案也有一個改進的方式,就是在 open 函數(shù)里面,可以新建一個線程,定時的去加載維表。這樣就不需要人工的去重啟 job 來讓維度數(shù)據(jù)做更新,可以實現(xiàn)一個周期性的維度數(shù)據(jù)的更新。

方案2:

通過 Distributed cash 的機制去分發(fā)本地的維度文件到task manager后再加載到內存做關聯(lián)。實現(xiàn)方式可以分為三步:

  • 第1步是通過 env.registerCached 注冊文件。

  • 第2步是實現(xiàn) RichFunction,在 open 函數(shù)里面通過 RuntimeContext 來獲取 cache 文件。

  • 第3步是解析和使用這部分文件數(shù)據(jù)。

這種方式的一個優(yōu)點是你不需要去準備或者依賴外部數(shù)據(jù)庫,缺點就是因為數(shù)據(jù)也是要加載到內存中,所以支持的維表數(shù)據(jù)量也是比較小。而且如果這個維度數(shù)據(jù)需要做更新,也需要重啟作業(yè)。 因此在正規(guī)的生產過程中不太建議使用這個方案,因為其實從數(shù)倉角度,希望所有的數(shù)據(jù)都能夠通過 schema 化方式來管理。把數(shù)據(jù)放在文件里面去做這樣一個操作,不利于我們做整體數(shù)據(jù)的管理和規(guī)范化。所以這個方式的話,大家在做一些小的 demo 的時候,或者一些測試的時候可以去使用。

那么它適用的場景就是維度數(shù)據(jù)是文件形式的、數(shù)據(jù)量比較小、并且更新的頻率也比較低的一些場景,比如說我們讀一個靜態(tài)的碼表、配置文件等等。

2.1.2 熱存儲關聯(lián)

基于Flink的典型ETL場景是怎么實現(xiàn)

維表 join 里第二類大的實現(xiàn)思路是熱存儲關聯(lián)。具體是我們把維度數(shù)據(jù)導入到像 Redis、Tair、HBase 這樣的一些熱存儲中,然后通過異步 IO 去查詢,并且疊加使用 Cache 機制,還可以加一些淘汰的機制,最后將維度數(shù)據(jù)緩存在內存里,來減輕整體對熱存儲的訪問壓力。

如上圖展示的這樣的一個流程。在 Cache 這塊的話,比較推薦谷歌的 Guava Cache,它封裝了一些關于 Cache 的一些異步的交互,還有 Cache 淘汰的一些機制,用起來是比較方便的。

剛才的實驗方案里面有兩個重要點,一個就是我們需要用異步 IO 方式去訪問存儲,這里也跟大家一起再回顧一下同步 IO 與異步 IO 的區(qū)別:

  • 對于同步 IO 來說,發(fā)出一個請求以后,必須等待請求返回以后才能繼續(xù)去發(fā)新的 request。所以整體吞吐是比較小的。由于實時數(shù)據(jù)處理對于延遲特別關注,這種同步 IO 的方式,在很多場景是不太能夠接受的。

  • 異步 IO 就是可以并行發(fā)出多個請求,整個吞吐是比較高的,延遲會相對低很多。如果使用異步 IO 的話,它對于外部存儲的吞吐量上升以后,會使得外部存儲有比較大的壓力,有時也會成為我們整個數(shù)據(jù)處理上延遲的瓶頸。所以引入 Cache 機制是希望通過 Cache 來去減少我們對外部存儲的訪問量。

剛才提到的 Cuava Cache,它的使用是非常簡單的,大家可以去嘗試使用。對于熱存儲關聯(lián)方案來說,它的優(yōu)點就是維度數(shù)據(jù)因為不用全量加載在內存里,所以就不受限于內存大小,維度數(shù)據(jù)量可以更多。在美團點評的流量場景里面,我們的維度數(shù)據(jù)可以支持到 10 億量級。另一方面該方案的缺點也是比較明顯的,我們需要依賴熱存儲資源,而且維度的更新反饋到結果是有一定延遲的。 因為我們首先需要把數(shù)據(jù)導入到熱存儲,然后同時在 Cache 過期的時間上也會有損失。

總體來說這個方法適用的場景是維度數(shù)據(jù)量比較大,又能夠接受維度更新有一定延遲的情況。

2.1.3 廣播維表

第三個大的思路是廣播維表,主要是利用 broadcast State 將維度數(shù)據(jù)流廣播到下游 task 做 join。

實現(xiàn)方式:

  • 將維度數(shù)據(jù)發(fā)送到 Kafka 作為廣播原始流 S1

  • 定義狀態(tài)描述符 MapStateDescriptor。調用 S1.broadcast(),獲得 broadCastStream S2

  • 調用非廣播流 S3.connect(S2),得到 BroadcastConnectedStream S4

  • 在 KeyedBroadcastProcessFunction/BroadcastProcessFunction 實現(xiàn)關聯(lián)處理邏輯,并作為參數(shù)調用 S4.process()

這個方案,它的優(yōu)點是維度的變更可以及時的更新到結果。然后缺點就是數(shù)據(jù)還是需要保存在內存中,因為它是存在 state 里的,所以支持維表數(shù)據(jù)量仍然不是很大。適用的場景就是我們需要時時的去感知維度的變更,且維度數(shù)據(jù)又可以轉化為實時流。

下面是一個小的 demo:

基于Flink的典型ETL場景是怎么實現(xiàn)

我們這里面用到的廣播流 pageStream,它其實是定義了一個頁面 ID 和頁面的名稱。對于非廣播流 probeStream,它是一個 json 格式的 string,里面包含了設備 ID、頁面的 ID、還有時間戳,我們可以理解成用戶在設備上做 PV 訪問的行為記錄。

整個實現(xiàn)來看,就是遵循上述4個步驟:

  • 第1步驟是要定義廣播的狀態(tài)描述符。

  • 第2步驟我們這里去生成 broadCastStream。

  • 第3步驟的話我們就需要去把兩個 stream 做 connect。

  • 第4步最主要的一個環(huán)節(jié)就是需要實現(xiàn) BroadcastProcessFunction。第1個參數(shù)是我們的 probeStream,第2個參數(shù)是廣播流的數(shù)據(jù),第3個參數(shù)就是我們的要輸出的數(shù)據(jù),可以看到主要的數(shù)據(jù)處理邏輯是在processElement里面。

在數(shù)據(jù)處理過程中,我們首先通過 context 來獲取我們的 broadcastStateDesc,然后解析 probe 流的數(shù)據(jù),最終獲取到對應的一個 pageid。接著就在我們剛才拿到了 state 里面去查詢是否有同樣的 pageid,如果能夠找到對應的 pageid 話,就把對應的 pagename 添加到我們整個 json stream 去做輸出。

2.1.4 Temporal table function join

介紹完了上面的方法以后,還有一種比較重要的方法是用 Temporal table function join。首先說明一下什么是 Temporal table?它其實是一個概念:就是能夠返回持續(xù)變化表的某一時刻數(shù)據(jù)內容的視圖,持續(xù)變化表也就是 changingtable,可以是一個實時的 changelog 的數(shù)據(jù),也可以是放在外部存儲上的一個物化的維表。

它的實現(xiàn)是通過 UDTF 去做 probe 流和 Temporal table 的 join,稱之 Temporal table function join。這種 join 的方式,它適用的場景是維度數(shù)據(jù)為 changelog 流的形式,而且我們有需要按時間版本去關聯(lián)的訴求。

首先來看一個例子,這里使用的是官網(wǎng)關于匯率和貨幣交易的一個例子。對于我們的維度數(shù)據(jù)來說,也就是剛剛提到的 changelog stream,它是 RateHistory。它反映的是不同的貨幣相對于日元來說,不同時刻的匯率。

第1個字段是時間,第2個字段是 currency 貨幣。第3個字段是相對日元的匯率,然后在我們的 probe table 來看的話,它定義的是購買不同貨幣的訂單的情況。比如說在 10:15 購買了兩歐元,該表記錄的是貨幣交易的一個情況。在這個例子里面,我們要求的是購買貨幣的總的日元交易額,如何通 Temporal table function join 來去實現(xiàn)我們這個目標呢?

  • 第1步首先我們要在 changelog 流上面去定義 TemporalTableFunction,這里面有兩個關鍵的參數(shù)是必要的。第1個參數(shù)就是能夠幫我們去識別版本信息的一個 time attribute,第2個參數(shù)是需要去做關聯(lián)的組件,這里的話我們選擇的是 currency。

  • 接著的話我們在 tableEnv 里面去注冊 TemporalTableFunction 的名字。

然后我們來看一下我們注冊的 TemporalTableFunction,它能夠起到什么樣的效果。

基于Flink的典型ETL場景是怎么實現(xiàn)

比如說如果我們使用 rates 函數(shù),去獲取11:50的狀態(tài)。可以看到對于美元來說,它在11:50的狀態(tài)其實落在11:49~11:56這個區(qū)間的,所以選取的是99。然后對于歐元來說,11:50的時刻是落在11:15和12:10之間的,所以我們會選取119這樣的一條數(shù)據(jù)。它其實實現(xiàn)的是我們在一剛開始定義的 TemporalTable 的概念,能夠獲取到 changelog 某一時刻有效數(shù)據(jù)。定義好 TemporalTableFunction 以后,我們就要需要使用這個 Function,具體實現(xiàn)業(yè)務邏輯。

大家注意這里需要去指定我們具體需要用到的 join key。比如說因為兩個流都是在一直持續(xù)更新的,對于我們的 order table 里面 11:00 的這一條記錄來說,關聯(lián)到的就是歐元在 10:45 這一條狀態(tài),然后它是 116,所以最后的結果就是 232。

剛才介紹的就是 Temporal table function join 的用法。

2.1.5 維表 join 的對比

然后來整體回顧一下在維表 join 這塊,各個維度 join 的一些差異,便于我們更好的去理解各個方法適用的場景。

基于Flink的典型ETL場景是怎么實現(xiàn)

  • 在實現(xiàn)復雜度上面的:除了熱存儲關聯(lián)稍微復雜一些,其它的實現(xiàn)方式基本上復雜度是比較低的。

  • 在維表數(shù)據(jù)量上:熱存儲關聯(lián)和 Temporal table function join 兩種方式可以支持比較多的數(shù)據(jù)量。其它的方式因為都要把維表加載到內存,所以就受限內存的大小。

  • 在維表更新頻率上面:因為預加載 DB 數(shù)據(jù)到內存和 Distributed Cache 在重新更新維表數(shù)據(jù)的時候都需要重啟,所以它們不適合維表需要經(jīng)常變更的場景。而對于廣播維表和 Temporal table function join 來說,可以實時的更新維表數(shù)據(jù)并反映到結果,所以它們可以支持維表頻繁更新的場景。

  • 對維表更新實時性來說:在廣播維表和 Temporal table function join,它們可以達到比較快的實時更新的效果。熱存儲關聯(lián)在大部分場景也是可以滿足業(yè)務需求的。

  • 在維表形式上面:可以看到第1種方式主要是支持訪問 DB 存儲少量數(shù)據(jù)的形式,Distributed Cache 支持文件的形式,熱存儲關聯(lián)需要訪問 HBase 和 Tair 等等這種熱存儲。廣播維表和 Temporal table function join 都需要維度數(shù)據(jù)能轉化成實時流的形式。

  • 在外部存儲上面:第1種方式和熱存儲關聯(lián)都是需要依賴外部存儲的。

在維表 join 這一塊,我們就先介紹這幾個基本方法。可能有的同學還有一些其他方案,之后可以反饋交流,這里主要提了一些比較常用的方案,但并不限于這些方案。

2.2 雙流 join

首先我們來回顧一下,批處理是怎么去處理兩個表 join的?一般批處理引擎實現(xiàn)的時候,會采用兩個思路。

一個是基于排序的 Sort-Merge join。另外一個是轉化為 Hash table 加載到內存里做 Hash join。這兩個思路對于雙流 join 的場景是否還同樣適用?在雙流 join 場景里面要處理的對象不再是這種批數(shù)據(jù)、有限的數(shù)據(jù),而是是無窮數(shù)據(jù)集,對于無窮數(shù)據(jù)集來說,我們沒有辦法排序以后再做處理,同樣也沒有辦法把無窮數(shù)據(jù)集全部轉成 Cache 加載到內存去做處理。 所以這兩種方式基本是不能夠適用的。同時在雙流 join 場景里面,我們的 join 對象是兩個流,數(shù)據(jù)也是不斷在進入的,所以我們 join 的結果也是需要持續(xù)更新的。

那么我們應該有什么樣的方案去解決雙流 join 的實現(xiàn)問題?Flink 的一個基本的思路是將兩個流的數(shù)據(jù)持續(xù)性的存到 state 中,然后使用。因為需要不斷的去更新 join 的結果,之前的數(shù)據(jù)理論上如果沒有任何附加條件的話是不能丟棄的。但是從實現(xiàn)上來說 state 又不能永久的保存所有的數(shù)據(jù),所以需要通過一些方式將 join 的這種全局范圍局部化,就是說把一個無限的數(shù)據(jù)流,盡可能給它拆分切分成一段一段的有線數(shù)據(jù)集去做 join。

其實基本就是這樣一個大的思路,接下來去看一下具體的實現(xiàn)方式。

2.2.1 離線 join vs. 實時 join

接下來我們以 inner join 為例看一下,

左流是黑色標出來的這一條,右流是藍色標出來的,這條兩流需要做 inner join。首先左流和右流在元素進入以后,需要把相關的元素存儲到對應的 state 上面。除了存儲到 state 上面以外,左流的數(shù)據(jù)元素到來以后需要去和右邊的 Right State 去做比較看能不能匹配到。同樣右邊的流元素到了以后,也需要和左邊的 Left State 去做比較看是否能夠 match,能夠 match 的話就會作為 inner join 的結果輸出。這個圖是比較粗的展示出來一個 inner join 的大概細節(jié)。也是讓大家大概的體會雙流 join 的實現(xiàn)思路。

2.2.2 Regular join

我們首先來看一下第1類雙流 join 的方式,Regular join。這種 join 方式需要去保留兩個流的狀態(tài),持續(xù)性地保留并且不會去做清除。兩邊的數(shù)據(jù)對于對方的流都是所有可見的,所以數(shù)據(jù)就需要持續(xù)性的存在state里面,那么 state 又不能存的過大,因此這個場景的只適合有界數(shù)據(jù)流。它的語法可以看一下,比較像離線批處理的 SQL:

基于Flink的典型ETL場景是怎么實現(xiàn)

在上圖頁面里面是現(xiàn)在 Flink 支持 Regular join 的一些寫法,可以看到和我們普通的 SQL 基本是一致的。

2.2.3 Interval join

在雙流 join 里面 Flink支持的第2類 join 就是 Interval join 也叫區(qū)間 join。它是什么意思呢?就是加入了一個時間窗口的限定,要求在兩個流做 join 的時候,其中一個流必須落在另一個流的時間戳的一定時間范圍內,并且它們的 join key 相同才能夠完成 join。加入了時間窗口的限定,就使得我們可以對超出時間范圍的數(shù)據(jù)做一個清理,這樣的話就不需要去保留全量的 State。

Interval join 是同時支持 processing time 和 even time去定義時間的。如果使用的是 processing time,F(xiàn)link 內部會使用系統(tǒng)時間去劃分窗口,并且去做相關的 state 清理。如果使用 even time 就會利用 Watermark 的機制去劃分窗口,并且做 State 清理。

Flink 的作者之前有一個內容非常直觀的分享,這里就直接復用了他這部分的一個示例:

基于Flink的典型ETL場景是怎么實現(xiàn)

我們可以看到對于 Interval join 來說:它定義一個時間的下限,就可以使得我們對于在時間下限之外的數(shù)據(jù)做清理。比如在剛才的 SQL 里面,其實我們就限定了 join 條件是 ordertime 必須要大于 shiptime 減去4個小時。 對于 Shipments 流來說,如果接收到12:00 點的 Watermark,就意味著對于 Orders 流的數(shù)據(jù)小于 8:00 點之前的數(shù)據(jù)時間戳就可以去做丟棄,不再保留在 state 里面了。

同時對于 shiptime 來說,其實它也設定了一個時間的下限,就是它必須要大于 ordertime。對于 Orders 流來說如果接收到了一個10:15點的 Watermark, 那么 Shipments 的 state 10:15 之前的數(shù)據(jù)就可以拋棄掉。 所以 Interval join 使得我們可以對于一部分歷史的 state 去做清理。

2.2.4 Window join

最后來說一下雙流 join 的第3種 Window join:它的概念是將兩個流中有相同 key 和處在相同 window 里的元素去做 join。它的執(zhí)行的邏輯比較像 Inner join,必須同時滿足 join key 相同,而且在同一個 window 里元素才能夠在最終結果中輸出。具體使用的方式是這樣的:

基于Flink的典型ETL場景是怎么實現(xiàn)

目前 Window join 只支持 Datastream 的 API,所以這里使用方式也是 Datastream 的一個形式。可以看到我們首先把兩流去做 join,然后在 where 和 equalTo 里面去定義 join key 的條件,然后在 window 中需要去指定 window 劃分的方式 WindowAssigner,最后要去定義 JoinFunction 或者是 FlatJoinFunction,來實現(xiàn)我們匹配元素的具體處理邏輯。

因為 window 其實劃分為三類,所以我們的 Window join 這里也會分為三類:

  • 第1類 Tumbling Window join:它是按照時間區(qū)間去做劃分的 window。

可以看到這個圖里面是兩個流(綠色的流年和黃色的流)。在這個例子里我們定義的是一個兩毫秒的窗口,每一個圈是我們每個流上一個單個元素,上面的時間戳代表元素對應的時間,所以我們可以看到它是按照兩毫秒的間隔去做劃分的,window 和 window 之間是不會重疊的。 對于第1個窗口我們可以看到綠色的流有兩個元素符合,然后黃色流也有兩個元素符合,它們會以 pair 的方式組合,最后輸入到 JoinFunction 或者是 FlatJoinFunction 里面去做具體的處理。

  • 第2類 window 是 Sliding Window Join:這里用的是 Sliding Window。

基于Flink的典型ETL場景是怎么實現(xiàn)

sliding window 是首先定義一個窗口大小,然后再定義一個滑動時間窗的大小。如果滑動時間窗的大小小于定義的窗口大小,窗口和窗口之間會存在重疊的情況。就像這個圖里顯示出來的,紅色的窗口和黃色窗口是有重疊的,其中綠色流的0元素同時處于紅色的窗口和黃色窗口,說明一個元素是可以同時處于兩個窗口的。然后在具體的 Sliding Window Join 的時候,可以看到對于紅色的窗口來說有兩個元素,綠色0和黃色的0,它們兩個元素是符合 window join 條件的,于是它們會組成一個0,0的 pair。 然后對于黃色的窗口符合條件的是綠色的0與黃色0和1兩位數(shù),它們會去組合成0,1、0,0和1,0兩個pair,最后會進入到我們定義的 JoinFunction 里面去做處理。

  • 第3類是 SessionWindow join:這里面用到的 window 是 session window。

session window 是定義一個時間間隔,如果一個流在這個時間間隔內沒有元素到達的話,那么它就會重新開一個新的窗口。在上圖里面我們可以看到窗口和窗口之間是不會重疊的。我們這里定義的Gap是1,對于第1個窗口來說,可以看到有綠色的0元素和黃色的1、2元素都是在同一個窗口內,所以它會組成在1 ,0和2,0這樣的一個pair。剩余的也是類似,符合條件的pair都會進入到最后 JoinFunction 里面去做處理。

上述內容就是基于Flink的典型ETL場景是怎么實現(xiàn),你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


標題名稱:基于Flink的典型ETL場景是怎么實現(xiàn)
文章分享:http://www.dlmjj.cn/article/pcescj.html