日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
怎么進(jìn)行spark.streaming.concurrentJobs參數(shù)解密的分析

這篇文章將為大家詳細(xì)講解有關(guān)怎么進(jìn)行spark.streaming.concurrentJobs參數(shù)解密的分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

創(chuàng)新互聯(lián)建站是一家集網(wǎng)站建設(shè),紫云企業(yè)網(wǎng)站建設(shè),紫云品牌網(wǎng)站建設(shè),網(wǎng)站定制,紫云網(wǎng)站建設(shè)報價,網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,紫云網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競爭力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時我們時刻保持專業(yè)、時尚、前沿,時刻以成就客戶成長自我,堅持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實用型網(wǎng)站。

最近,在spark streaming 調(diào)優(yōu)時,發(fā)現(xiàn)個增加job并行度的參數(shù)spark.streaming.concurrentJobs,spark 默認(rèn)值為1,當(dāng)增加為2時(在spark-default中配置),如遇到處理速度慢 streaming application UI 中會有兩個Active Jobs(默認(rèn)值時為1),也就是在同一時刻可以執(zhí)行兩個批次的streaming job,下文分析這個參數(shù)是如何影響streaming 的執(zhí)行的。 ##參數(shù)引入 在spark streaming 的JobScheduler line 47,讀取了該參數(shù):

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

使用concurrentJobs參數(shù)初始化jobExecutor線程池,也就是這個參數(shù)直接影響了job executor線程池中的線程數(shù)目。

job executor

job executor 線程池用來execute JobHandler線程;在jobSchedule中有個job容器jobSets:

private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]

用來保存不同的時間點生成的JobSet,而JobSet中包含多個Job; JobSet submit邏輯:

  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }

不難看出jobExecutor的容量決定了池子中同時可以被處理的JobHandler線程數(shù),JobHandler是job的執(zhí)行線程,因此決定了可以被同時被提交的Job數(shù)目。

使用方法

可以通過集中方法為streaming job配置此參數(shù)。

  • spark-default中修改 全局性修改,所有的streaming job都會受到影響。

  • 提交streaming job是 --conf 參數(shù)添加(推薦) 在提交job時,可以使用--conf 參數(shù)為該job添加個性化的配置。例如: bin/spark-submit --master yarn --conf spark.streaming.concurrentJobs=5 設(shè)置該streaming job的job executor 線程池大小為5,在資源充足的情況下可以同時執(zhí)行5個batch job。

  • 代碼設(shè)置 在代碼中通過sparkConf設(shè)置: sparkConf.set("spark.streaming.concurrentJobs", "5"); 或者 System.setProperty("spark.streaming.concurrentJobs", "5");

scheduler mode的使用建議

在配置多個concurrentJob時,多個批次job被同時提交到集群中,也就需要更多的計算資源;當(dāng)沒有更多的計算資源(Executor)被分配個該streaming job時,可將schedul 調(diào)整為FAIR(公平調(diào)度)來達(dá)到被提交的多個job可公平的共享計算資源。 當(dāng)調(diào)整為公平調(diào)度時,job可以共享計算資源,而job的提交仍然是有時間順序的(雖然時間間隔很?。?,容易造成task在executor間分配的傾斜,拉長job的整體執(zhí)行時間。 當(dāng)使用fifo調(diào)度方式,先到的job優(yōu)先獲得計算資源,當(dāng)executor數(shù)目不足時,job會等待executor被釋放,task數(shù)目反而不易傾斜。 在實際使用時,如果executor數(shù)目足夠,建議使用FIFO模式,如在concurrentJob為默認(rèn)配置時,executor分配數(shù)目為m,則當(dāng)concurrentJobs配置為n時,executor建議分配為 n*m。

關(guān)于怎么進(jìn)行spark.streaming.concurrentJobs參數(shù)解密的分析就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。


新聞標(biāo)題:怎么進(jìn)行spark.streaming.concurrentJobs參數(shù)解密的分析
網(wǎng)站鏈接:http://www.dlmjj.cn/article/ghphdd.html