This article explains how ReceiverTracker handles data in Spark Streaming: how received blocks are stored, how their metadata is reported to the Driver, and what ReceiverTracker does with that metadata.
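ReceiverTracker runs on the Driver and uses a scheduling algorithm to decide which executor each Receiver is started on. The code that starts a Receiver is wrapped in a task, and that task is the only task in its job, so in effect ReceiverTracker launches each Receiver as a separate job. The startup code creates a ReceiverSupervisorImpl, and calling its start method is what actually runs the Receiver on the worker node. The received data is then grouped into blocks by BlockGenerator, the blocks are stored through ReceiverSupervisorImpl, and the metadata of the stored data is reported to ReceiverTracker.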
The rest of this article covers what happens once data has been received.
ReceiverSupervisorImpl stores blocks through receivedBlockHandler:
private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    ...
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
There are two implementations: one based on the write-ahead log (WAL) and one based on BlockManager.
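The choice between the two handlers can be pictured with a minimal sketch. Everything here (BlockHandlerSketch, InMemoryHandlerSketch, LoggingHandlerSketch, HandlerSelectionSketch) is a hypothetical stand-in invented for illustration, not Spark's real classes; the sketch only assumes that a single configuration flag decides which strategy is built.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the handler interface; not Spark's ReceivedBlockHandler.
trait BlockHandlerSketch {
  def storeBlock(blockId: String, data: Seq[String]): String
}

// Stands in for the BlockManager-based path: keep the block in memory only.
final class InMemoryHandlerSketch extends BlockHandlerSketch {
  val store = mutable.Map.empty[String, Seq[String]]
  def storeBlock(blockId: String, data: Seq[String]): String = {
    store(blockId) = data
    "memory"
  }
}

// Stands in for the WAL-based path: record the block in a log as well as storing it.
final class LoggingHandlerSketch extends BlockHandlerSketch {
  val log = mutable.ArrayBuffer.empty[String]
  val store = mutable.Map.empty[String, Seq[String]]
  def storeBlock(blockId: String, data: Seq[String]): String = {
    log += blockId
    store(blockId) = data
    "wal"
  }
}

object HandlerSelectionSketch {
  // walEnabled plays the role of the configuration check in the snippet above.
  def choose(walEnabled: Boolean): BlockHandlerSketch =
    if (walEnabled) new LoggingHandlerSketch else new InMemoryHandlerSketch
}
```

Because callers only see the common trait, the rest of the receiver code does not need to know which storage strategy is active.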
/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
The data is stored first and then reported to ReceiverTracker. What is reported is only the metadata, not the data itself.
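As a rough illustration of "store locally, report only metadata", here is a small sketch. BlockMetaSketch and ReportingReceiverSketch are hypothetical names, the block ID format is made up, and the report callback stands in for the RPC call to the tracker.

```scala
import scala.collection.mutable

// Hypothetical metadata record; mirrors the idea of ReceivedBlockInfo, not its real definition.
final case class BlockMetaSketch(streamId: Int, blockId: String, numRecords: Long)

final class ReportingReceiverSketch(streamId: Int, report: BlockMetaSketch => Boolean) {
  private val localStore = mutable.Map.empty[String, Seq[String]]
  private var nextId = 0

  // Store the records locally first, then send only their metadata to the tracker.
  def pushAndReport(records: Seq[String]): Boolean = {
    val blockId = s"input-$streamId-$nextId" // made-up ID format for this sketch
    nextId += 1
    localStore(blockId) = records
    report(BlockMetaSketch(streamId, blockId, records.size.toLong))
  }

  def storedBlocks: Int = localStore.size
}
```

The records never travel to the tracker in this sketch; only the small metadata record does.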
/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
    streamId: Int,
    numRecords: Option[Long],
    metadataOption: Option[Any],
    blockStoreResult: ReceivedBlockStoreResult
In Scala, the sealed keyword means that all direct subclasses must be defined in the same source file.
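A short sketch shows why sealed is useful for message types. The hierarchy below (TrackerMessageSketch and its two cases) is hypothetical and only borrows the flavor of the messages discussed in this article.

```scala
// Hypothetical message hierarchy for illustration; the names are not Spark's.
sealed trait TrackerMessageSketch
final case class AddBlockSketch(streamId: Int) extends TrackerMessageSketch
final case class CleanupSketch(thresholdMs: Long) extends TrackerMessageSketch

object SealedSketch {
  // Because the trait is sealed, the compiler knows every subtype and can
  // warn when a pattern match like this one misses a case.
  def describe(msg: TrackerMessageSketch): String = msg match {
    case AddBlockSketch(id)  => s"add block for stream $id"
    case CleanupSketch(time) => s"clean up blocks older than $time ms"
  }
}
```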
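ReceiverTracker manages starting and stopping Receivers and receives the metadata they report. An instance of ReceiverTracker must be created only after all input streams have been added and StreamingContext.start() has been called, because ReceiverTracker has to start one Receiver for each input stream.
ReceiverTracker holds all receiver-based input streams and their IDs: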
private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
private val receiverInputStreamIds = receiverInputStreams.map { _.id }
ReceiverTracker can be in one of the following states:
/** Enumeration to identify current state of the ReceiverTracker */
object TrackerState extends Enumeration {
  type TrackerState = Value
  val Initialized, Started, Stopping, Stopped = Value
}
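To show how such an enumeration is typically used, here is a minimal lifecycle sketch. TrackerStateSketch and LifecycleSketch are hypothetical, and the transition rules (start only from Initialized, stop only from Started) are assumptions made for the example rather than a description of ReceiverTracker's actual checks.

```scala
object TrackerStateSketch extends Enumeration {
  val Initialized, Started, Stopping, Stopped = Value
}

final class LifecycleSketch {
  import TrackerStateSketch._

  private var state: Value = Initialized

  def current: Value = state

  // Assumed rule for this sketch: starting is only allowed from Initialized.
  def start(): Unit = {
    require(state == Initialized, s"cannot start from state $state")
    state = Started
  }

  // Assumed rule for this sketch: stopping passes through Stopping before Stopped.
  def stop(): Unit =
    if (state == Started) {
      state = Stopping
      state = Stopped
    }
}
```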
Next, consider how ReceiverTracker handles the AddBlock message sent by ReceiverSupervisorImpl:
case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }
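The handler first checks whether WAL batching is enabled on the Driver. If it is, addBlock is executed and the reply is sent from a thread in a dedicated pool, because writing to the WAL is expensive. Otherwise the reply is sent directly.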
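The "offload slow work to a pool, otherwise reply inline" decision can be sketched with plain java.util.concurrent. ReplySketch is a hypothetical class; work stands for the potentially slow addBlock call and reply for context.reply, and the single-thread pool is an assumption chosen to keep the example small.

```scala
import java.util.concurrent.{Executors, TimeUnit}

final class ReplySketch(batchingEnabled: Boolean) {
  private val pool = Executors.newSingleThreadExecutor()

  // Run `work` on the pool when batching is enabled, otherwise on the caller's thread.
  def handle(work: () => Boolean)(reply: Boolean => Unit): Unit =
    if (batchingEnabled) {
      pool.execute(new Runnable {
        override def run(): Unit = reply(work())
      })
    } else {
      reply(work())
    }

  // Shut the pool down and wait briefly for queued replies to finish.
  def close(): Unit = {
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    ()
  }
}
```

With batching enabled, the caller's thread returns immediately and the reply arrives later from the pool thread.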
The block information is then handed to receivedBlockTracker for processing:
/** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}
ReceivedBlockTracker manages block information on the Driver side:
/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
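writeToLog is straightforward. It first checks whether the WAL is enabled. If so, it writes the block information to the log so that it can be recovered later; otherwise it simply returns true. When the write succeeds, the block information is added to streamIdToUnallocatedBlockQueues: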
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
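This data structure is neat: the key is the stream ID and the value is a queue, so the block information received by each stream is kept separately. ReceivedBlockTracker therefore holds the block information for every stream.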
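The map-of-queues idea can be tried out in isolation. UnallocatedBlocksSketch below is a hypothetical, simplified version that stores block IDs as strings; the drain method is an assumption about how a consumer might empty one stream's queue, for example when forming a batch.

```scala
import scala.collection.mutable

final class UnallocatedBlocksSketch {
  // One queue per stream ID, created lazily on first use.
  private val queues = new mutable.HashMap[Int, mutable.Queue[String]]

  private def queueFor(streamId: Int): mutable.Queue[String] =
    queues.getOrElseUpdate(streamId, mutable.Queue.empty[String])

  def add(streamId: Int, blockId: String): Unit = synchronized {
    queueFor(streamId) += blockId
  }

  // Remove and return everything queued for one stream, in arrival order.
  def drain(streamId: Int): Seq[String] = synchronized {
    queueFor(streamId).dequeueAll(_ => true).toList
  }
}
```

Keeping a separate queue per stream means that draining one stream never touches the blocks of another.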
/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}
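The Scaladoc of ReceivedBlockTracker is worth reading closely. The class keeps track of all received blocks and allocates them to batches when required. Every action it takes can be written to a WAL if a checkpoint directory has been provided, so that after a Driver failure the tracker's state (received blocks and block-to-batch allocations) can be recovered. When an instance is created with a checkpoint directory, it tries to read the previously saved events from the logs in that directory.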
/**
 * Class that keep track of all the received blocks, and allocate them to batches
 * when required. All actions taken by this class can be saved to a write ahead log
 * (if a checkpoint directory has been provided), so that the state of the tracker
 * (received blocks and block-to-batch allocations) can be recovered after driver failure.
 *
 * Note that when any instance of this class is created with a checkpoint directory,
 * it will try reading events from logs in the directory.
 */
private[streaming] class ReceivedBlockTracker(
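Recovering state from a log of events can be illustrated with a deliberately simplified replay. The event types and RecoverySketch are hypothetical, and the rule that a batch allocation takes every block pending at that moment is an assumption of this sketch, not a statement about the real log format.

```scala
// Hypothetical, simplified log events; not Spark's ReceivedBlockTrackerLogEvent types.
sealed trait LogEventSketch
final case class BlockAddedSketch(streamId: Int, blockId: String) extends LogEventSketch
final case class BatchAllocatedSketch(batchTime: Long) extends LogEventSketch

object RecoverySketch {
  type Pending = Map[Int, Vector[String]]
  type Batches = Map[Long, Pending]

  // Replay events in order to rebuild (still-unallocated blocks, blocks per batch).
  def replay(events: Seq[LogEventSketch]): (Pending, Batches) = {
    val start: (Pending, Batches) = (Map.empty, Map.empty)
    events.foldLeft(start) {
      case ((pending, batches), BlockAddedSketch(s, b)) =>
        (pending.updated(s, pending.getOrElse(s, Vector.empty[String]) :+ b), batches)
      case ((pending, batches), BatchAllocatedSketch(t)) =>
        // Assumption: allocation moves all currently pending blocks into the batch.
        (Map.empty, batches.updated(t, pending))
    }
  }
}
```

Because the replay is a pure fold over the events, running it twice on the same log gives the same state, which is the property that makes log-based recovery workable.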
Next, consider how ReceiverTracker handles a CleanupOldBlocks message:
case c: CleanupOldBlocks =>
  receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
On receiving this message, ReceiverTracker forwards it to every Receiver it manages. When ReceiverSupervisorImpl receives the message, it uses receivedBlockHandler to clean up the old data:
private def cleanupOldBlocks(cleanupThreshTime: Time): Unit = {
  logDebug(s"Cleaning up blocks older then $cleanupThreshTime")
  receivedBlockHandler.cleanupOldBlocks(cleanupThreshTime.milliseconds)
}
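The forwarding step relies on flatMap skipping entries whose endpoint is absent. The sketch below reproduces that pattern with hypothetical types (EndpointSketch, TrackingInfoSketch, CleanupBroadcastSketch); the message is reduced to a plain threshold in milliseconds.

```scala
import scala.collection.mutable

// Hypothetical endpoint that just records the thresholds it is sent.
final class EndpointSketch {
  val received = mutable.ArrayBuffer.empty[Long]
  def send(cleanupThresholdMs: Long): Unit = received += cleanupThresholdMs
}

// The endpoint is optional, as a receiver may not have registered one.
final case class TrackingInfoSketch(receiverId: Int, endpoint: Option[EndpointSketch])

object CleanupBroadcastSketch {
  // Send the threshold to every receiver that has an endpoint; return how many were notified.
  def broadcast(infos: Map[Int, TrackingInfoSketch], thresholdMs: Long): Int = {
    val endpoints = infos.values.flatMap(_.endpoint).toList
    endpoints.foreach(_.send(thresholdMs))
    endpoints.size
  }
}
```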
ReceiverTracker can also adjust the rate at which a particular stream ID receives data at any time, by sending an UpdateRateLimit message to the corresponding ReceiverSupervisorImpl:
case UpdateReceiverRateLimit(streamUID, newRate) =>
  for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
    eP.send(UpdateRateLimit(newRate))
  }
When ReceiverSupervisorImpl receives the message, it updates the rate of every registered BlockGenerator:
case UpdateRateLimit(eps) =>
  logInfo(s"Received a new rate limit: $eps.")
  registeredBlockGenerators.foreach { bg =>
    bg.updateRate(eps)
  }
/**
 * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
 * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
 *
 * @param newRate A new rate in events per second. It has no effect if it's 0 or negative.
 */
private[receiver] def updateRate(newRate: Long): Unit =
  if (newRate > 0) {
    if (maxRateLimit > 0) {
      rateLimiter.setRate(newRate.min(maxRateLimit))
    } else {
      rateLimiter.setRate(newRate)
    }
  }
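The clamping rule in updateRate can be isolated as a pure function, which makes the edge cases easy to check. RateLimitSketch and effectiveRate are hypothetical names; the logic follows the snippet above, with None standing for "the request is ignored".

```scala
object RateLimitSketch {
  // Returns the rate that would take effect, or None when the request is ignored.
  def effectiveRate(newRate: Long, maxRateLimit: Long): Option[Long] =
    if (newRate <= 0) None                                      // zero or negative: no effect
    else if (maxRateLimit > 0) Some(newRate.min(maxRateLimit))  // cap at the configured maximum
    else Some(newRate)                                          // no maximum configured
}
```

For example, a request of 500 with a configured maximum of 100 takes effect as 100, while the same request with a non-positive maximum takes effect unchanged.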
ReceiverTracker is an example of the facade design pattern: callers appear to use ReceiverTracker's functionality, but the work is actually done by other classes.
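A minimal facade sketch makes the delegation visible. All three classes are hypothetical; they only mimic the shape of a tracker that forwards block bookkeeping and rate updates to dedicated collaborators.

```scala
// Hypothetical collaborator that does the actual block bookkeeping.
final class BlockBookkeeperSketch {
  private var blocks = Vector.empty[String]
  def add(blockId: String): Boolean = { blocks = blocks :+ blockId; true }
  def count: Int = blocks.size
}

// Hypothetical collaborator that holds the current rate.
final class RateControllerSketch {
  private var rate = 0L
  def update(newRate: Long): Unit = rate = newRate
  def current: Long = rate
}

// The facade exposes one entry point and delegates every call.
final class TrackerFacadeSketch(bookkeeper: BlockBookkeeperSketch, rates: RateControllerSketch) {
  def addBlock(blockId: String): Boolean = bookkeeper.add(blockId)
  def updateRate(newRate: Long): Unit = rates.update(newRate)
}
```

Callers depend only on the facade, so the collaborators behind it can change without affecting them.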