新聞中心
使用ElasticSearch、Kafka和Cassandra構(gòu)建流式數(shù)據(jù)中心
作者:佚名 2015-10-22 14:02:58
服務(wù)器
數(shù)據(jù)中心
Kafka 在過(guò)去的一年里,我遇到了一些軟件公司討論如何處理應(yīng)用程序的數(shù)據(jù)(通常以日志和metrics的形式)。在這些討論中,我經(jīng)常會(huì)聽(tīng)到挫折感,他們不得不用一組零碎的工具,隨著時(shí)間的推移將這些數(shù)據(jù)匯總起來(lái)。

在過(guò)去的一年里,我遇到了一些軟件公司討論如何處理應(yīng)用程序的數(shù)據(jù)(通常以日志和metrics的形式)。在這些討論中,我經(jīng)常會(huì)聽(tīng)到挫折感,他們不得不用一組零碎的工具,隨著時(shí)間的推移將這些數(shù)據(jù)匯總起來(lái)。這些工具,如: - 運(yùn)維人員使用的,用于監(jiān)控和告警的工具
- 開(kāi)發(fā)人員用于跟蹤性能和定位問(wèn)題的工具
- 一個(gè)完整獨(dú)立的系統(tǒng),商業(yè)智能(BI)和業(yè)務(wù)依賴其分析用戶行為。
雖然這些工具使用不同的視角,適用不同的場(chǎng)景,但是他們同樣都是關(guān)注數(shù)據(jù)來(lái)源和類型。因此,許多軟件團(tuán)隊(duì)說(shuō),“如果時(shí)間充裕,我們可以建立一個(gè)更好的”,坦率地說(shuō),現(xiàn)在有很多出色的開(kāi)源代碼,自己重頭建立一套是否更有意義值得商榷。在Jut我們就是這樣做的。我們使用開(kāi)源的大數(shù)據(jù)組件建立了一個(gè)流式數(shù)據(jù)分析系統(tǒng),這篇文章描述了我們使用的片段以及我們?nèi)绾伟阉鼈兘M合在一起。我們將介紹:
- 數(shù)據(jù)攝取:如何引入不同類型的數(shù)據(jù)流
- 索引及保存數(shù)據(jù):高效存儲(chǔ)以及統(tǒng)一查詢
- 串聯(lián):系統(tǒng)中的數(shù)據(jù)流過(guò)程
- 調(diào)優(yōu):讓整個(gè)過(guò)程真正的快速,用戶才會(huì)真的使用它。
我希望通過(guò)閱讀這篇文章將有助于您的系統(tǒng)在一個(gè)理智的,可擴(kuò)展的方式避免一些我們遇到的陷阱。
1數(shù)據(jù)攝取
當(dāng)涉及到業(yè)務(wù)分析和監(jiān)控,大部分相關(guān)的數(shù)據(jù)類型,格式和傳輸協(xié)議并不是固定的。你需要能夠支持系統(tǒng)不同的數(shù)據(jù)來(lái)源和數(shù)據(jù)發(fā)送者。例如,您的數(shù)據(jù)可能包括下列任何一種:
- 自定義的應(yīng)用程序事件。
- 容器級(jí)指標(biāo)和日志。
- statsd或收集的度量指標(biāo)。
- 來(lái)自第三方的webhook事件,像GitHub或Stripe。
- 應(yīng)用程序或服務(wù)器日志。
- 用戶行為。 雖然這些都有不同的格式和象征,他們?cè)谙到y(tǒng)內(nèi)部需要一個(gè)統(tǒng)一的格式。無(wú)論你選擇哪一個(gè)格式,你都需要對(duì)輸入的數(shù)據(jù)流做轉(zhuǎn)換。
我們選擇了簡(jiǎn)單靈活的數(shù)據(jù)格式:每個(gè)記錄(“點(diǎn)”)是一系列的鍵/值對(duì),它可以方便地表示為一個(gè)JSON對(duì)象。所有的點(diǎn)都有一個(gè)“時(shí)間”字段,度量點(diǎn)也有一個(gè)數(shù)值型的“值”字段;其他點(diǎn)可以有任何的“形狀”。前端HTTPS服務(wù)器(運(yùn)行Nginx)接收數(shù)據(jù),多路分配并發(fā)送到本地的每個(gè)數(shù)據(jù)類型“連接器”進(jìn)程(運(yùn)行Node.js)。這些進(jìn)程將傳入的數(shù)據(jù)轉(zhuǎn)換為系統(tǒng)的內(nèi)部格式,然后將它們發(fā)布到一個(gè)Kafka topic(可靠性),從中,它們可以被用于索引和/或處理。 除了上面的數(shù)據(jù)類型,多考慮使用連接器,能使您自己的團(tuán)隊(duì)最容易將輸入數(shù)據(jù)整合到您的數(shù)據(jù)總線。你可能不需要太多我在這里描述的通用性或靈活性,但設(shè)計(jì)一些靈活性總是好的,這使你系統(tǒng)能夠攝取更多的數(shù)據(jù)類型,防止以后新數(shù)據(jù)到來(lái)要重新建造。
2索引及保存數(shù)據(jù)
所有這些數(shù)據(jù)都需要保存在某個(gè)地方。***在一個(gè)數(shù)據(jù)庫(kù)中,當(dāng)您的數(shù)據(jù)需要的增長(zhǎng)時(shí),將很容易擴(kuò)展。并且如果該數(shù)據(jù)庫(kù)提供對(duì)分析類型的查詢方式支持,那***不過(guò)了。如果這個(gè)數(shù)據(jù)中心只是為了存儲(chǔ)日志和事件,那么你可以選擇Elasticsearch。如果這只是關(guān)于度量指標(biāo),你可以選擇一個(gè)時(shí)間序列數(shù)據(jù)庫(kù)(TSDB)。但是我們都需要處理。我們最終建立了一個(gè)系統(tǒng),有多個(gè)本地?cái)?shù)據(jù)存儲(chǔ),以便我們能夠最有效地處理不同類型的數(shù)據(jù)。
ElasticSearch保存日志以及Events
我們使用Elasticsearch作為事件數(shù)據(jù)庫(kù)。這些事件可以有不同的“形狀”,這取決于他們來(lái)自哪一個(gè)來(lái)源。我們使用了一些Elasticsearch API,效果很好,特別是查詢和聚合API。
Cassandra和ElasticSearch保存Metrics
而metrics,原則上,是完全存儲(chǔ)在Elasticsearch(或任何其他數(shù)據(jù)庫(kù)),使用一個(gè)專門的匹配metrics數(shù)據(jù)結(jié)構(gòu)以及metrics冗余數(shù)據(jù)的數(shù)據(jù)庫(kù)將更有效。 ***的方法是使用現(xiàn)有的開(kāi)源時(shí)間序列數(shù)據(jù)庫(kù)(TSDB)。
我們最初是這么使用的 —— 我使用開(kāi)源TSDB并使用Cassandra作為后端。這種方法的挑戰(zhàn)是,TSDB有自己的查詢API,它不同于Elasticsearch的API。由于API之間的不同,為事件和指標(biāo)提供一個(gè)統(tǒng)一的搜索和查詢界面是很難的。 這就是為什么我們最終決定寫(xiě)自己的TSDB,通過(guò)Casandra和Elasticsearch存儲(chǔ)metrics。
具體來(lái)說(shuō),我們?cè)贑assandra中存儲(chǔ)的時(shí)間/值的鍵值對(duì),在Elasticsearch中存儲(chǔ)元數(shù)據(jù),并在頂部有一個(gè)查詢和管理層。這樣,搜索和查詢事件以及metrics可以統(tǒng)一在Elasticsearch做。 流式處理引擎 那么現(xiàn)在我們有一個(gè)攝取數(shù)據(jù)的途徑和一些數(shù)據(jù)庫(kù)。我們是否可以準(zhǔn)備添加前端應(yīng)用程序并使用我們的數(shù)據(jù)?并沒(méi)有!盡管Elasticsearch本身可以做一些日志和事件分析,我們?nèi)匀贿€需要一個(gè)處理引擎。因?yàn)椋?/p>
- 我們需要一個(gè)統(tǒng)一的方式來(lái)訪問(wèn)事件和指標(biāo),包括實(shí)時(shí)或歷史的數(shù)據(jù)。
- 對(duì)于某些情況(監(jiān)控、報(bào)警),當(dāng)它發(fā)生時(shí),我們需要實(shí)時(shí)處理這些數(shù)據(jù)。
- 度量指標(biāo)!我們想要做的不只是尋找度量指標(biāo)并讀出來(lái)
- 度量指標(biāo)是為了優(yōu)化現(xiàn)有的度量。
- 即使是事件,我們需要一個(gè)比Elasticsearch API更通用的處理能力。例如,join不同的來(lái)源和數(shù)據(jù),或做字符串解析,或自定義聚合。 從這里開(kāi)始,事情變得非常有趣。你可以花一天(或更多)研究別人是如何建立數(shù)據(jù)管道,了解Lambda,Kappa等數(shù)據(jù)架構(gòu)。實(shí)際上有很多非常好的資料在那里。我們就開(kāi)門見(jiàn)山:我們達(dá)到的效果,是一個(gè)支持實(shí)時(shí)數(shù)據(jù)流和批處理計(jì)算的處理引擎。在這方面,我們完全支持,有興趣的可以看這里以及這里。
在這里,不同于存儲(chǔ)和攝取,我們從頭建立了自己的處理引擎,- 不是因?yàn)闆](méi)有其他的流處理引擎,而是由于我們看重查詢的性能,我們將在下面的部分單獨(dú)討論。更具體地說(shuō),我們建立了一個(gè)流處理引擎,實(shí)現(xiàn)了數(shù)據(jù)流處理模型,計(jì)算表示被表示為一系列操作的有向圖,將輸入轉(zhuǎn)化為輸出的,這些操作包括聚合,窗口,過(guò)濾或join。這能很自然的將模型的查詢和計(jì)算組合起來(lái),適合實(shí)時(shí)和批量,且適合分布式運(yùn)行。
當(dāng)然,除非你真的在尋找建立一個(gè)新的項(xiàng)目,然而我們推薦你使用一個(gè)開(kāi)源的流處理引擎。我們建議你看看Riemann, Spark Streaming或者Apache Flink。
3查詢和計(jì)算
我們使用流處理引擎,基于數(shù)據(jù)流模型的計(jì)算。但用戶如何表達(dá)查詢和創(chuàng)建這樣的數(shù)據(jù)流圖?一個(gè)方法是提供一個(gè)API或嵌入式DSL。該接口將需要提供查詢和篩選數(shù)據(jù)、定義轉(zhuǎn)換和其他處理操作的方法,而且最重要的是,提供一種將多個(gè)處理階段組合并應(yīng)用到流圖的方法。上述每一個(gè)項(xiàng)目都有自己的API,而個(gè)人的偏好可能有所不同,API常見(jiàn)的一個(gè)挑戰(zhàn)是,SQL分析師或Excel用戶無(wú)法方便的使用。
一個(gè)可能的解決問(wèn)題的方案,在這一點(diǎn)上,可以讓這些用戶通過(guò)基于這些API構(gòu)建的工具來(lái)訪問(wèn)系統(tǒng)(例如,一個(gè)簡(jiǎn)單的web應(yīng)用程序)。
另一種方法是提供一個(gè)簡(jiǎn)單的查詢語(yǔ)言。這是我們Jut在做的。因?yàn)槟壳皼](méi)有現(xiàn)有的數(shù)據(jù)流的查詢語(yǔ)言(如SQL之于關(guān)系查詢),我們創(chuàng)建了一個(gè)數(shù)據(jù)流查詢語(yǔ)言稱為Juttle。它的核心,Juttle的流圖查詢語(yǔ)言可以用簡(jiǎn)單的語(yǔ)法,聲明處理管道,如上圖所示。
它具有這些原語(yǔ),search,window,join,aggregation和group-by,語(yǔ)法簡(jiǎn)單。當(dāng)然,在處理一個(gè)流程圖數(shù)據(jù)之前,你需要取得到數(shù)據(jù) - Juttle允許您定義查詢獲取數(shù)據(jù),通過(guò)事件和/或度量的任何組合,實(shí)時(shí)和/或歷史的,都具有相同的語(yǔ)法和結(jié)構(gòu)。下面是一個(gè)簡(jiǎn)單的例子,遵循一個(gè)模式… query | analyze | view (注意鏈接使用管道操作符,語(yǔ)法類似shell)。 ``` read -from :1 day ago: datatype = 'weblog' | reduce -every :minute: count() by status_code | @timechart ```
4拼在一起:一個(gè)異常檢測(cè)的例子
到目前為止,我們已經(jīng)采取了一個(gè)組件為中心的視角-我們已經(jīng)討論了組成成分和它們的作用,但沒(méi)怎么提到關(guān)于如何將它們組合在一起。現(xiàn)在我們將視角切換到以數(shù)據(jù)為中心,看看支持實(shí)時(shí)和歷史查詢需要哪些步驟。讓我們使用一個(gè)異常檢測(cè)算法的實(shí)例來(lái)解說(shuō)。這是一個(gè)很好的例子,因?yàn)槲覀冃枰樵儦v史數(shù)據(jù)來(lái)訓(xùn)練潛在的統(tǒng)計(jì)模型,實(shí)時(shí)流數(shù)據(jù)來(lái)測(cè)試異常,然后我們需要把結(jié)果寫(xiě)回系統(tǒng),同時(shí)異常告警。
但是,在我們做任何查詢之前,我們需要串聯(lián)下攝取的整個(gè)過(guò)程,傳入的數(shù)據(jù)是如何寫(xiě)入索引存儲(chǔ)。這是由import服務(wù)完成的,服務(wù)完成了包括寫(xiě)入時(shí)間序列數(shù)據(jù)庫(kù),將指標(biāo)數(shù)據(jù)和元數(shù)據(jù)存儲(chǔ)在Elasticsearch和Cassandra。
現(xiàn)在一個(gè)用戶來(lái)了,啟動(dòng)了一個(gè)異常檢測(cè)的job。這需要讀取歷史數(shù)據(jù),通過(guò)任務(wù)處理引擎直接查詢底層數(shù)據(jù)庫(kù)來(lái)進(jìn)行的。不同的查詢和數(shù)據(jù)可以進(jìn)一步做性能優(yōu)化(下面討論),和/或?qū)嵤┒攘繑?shù)據(jù)庫(kù)的讀取路徑(查詢Elasticsearch中的元數(shù)據(jù),獲取Cassandra中的度量值,并結(jié)合結(jié)果產(chǎn)生實(shí)際的度量點(diǎn))。
歷史數(shù)據(jù)涵蓋了一些過(guò)去范圍內(nèi)的數(shù)據(jù),處理引擎將歷史數(shù)據(jù)轉(zhuǎn)換成流向圖的實(shí)時(shí)數(shù)據(jù)。為了做到這一點(diǎn),處理引擎直接將數(shù)據(jù)導(dǎo)入import服務(wù)的入口點(diǎn)。請(qǐng)注意,這種切換必須小心,以免數(shù)據(jù)丟棄或者數(shù)據(jù)重復(fù)。
在這一點(diǎn)上,我們有一個(gè)訓(xùn)練有素的異常檢測(cè)流圖運(yùn)行在實(shí)時(shí)數(shù)據(jù)上。當(dāng)檢測(cè)到異常時(shí),我們希望它將警報(bào)發(fā)送給一些外部的系統(tǒng),這可以通過(guò)處理引擎向外部的HTTP服務(wù)POST數(shù)據(jù)。除了發(fā)送警報(bào),我們還希望保持對(duì)內(nèi)部系統(tǒng)的跟蹤。換句話說(shuō),我們希望能夠?qū)?shù)據(jù)流寫(xiě)回系統(tǒng)中。從概念上講這是通過(guò)處理引擎管道返回?cái)?shù)據(jù)到攝取途徑。
5調(diào)優(yōu)
那么我們已有了一個(gè)攝取數(shù)據(jù)的工作系統(tǒng)的和一些數(shù)據(jù)庫(kù)以及處理引擎。我們可以準(zhǔn)備添加前端應(yīng)用程序并分析我們的數(shù)據(jù)了嗎?還沒(méi)有! 嗯,我們實(shí)際上可以這樣做,但問(wèn)題是我們的查詢性能仍然會(huì)非常慢。而緩慢的查詢意味著……沒(méi)有人會(huì)使用我們的系統(tǒng)。
因此,讓我們重新審視一下“統(tǒng)一處理引擎”的概念。按照我們的解釋,它是同一個(gè)系統(tǒng)使用相同結(jié)構(gòu),抽象和查詢來(lái)處理歷史或?qū)崟r(shí)的數(shù)據(jù)。 性能挑戰(zhàn)來(lái)自于這樣的一個(gè)事實(shí),歷史數(shù)據(jù)比實(shí)時(shí)數(shù)據(jù)要多的多。例如,假設(shè)我們有一百萬(wàn)點(diǎn)/秒的速度輸入到系統(tǒng),并有一個(gè)是足夠快處理過(guò)程,可以在數(shù)據(jù)錄入時(shí)進(jìn)行實(shí)時(shí)查詢?,F(xiàn)在采取相同的查詢語(yǔ)義查詢過(guò)去一天的數(shù)據(jù) 。
這將需要一次性處理數(shù)百億點(diǎn)(或者,至少,必須能跟的上從存儲(chǔ)點(diǎn)讀取的速度)。假設(shè)計(jì)算是分布式的,我們可以通過(guò)增加計(jì)算節(jié)點(diǎn)來(lái)解決,但在***的情況下,這將是低效和昂貴的。 所以這就是優(yōu)化的所在。有許多方法可以優(yōu)化數(shù)據(jù)查詢。其中一些包括對(duì)查詢本身進(jìn)行轉(zhuǎn)換 。例如,上游數(shù)據(jù)的filters或aggregations盡可能不改變查詢語(yǔ)義。我們說(shuō)的這種優(yōu)化,是將數(shù)據(jù)的filter和處理盡量由數(shù)據(jù)庫(kù)去做。這需要做以下的:
- 自動(dòng)識(shí)別可以由數(shù)據(jù)庫(kù)處理查詢的部分
- 將對(duì)應(yīng)的部分轉(zhuǎn)換成目標(biāo)數(shù)據(jù)庫(kù)的查詢語(yǔ)言
- 運(yùn)行后端查詢并將結(jié)果注入到數(shù)據(jù)流圖的正確位置
6結(jié)語(yǔ)
我們做到了!當(dāng)然,如果不需要一個(gè)可視化層,我們就完成了。只能通過(guò)API來(lái)查詢系統(tǒng)。建立一個(gè)客戶端應(yīng)用程序來(lái)創(chuàng)建查詢,流和可視化數(shù)據(jù),組合儀表板是另外一個(gè)棘手的問(wèn)題,所以我們將改天討論這個(gè)。
現(xiàn)在,讓我們來(lái)總結(jié)一下我們?cè)诮ㄔO(shè)這個(gè)數(shù)據(jù)中心過(guò)程中的所見(jiàn)所聞:
- 一個(gè)攝取途徑,可以接受不同來(lái)源的輸入數(shù)據(jù),并將其轉(zhuǎn)換為統(tǒng)一的格式,并儲(chǔ)存起來(lái)供以后消費(fèi)。(在Jut,這是基于Kafka建立的)。
- 事件和度量的數(shù)據(jù)庫(kù)。在Jut,Events使用Elasticsearch,自己構(gòu)建的度量數(shù)據(jù)庫(kù)則基于Cassandra。
- 一個(gè)處理引擎(或是兩個(gè),如果你要用lambda ISH架構(gòu))。
- 在系統(tǒng)上運(yùn)行查詢的API或查詢語(yǔ)言。 唷。建立這套系統(tǒng),是一個(gè)漫長(zhǎng)而有趣的旅程。即便你要建立你自己的系統(tǒng),可以先試試Jut。你可能會(huì)覺(jué)得很好用。
本文標(biāo)題:使用Elasticsearch、Kafka和Cassandra構(gòu)建流式數(shù)據(jù)中心
當(dāng)前地址:http://www.dlmjj.cn/article/djojhic.html


咨詢
建站咨詢
