日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
自適應批作業(yè)調(diào)度器:為Flink批作業(yè)自動推導并行度

?01引言

對大部分用戶來說,為 Flink 算子配置合適的并行度并不是一件容易的事。對于批作業(yè),小的并行度會導致作業(yè)運行時間長,故障恢復慢,而不必要的大并行度會導致資源浪費,任務部署和數(shù)據(jù) shuffle 開銷也會變大。

成都創(chuàng)新互聯(lián)公司長期為近1000家客戶提供的網(wǎng)站建設服務,團隊從業(yè)經(jīng)驗10年,關注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務;打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為蛟河企業(yè)提供專業(yè)的網(wǎng)站制作、做網(wǎng)站,蛟河網(wǎng)站改版等技術服務。擁有10多年豐富建站經(jīng)驗和眾多成功案例,為您定制開發(fā)。

為了控制批作業(yè)的執(zhí)行時長,算子的并行度應該和其需要處理的數(shù)據(jù)量成正比。用戶需要通過預估算子需要處理的數(shù)據(jù)量來配置并行度。但準確預估算子需要處理的數(shù)據(jù)量是一件很困難的事情:需要處理的數(shù)據(jù)量可能每天都在變化,作業(yè)中可能會存在大量的 UDF 和復雜算子導致難以判斷其產(chǎn)出的數(shù)據(jù)量。

為了解決這個問題,我們在 Flink 1.15 中引入了一種新的調(diào)度器:自適應批作業(yè)調(diào)度器(Adaptive Batch Scheduler)。自適應批作業(yè)調(diào)度器會在作業(yè)運行時根據(jù)每個算子需要處理的實際數(shù)據(jù)量來自動推導并行度。它會帶來以下好處:

  1. 大大降低批處理作業(yè)并發(fā)度調(diào)優(yōu)的繁瑣程度;
  2. 可以根據(jù)處理的數(shù)據(jù)量為不同的算子配置不同的并行度,這對于之前只能配置全局并行度的 SQL 作業(yè)尤其有益;
  3. 可以更好的適應每日變化的數(shù)據(jù)量。

02用法

使 Flink 自動推導算子的并行度,需要進行以下配置:

  1. 啟用自適應批作業(yè)調(diào)度器;
  2. 配置算子的并行度為 -1。

2.1 啟用自適應批作業(yè)調(diào)度器

啟用自適應批作業(yè)調(diào)度器,需要進行以下配置:

  1. 配置 jobmanager.scheduler: AdaptiveBatch;
  2. 將 execution.batch-shuffle-mode 配置為 ALL-EXCHANGES-BLOCKING (默認值)。因為目前自適應批作業(yè)調(diào)度器只支持 shuffle mode 為 ALL-EXCHANGES-BLOCKING 的作業(yè)。

此外,還有一些相關配置來指定自動推導的算子并行度的上下限、預期每個算子處理的數(shù)據(jù)量以及 source 算子的默認并行度,詳情請參閱 Flink 文檔 [1]。

2.2 配置算子的并行度為 -1

自適應批作業(yè)調(diào)度器只會為用戶未指定并行度的算子(即并行度為默認值 -1)推導并行度。所以需要進行以下配置:

  1. 配置 parallelism.default: -1;
  2. 對于 SQL 作業(yè),需要配置 table.exec.resource.default-parallelism: -1;
  3. 對于 DataStream/DataSet 作業(yè),避免在作業(yè)中通過算子的 setParallelism() 方法來指定并行度;
  4. 對于 DataStream/DataSet 作業(yè),避免在作業(yè)中通過 StreamExecutionEnvironment/ExecutionEnvironment 的 setParallelism() 方法來指定并行度。

03實現(xiàn)細節(jié)

接下來我們將介紹自適應批作業(yè)調(diào)度器的實現(xiàn)細節(jié)。在此之前,我們簡要介紹一下涉及到的一些術語概念:

  1. 邏輯節(jié)點(JobVertex)[2] 和邏輯拓撲(JobGraph)[3]:邏輯節(jié)點是為了更優(yōu)的性能而將幾個算子鏈接到一起形成的算子鏈,邏輯拓撲則是多個邏輯節(jié)點連接組成的數(shù)據(jù)流圖。
  2. 執(zhí)行節(jié)點(ExecutionVertex)[4] 和執(zhí)行拓撲(ExecutionGraph)[5]:執(zhí)行節(jié)點對應一個可部署物理任務,是邏輯節(jié)點根據(jù)并行度進行展開生成的。例如,如果一個邏輯節(jié)點的并行度為 100,就會生成 100 個對應的執(zhí)行節(jié)點。執(zhí)行拓撲則是所有執(zhí)行節(jié)點連接組成的物理執(zhí)行圖。

以上概念的介紹可以參見 Flink 文檔 [6]。需要注意的是,自適應批作業(yè)調(diào)度器是通過推導邏輯節(jié)點的并行度來決定該節(jié)點包含的算子的并行度的。

實現(xiàn)細節(jié)主要包括以下幾部分:

  1. 使調(diào)度器能夠收集執(zhí)行節(jié)點產(chǎn)出數(shù)據(jù)的大?。?/li>
  2. 引入一個新組件 VertexParallelismDecider [7] 來負責根據(jù)邏輯節(jié)點需要處理的數(shù)據(jù)量計算其并行度;
  3. 支持動態(tài)構建執(zhí)行拓撲,即執(zhí)行拓撲從一個空的執(zhí)行拓撲開始,然后隨著作業(yè)調(diào)度逐漸添加執(zhí)行節(jié)點;
  4. 引入自適應批作業(yè)調(diào)度器來更新和調(diào)度執(zhí)行拓撲。

后續(xù)章節(jié)會對以上內(nèi)容進行詳細介紹。

圖 1 - 自動推導并行度的整體結構

3.1 收集執(zhí)行節(jié)點產(chǎn)出的數(shù)據(jù)量

自適應批作業(yè)調(diào)度器是根據(jù)邏輯節(jié)點需要處理的數(shù)據(jù)量來決定其并行度的,因此需要收集上游節(jié)點產(chǎn)出的數(shù)據(jù)量。為此,我們引入了一個 numBytesProduced 計數(shù)器來記錄每個執(zhí)行節(jié)點產(chǎn)出的數(shù)據(jù)分區(qū)(ResultPartition)的數(shù)據(jù)量,并在執(zhí)行節(jié)點運行完成時將累計值發(fā)送給調(diào)度器。

3.2 為邏輯節(jié)點決定合適的并行度

我們引入了一個新組件 VertexParallelismDecider 來負責為邏輯節(jié)點計算并行度。計算算法如下:

假設

  1. V 是用戶配置的期望每個執(zhí)行節(jié)點處理的數(shù)據(jù)量;
  2. totalBytenon-broadcast 是邏輯節(jié)點需要處理的非廣播數(shù)據(jù)的總量;
  3. totalBytesbroadcast 是邏輯節(jié)點需要處理的廣播數(shù)據(jù)的總量;
  4. maxBroadcastRatio 是每個執(zhí)行節(jié)點處理的廣播數(shù)據(jù)的比例上限;
  5. normalize(x) 是一個輸出與 x 最接近的 2 的冪的函數(shù)。

計算并行度的公式如下:

值得注意的是,我們在這個公式中引入了兩個特殊處理:

  1. 限制每個執(zhí)行節(jié)點處理的廣播數(shù)據(jù)的比例;
  2. 將并行度調(diào)整為 2 的冪。

此外,上述公式不能直接用來決定 source 節(jié)點的并行度,因為 source 節(jié)點不會消費數(shù)據(jù)。為了解決這個問題,我們引入了配置選項 jobmanager.adaptive-batch-scheduler.default-source-parallelism,允許用戶手動配置 source 節(jié)點的并行度。請注意,并非所有 source 都需要此選項,因為某些 source 可以自己推導并行度(例如,HiveTableSource,詳情請參閱 HiveParallelismInference),對于這些source,更推薦由它們自己推導并行度。

3.2.1 限制每個執(zhí)行節(jié)點處理的廣播數(shù)據(jù)的比例

我們在公式限制每個執(zhí)行節(jié)點處理的廣播數(shù)據(jù)上限比例為 maxBroadcastRatio。 即每個執(zhí)行節(jié)點處理的非廣播數(shù)據(jù)至少為 (1-maxBroadcastRatio) * V。如果不這樣做,當廣播數(shù)據(jù)的數(shù)據(jù)量接近 V 時,即使非廣播數(shù)據(jù)的量非常小,也可能會被計算出很大的并行度,這是不必要的,會導致資源浪費和任務部署的開銷變大。

通常情況下,一個執(zhí)行節(jié)點需要處理的廣播數(shù)據(jù)量會小于要處理的非廣播數(shù)據(jù)。 因此,我們將 maxBroadcastRatio 默認設置為 0.5。目前,這個值是硬編碼在代碼中的,我們后續(xù)會考慮將其改為可配置的。

3.2.2 將并行度調(diào)整為 2 的冪

normalize 函數(shù)會將并行度調(diào)整為最近的 2 的冪,這樣做是為了避免引入數(shù)據(jù)傾斜。為了更好的理解本節(jié),我們建議您先閱讀子分區(qū)動態(tài)映射部分。

以圖 4(b)為例,A1/A2 產(chǎn)生 4 個子分區(qū),B 最終被決定的并行度為 3。這種情況下,B1 將消費 1 個子分區(qū),B2 將消費 1 個子分區(qū),B3 將消費 2 個子分區(qū)。我們假設不同子分區(qū)的數(shù)據(jù)量都相同,這樣 B3 需要消費的數(shù)據(jù)量是 B1/B2 的 2 倍,從而導致了數(shù)據(jù)傾斜。

為了解決這個問題,我們需要讓所有下游執(zhí)行節(jié)點消費的子分區(qū)數(shù)量都一樣,也就是說上游產(chǎn)出的子分區(qū)數(shù)量應該是下游邏輯節(jié)點并行度的整數(shù)倍。為簡單起見,我們希望用戶指定的最大并行度為 2^N(如果不是則會被自動調(diào)整到不超過配置值的 2^N),然后將下游邏輯節(jié)點的并行度調(diào)整到最接近的 2^M(M <= N),這樣就可以保證子分區(qū)被下游均勻消費。

不過這只是一個臨時的解決方案,最終應該通過自動負載均衡來解決,我們將在后續(xù)版本中實現(xiàn)。

3.3 動態(tài)構建執(zhí)行拓撲

在引入自適應批作業(yè)調(diào)度器之前,執(zhí)行拓撲是以靜態(tài)方式構建的,也就是在調(diào)度開始前執(zhí)行拓撲就被完全創(chuàng)建出來了。為了使邏輯節(jié)點并行度可以在運行時決定,執(zhí)行拓撲需要支持動態(tài)構建。

3.3.1 向執(zhí)行拓撲動態(tài)添加節(jié)點和邊

動態(tài)構建執(zhí)行拓撲是指一個 Flink 作業(yè)從一個空的執(zhí)行拓撲開始,然后隨著調(diào)度逐步附加執(zhí)行節(jié)點,如圖 2 所示。

執(zhí)行拓撲由執(zhí)行節(jié)點和執(zhí)行邊(ExecutionEdge)組成。只有在以下情況下,才會將邏輯節(jié)點展開創(chuàng)建執(zhí)行節(jié)點并將其添加到執(zhí)行拓撲:

  1. 對應邏輯節(jié)點的并行度已經(jīng)被確定(以便 Flink 知道應該創(chuàng)建多少個執(zhí)行節(jié)點);
  2. 所有上游邏輯節(jié)點都已經(jīng)被展開(以便 Flink 通過執(zhí)行邊將新創(chuàng)建的執(zhí)行節(jié)點和上游執(zhí)行節(jié)點連接起來)。

圖 2 - 動態(tài)構建執(zhí)行拓撲

3.3.2 子分區(qū)動態(tài)映射

在引入自適應批作業(yè)調(diào)度器之前,在部署執(zhí)行節(jié)點時,F(xiàn)link 需要知道其下游邏輯節(jié)點的并行度。因為下游邏輯節(jié)點的并行度決定了上游執(zhí)行節(jié)點需要產(chǎn)出的子分區(qū)數(shù)量。以圖 3 為例,下游 B 的并行度為 2,因此上游的 A1/A2 需要產(chǎn)生 2 個子分區(qū),索引為 0 的子分區(qū)被 B1 消費,索引為 1 的子分區(qū)被 B2 消費。

圖 3 - 靜態(tài)執(zhí)行拓撲消費子分區(qū)的方式

但顯然,這不適用于動態(tài)圖,因為當部署上游執(zhí)行節(jié)點時,下游邏輯節(jié)點的并行度可能尚未確定(即部署 A1/A2 時,B 的并行度還未確定)。為了解決這個問題,我們需要使上游執(zhí)行節(jié)點產(chǎn)生的子分區(qū)數(shù)量與下游邏輯節(jié)點的并行度解耦。

我們通過以下方法實現(xiàn)解耦:將上游執(zhí)行節(jié)點產(chǎn)生子分區(qū)的數(shù)量設置為下游邏輯節(jié)點的最大并行度(最大并行度是一個可配置的固定值),然后在下游邏輯節(jié)點并行度被確定后,將這些子分區(qū)均分給不同的下游執(zhí)行節(jié)點進行消費。也就是說,部署下游執(zhí)行節(jié)點時,每個下游執(zhí)行節(jié)點都會被分配到一個子分區(qū)范圍來消費。假設 N 是下游邏輯節(jié)點并行度,P 是子分區(qū)的數(shù)量。對于第 k 個下游執(zhí)行節(jié)點,消費的子分區(qū)范圍應該是:

以圖 4 為例,B 的最大并行度為 4,因此 A1/A2 有 4 個子分區(qū)。然后如果B的確定并行度為 2,則子分區(qū)映射將為圖 4(a),如果B的確定并行度為 3,則子分區(qū)映射將為圖 4(b)。

圖 4 - 動態(tài)執(zhí)行拓撲消費子分區(qū)的方式

3.4 動態(tài)更新并調(diào)度執(zhí)行拓撲

自適應批作業(yè)調(diào)度器調(diào)度作業(yè)的方式和默認調(diào)度器基本相同,唯一的區(qū)別是:自適應批作業(yè)調(diào)度器是從一個空的執(zhí)行拓撲開始調(diào)度,在處理任何調(diào)度事件之前,都會嘗試決定所有邏輯節(jié)點的并行度,然后嘗試為邏輯節(jié)點生成對應的執(zhí)行節(jié)點,并通過執(zhí)行邊連接上游節(jié)點,更新執(zhí)行拓撲。

調(diào)度器會在每次調(diào)度之前嘗試按照拓撲順序決定所有邏輯節(jié)點的并行度:

  1. 對于 source 節(jié)點,其并行度會在開始調(diào)度之前就進行確定;
  2. 對于非 source 節(jié)點,需要在其所有上游節(jié)點數(shù)據(jù)產(chǎn)出完成后才能確定其并行度。

然后,調(diào)度程序將嘗試按照拓撲順序將邏輯節(jié)點展開生成執(zhí)行節(jié)點。一個可以被展開的邏輯節(jié)點應該滿足以下條件:

  1. 該邏輯節(jié)點并行度已確定;
  2. 所有上游邏輯節(jié)點都已經(jīng)被展開。

04未來展望 - 自動負載均衡

運行批作業(yè)時,可能會出現(xiàn)數(shù)據(jù)傾斜(某個執(zhí)行節(jié)點需要處理的數(shù)據(jù)遠多于其他執(zhí)行節(jié)點),這會導作業(yè)出現(xiàn)長尾現(xiàn)象,拖慢作業(yè)的完成速度。如果 Flink 可以自動改善或者解決這個問題,可以給用戶很大的幫助。

一種典型的數(shù)據(jù)傾斜情況是某些子分區(qū)的數(shù)據(jù)量明顯大于其他子分區(qū)。這種情況可以通過劃分更細粒度的子分區(qū),并根據(jù)子分區(qū)大小來平衡工作負載來解決(如圖 5)。自適應批作業(yè)調(diào)度器的工作可以被認為是邁向它的第一步,因為自動重新平衡的要求類似于自適應批作業(yè)調(diào)度器,它們都需要動態(tài)圖的支持和結果分區(qū)大小的采集。

基于自適應批作業(yè)調(diào)度器的實現(xiàn),我們可以通過增加最大并行度(為了更細粒度的子分區(qū))和簡單地更改子分區(qū)范圍劃分算法(為了平衡工作負載)來解決上述問題。在目前的設計中,子分區(qū)范圍是按照子分區(qū)的個數(shù)來劃分的,我們可以改成按照子分區(qū)中的數(shù)據(jù)量來劃分,這樣每個子分區(qū)范圍內(nèi)的數(shù)據(jù)量可以大致相同,從而平衡下游執(zhí)行節(jié)點的工作量。

圖 5 - 自動負載均衡

注釋

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/elastic_scaling/#adaptive-batch-scheduler

[2] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java

[3] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java

[4] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java

[5] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java

[6] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/internals/job_scheduling/#jobmanager-數(shù)據(jù)結構

[7] https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/VertexParallelismDecider.java?


文章題目:自適應批作業(yè)調(diào)度器:為Flink批作業(yè)自動推導并行度
文章轉載:http://www.dlmjj.cn/article/dhddspg.html