日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
使用Kafka和Druid了解Spark流

使用Kafka和Druid了解Spark流

作者:聞數(shù)起舞 2020-05-14 10:26:27

大數(shù)據(jù)

Kafka

Spark 在本博文中,我將分享通過將Spark Streaming,Kafka和Apache Druid結(jié)合在一起以構(gòu)建實時分析儀表板,以確保精確的數(shù)據(jù)表示而獲得的知識。

成都創(chuàng)新互聯(lián)公司服務(wù)項目包括潛江網(wǎng)站建設(shè)、潛江網(wǎng)站制作、潛江網(wǎng)頁制作以及潛江網(wǎng)絡(luò)營銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢、行業(yè)經(jīng)驗、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,潛江網(wǎng)站推廣取得了明顯的社會效益與經(jīng)濟效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到潛江省份的部分城市,未來相信會繼續(xù)擴大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

作為一名數(shù)據(jù)工程師,我正在研究大數(shù)據(jù)技術(shù),例如Spark Streaming,Kafka和Apache Druid。 他們都有自己的教程和RTFM頁面。 但是,將這些技術(shù)大規(guī)模地組合在一起時,您會發(fā)現(xiàn)自己正在尋找涵蓋更復(fù)雜的生產(chǎn)用例的解決方案。 在本博文中,我將分享通過將Spark Streaming,Kafka和Apache Druid結(jié)合在一起以構(gòu)建實時分析儀表板,以確保精確的數(shù)據(jù)表示而獲得的知識。

在開始之前……關(guān)于實時分析的幾句話

實時分析是大數(shù)據(jù)技術(shù)的新趨勢,通常具有顯著的業(yè)務(wù)影響。 在分析新鮮數(shù)據(jù)時,見解更加精確。 例如,為數(shù)據(jù)分析師,BI和客戶經(jīng)理團隊提供實時分析儀表板可以幫助這些團隊做出快速決策。 大規(guī)模實時分析的常用架構(gòu)基于Spark Streaming和Kafka。 這兩種技術(shù)都具有很好的可擴展性。 它們在群集上運行,并在許多計算機之間分配負載。 Spark作業(yè)的輸出可以到達許多不同的目的地,這取決于特定的用例和體系結(jié)構(gòu)。 我們的目標是提供顯示實時事件的可視工具。 為此,我們選擇了Apache Druid數(shù)據(jù)庫。

Apache Druid中的數(shù)據(jù)可視化

Druid是高性能的實時分析數(shù)據(jù)庫。 它的好處之一是能夠使用來自Kafka主題的實時數(shù)據(jù),并使用Pivot模塊在其上構(gòu)建強大的可視化效果。 它的可視化功能可以運行各種臨時的"切片和切塊"查詢,并快速獲得可視化結(jié)果。 這對于分析各種用例非常有用,例如特定運動在某些國家的表現(xiàn)。 實時檢索數(shù)據(jù),延遲1-2分鐘。

架構(gòu)

因此,我們決定基于Kafka事件和Apache Druid構(gòu)建實時分析系統(tǒng)。 我們已經(jīng)在Kafka主題中進行過活動。 但是我們不能將它們直接攝取到德魯伊中。 我們需要為每個事件添加更多維度。 我們需要用更多的數(shù)據(jù)豐富每個事件,以便在德魯伊中方便地查看它。 關(guān)于規(guī)模,我們每分鐘要處理數(shù)十萬個事件,因此我們需要使用能夠支持這些數(shù)字的技術(shù)。 我們決定使用Spark Streaming作業(yè)豐富原始的Kafka事件。

圖1.實時分析架構(gòu)

Spark Streaming作業(yè)永遠運行? 并不是的。

Spark Streaming作業(yè)的想法是它始終在運行。 這項工作永遠都不應(yīng)停止。 它不斷讀取來自Kafka主題的事件,對其進行處理,并將輸出寫入另一個Kafka主題。 但是,這是一個樂觀的看法。 在現(xiàn)實生活中,事情更加復(fù)雜。 Spark群集中存在驅(qū)動程序故障,在這種情況下,作業(yè)將重新啟動。 有時新版本的spark應(yīng)用程序已部署到生產(chǎn)中。 在這種情況下會發(fā)生什么? 重新啟動的作業(yè)如何讀取Kafka主題并處理事件? 在深入研究這些細節(jié)之前,此圖顯示了重新啟動Spark Streaming作業(yè)時在Druid中看到的內(nèi)容:

圖2.作業(yè)重新啟動時數(shù)據(jù)丟失

絕對是數(shù)據(jù)丟失!

我們要解決什么問題?

我們正在處理Spark Streaming應(yīng)用程序,該應(yīng)用程序從一個Kafka主題讀取事件,并將事件寫入另一個Kafka主題。 這些事件稍后將在Druid中顯示。 我們的目標是在重新啟動Spark Streaming應(yīng)用程序期間實現(xiàn)平滑的數(shù)據(jù)可視化。 換句話說,我們需要確保在Spark Streaming作業(yè)重啟期間不會丟失或重復(fù)任何事件。

都是關(guān)于補償

為了理解為什么作業(yè)重新啟動時會丟失數(shù)據(jù),我們需要熟悉Kafka體系結(jié)構(gòu)中的一些術(shù)語。 您可以在這里找到Kafka的官方文檔。 簡而言之:Kafka中的事件存儲在主題中; 每個主題都分為多個分區(qū)。 分區(qū)中的每個記錄都有一個偏移量-一個連續(xù)的數(shù)字,它定義了記錄的順序。 當(dāng)應(yīng)用程序使用該主題時,它可以通過多種方式處理偏移量。 默認行為始終是從最新的偏移量讀取。 另一個選擇是提交偏移量,即持久保留偏移量,以便作業(yè)可以在重新啟動時讀取已提交的偏移量并從此處繼續(xù)。 讓我們看一下解決方案的步驟,并在每個步驟中加深對Kafka膠印管理的了解。

步驟#1-自動提交偏移量

默認行為始終是從最新的偏移量讀取。 這將不起作用,因為重新啟動作業(yè)時,該主題中有新事件。 如果作業(yè)從最新讀取,它將丟失重新啟動期間添加的所有消息,如圖2所示。Spark Streaming中有一個" enable.auto.commit"參數(shù)。 默認情況下,其值為false。 圖3顯示了將其值更改為true,運行Spark應(yīng)用程序并重新啟動后的行為。

圖3.作業(yè)重啟的數(shù)據(jù)峰值

我們可以看到,使用Kafka自動提交功能會產(chǎn)生新的效果。 沒有"數(shù)據(jù)丟失",但是現(xiàn)在我們看到重復(fù)的事件。 沒有真正的事件"爆發(fā)"。 實際發(fā)生的情況是自動提交機制"不時"提交偏移量。 輸出主題中有許多未提交的消息。 重新啟動后,作業(yè)將使用最新提交的偏移量中的消息,并再次處理其中一些事件。 這就是為什么在輸出中會出現(xiàn)大量事件的原因。

顯然,將這些重復(fù)項合并到我們的可視化中可能會誤導(dǎo)業(yè)務(wù)消費者此數(shù)據(jù),并影響他們的決策和對系統(tǒng)的信任。

步驟#2:手動提交Kafka偏移

因此,我們不能依靠Kafka自動提交功能。 我們需要自己進行卡夫卡補償。 為了做到這一點,讓我們看看Spark Streaming如何使用Kafka主題中的數(shù)據(jù)。 Spark Streaming使用稱為離散流或DStream的體系結(jié)構(gòu)。 DStream由一系列連續(xù)的RDD(彈性分布式數(shù)據(jù)集)表示,這是Spark的主要抽象之一。 大多數(shù)Spark Streaming作業(yè)如下所示:

  
 
 
 
  1. dstream.foreachRDD { rdd => rdd.foreach { record => process(record)} } 

在我們的案例中,處理記錄意味著將記錄寫入輸出Kafka主題。 因此,為了提交Kafka偏移量,我們需要執(zhí)行以下操作:

  
 
 
 
  1. dstream.foreachRDD { rdd => val offsetRanges =  
  2. rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd.foreach { record  
  3. => process(record)}  
  4. stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } 

這是一種簡單明了的方法,在我們深入討論之前,讓我們看一下大局。 假設(shè)我們正確處理了偏移量。 即,在每次RDD處理之后都保存偏移量。 當(dāng)我們停止工作時會怎樣? 該作業(yè)在RDD的處理過程中停止。 微批處理的部分將寫入輸出Kafka主題,并且不會提交。 一旦作業(yè)再次運行,它將第二次處理某些消息,并且重復(fù)消息的峰值將(與以前一樣)出現(xiàn)在Druid中:

圖4.作業(yè)重新啟動時的數(shù)據(jù)峰值

正常關(guān)機

事實證明,有一種方法可以確保在RDD處理期間不會殺死作業(yè)。這稱為"正常關(guān)機"。有幾篇博客文章描述了如何優(yōu)雅地殺死Spark應(yīng)用程序,但是其中大多數(shù)與舊版本的Spark有關(guān),并且有很多限制。我們一直在尋找一種適用于任何規(guī)模且不依賴于特定Spark版本或操作系統(tǒng)的"安全"解決方案。要啟用正常關(guān)機,應(yīng)使用以下參數(shù)創(chuàng)建Spark上下文:spark.streaming.stopGracefullyOnShutdown = true。這指示Spark在JVM關(guān)閉時(而不是立即)正常關(guān)閉StreamingContext。另外,我們需要一種機制來有意地停止工作,例如在部署新版本時。我們已經(jīng)通過簡單地檢查是否存在指示作業(yè)關(guān)閉的HDFS文件來實現(xiàn)該機制的第一個版本。當(dāng)文件顯示在HDFS中時,流上下文將使用以下參數(shù)停止:ssc.stop(stopSparkContext = true,stopGracefully = true)

在這種情況下,只有在完成所有接收到的數(shù)據(jù)處理之后,Spark應(yīng)用程序才會正常停止。 這正是我們所需要的。

步驟#3:Kafka commitAsync

讓我們回顧一下到目前為止的情況。 我們有意在每個RDD處理中提交Kafka偏移量(使用Kafka commitAsync API),并使用Spark正常關(guān)機。 顯然,還有另一個警告。 深入研究Kafka API和Kafka commitAsync()源代碼的文檔,我了解到commitAsync()僅將offsetRanges放入隊列中,實際上僅在下一個foreachRDD循環(huán)中進行處理。 即使Spark作業(yè)正常停止并完成了所有RDD的處理,實際上也不會提交最后一個RDD的偏移量。 為解決此問題,我們實現(xiàn)了一個代碼,該代碼可同步保留Kafka偏移量,并且不依賴于Kafka commitAsync()。 然后,對于每個RDD,我們將提交的偏移量存儲在HDFS文件中。 當(dāng)作業(yè)再次開始運行時,它將從HDFS加載偏移文件,并從這些偏移開始使用Kafka主題。

在這里,它有效!

僅僅是正常關(guān)機和Kafka偏移量同步存儲的組合,才為我們提供了理想的結(jié)果。 重新啟動期間沒有數(shù)據(jù)丟失,沒有數(shù)據(jù)高峰:

圖5.重新啟動Spark作業(yè)期間沒有峰值數(shù)據(jù)丟失

結(jié)論

解決Spark Streaming和Kafka之間的集成問題是構(gòu)建實時分析儀表板的重要里程碑。 我們找到了可以確保穩(wěn)定的數(shù)據(jù)流的解決方案,而不會在Spark Streaming作業(yè)重啟期間丟失事件或重復(fù)。 現(xiàn)在,我們獲得了在Druid中可視化的可信賴數(shù)據(jù)。 因此,我們將更多類型的事件(Kafka主題)添加到了Druid中,并建立了實時儀表板。 這些儀表板為各種團隊提供了見解,例如BI,產(chǎn)品和客戶支持。 我們的下一個目標是利用Druid的更多功能,例如新的分析功能和警報。


網(wǎng)站標題:使用Kafka和Druid了解Spark流
標題網(wǎng)址:http://www.dlmjj.cn/article/cdidsgh.html