日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
使用SparkStreaming轉(zhuǎn)換不同的JSON有效負載

【】Spark Streaming 是底層基于 Spark Core 的對大數(shù)據(jù)進行實時計算的框架,可以流方式從源讀取數(shù)據(jù)。只需要從數(shù)據(jù)源創(chuàng)建一個讀取流,然后我們可以創(chuàng)建寫入流以將數(shù)據(jù)加載到目標數(shù)據(jù)源中。

創(chuàng)新互聯(lián)建站專業(yè)提供成都主機托管四川主機托管成都服務(wù)器托管四川服務(wù)器托管,支持按月付款!我們的承諾:貴族品質(zhì)、平民價格,機房位于中國電信/網(wǎng)通/移動機房,成都移動機房服務(wù)有保障!

?[[418750]]?

接下來的演示,將假設(shè)我們有不同的 JSON 有效負載進入一個 kafka 主題,我們需要將其轉(zhuǎn)換并寫入另一個 kafka 主題。

創(chuàng)建一個ReadStream

為了能連續(xù)接收JSON有效負載作為消息。我們需要首先讀取消息并使用spark的readstream創(chuàng)建數(shù)據(jù)幀。Spark 中提供了 readStream 函數(shù),我們可以使用這個函數(shù)基本上創(chuàng)建一個 readStream。這將從 kafka 主題中讀取流負載。 

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()

我們可以創(chuàng)建一個 case-class(例如CustomerUnion),它將包含JSON有效負載的所有可能字段。這樣,我們就能在數(shù)據(jù)幀上運行select查詢而不會失敗。 

val rawDfValue = rawData.selectExpr("CAST(value AS STRING)").as[String]

val schema = ScalaReflection.schemaFor[CustomerUnion].dataType.asInstanceOf[StructType]

val extractedDFWithSchema = rawDfValue.select(from_json(col("value"), schema).as("data")).select("data.*")

extractedDFWithSchema.createOrReplaceTempView(“tempView”)

這將為我們提供一個數(shù)據(jù)幀提取的 DFWithSchema,其中包含作為有效負載字段的列。

示例輸入負載

這是兩個樣本輸入有效負載,但也可以有更多的有效負載,有些字段不存在(變量)。 

{
“id”: 1234,
“firstName”:”Jon”,
“l(fā)astName”:”Butler”,
“City”:”Newyork”,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}

{
“firstName”:”Jon”,
“l(fā)astName”:”Butler”,
“City”:”Newyork”,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}

樣例輸出負載

根據(jù)id字段,我們將決定輸出有效負載。如果存在一個 id 字段,我們將把它視為一個用戶更新案例,并且在輸出有效負載中只發(fā)送“Email”和“Phone”。我們可以根據(jù)某些條件配置任何字段。這只是一個例子。

如果 id 不存在,我們將發(fā)送所有字段。下面是兩個輸出載荷的示例: 

{
“userid”: 1234,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}

{
“fullname”:”Jon Butler”,
“City”:”Newyork”,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}

開始WriteStreams

一旦我們有了數(shù)據(jù)幀,我們就可以運行盡可能多的sql查詢,并根據(jù)所需的有效負載寫入 kafka 主題。因此,我們可以創(chuàng)建一個包含所有sql查詢的列表,并通過該列表進行循環(huán),并調(diào)用writeStream函數(shù)。讓我們假設(shè),我們有一個名為 queryList 的列表,它只包含字符串(即sql查詢)。

下面為寫入流定義的一個函數(shù): 

def startWriteStream(query: String): Unit = {

val transformedDf = spark.sql(query)
transformedDf
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()

}

這將啟動列表中每個查詢的寫入流。 

queryList.foreach(startWriteStream)
spark.streams.awaitAnyTermination()

如果我們知道輸入有效負載的所有可能字段,那么即使有一些字段不存在,我們的sql查詢也不會失敗。我們已經(jīng)將有效負載的模式指定為case-class,它將為缺席字段創(chuàng)建指定 NULL 的數(shù)據(jù)幀。

通過這種方式,我們可以使用 spark-streaming 在所需的轉(zhuǎn)換/過濾器之后將多個有效負載從同一主題寫入不同的主題。

【譯稿,合作站點轉(zhuǎn)載請注明原文譯者和出處為.com】


新聞名稱:使用SparkStreaming轉(zhuǎn)換不同的JSON有效負載
標題鏈接:http://www.dlmjj.cn/article/djgopeg.html