新聞中心
這篇文章將為大家詳細(xì)講解有關(guān)怎么進(jìn)行spark.streaming.concurrentJobs參數(shù)解密的分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
創(chuàng)新互聯(lián)建站是一家集網(wǎng)站建設(shè),紫云企業(yè)網(wǎng)站建設(shè),紫云品牌網(wǎng)站建設(shè),網(wǎng)站定制,紫云網(wǎng)站建設(shè)報價,網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,紫云網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競爭力??沙浞譂M足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時我們時刻保持專業(yè)、時尚、前沿,時刻以成就客戶成長自我,堅持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實用型網(wǎng)站。
最近,在spark streaming 調(diào)優(yōu)時,發(fā)現(xiàn)個增加job并行度的參數(shù)spark.streaming.concurrentJobs
,spark 默認(rèn)值為1,當(dāng)增加為2時(在spark-default中配置),如遇到處理速度慢 streaming application UI 中會有兩個Active Jobs(默認(rèn)值時為1),也就是在同一時刻可以執(zhí)行兩個批次的streaming job,下文分析這個參數(shù)是如何影響streaming 的執(zhí)行的。 ##參數(shù)引入 在spark streaming 的JobScheduler line 47,讀取了該參數(shù):
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) private val jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
使用concurrentJobs參數(shù)初始化jobExecutor線程池,也就是這個參數(shù)直接影響了job executor線程池中的線程數(shù)目。
job executor
job executor 線程池用來execute JobHandler線程;在jobSchedule中有個job容器jobSets:
private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
用來保存不同的時間點生成的JobSet,而JobSet中包含多個Job; JobSet submit邏輯:
def submitJobSet(jobSet: JobSet) { if (jobSet.jobs.isEmpty) { logInfo("No jobs added for time " + jobSet.time) } else { listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) jobSets.put(jobSet.time, jobSet) jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) logInfo("Added jobs for time " + jobSet.time) } }
不難看出jobExecutor的容量決定了池子中同時可以被處理的JobHandler線程數(shù),JobHandler是job的執(zhí)行線程,因此決定了可以被同時被提交的Job數(shù)目。
使用方法
可以通過集中方法為streaming job配置此參數(shù)。
spark-default中修改 全局性修改,所有的streaming job都會受到影響。
提交streaming job是 --conf 參數(shù)添加(推薦) 在提交job時,可以使用--conf 參數(shù)為該job添加個性化的配置。例如:
bin/spark-submit --master yarn --conf spark.streaming.concurrentJobs=5
設(shè)置該streaming job的job executor 線程池大小為5,在資源充足的情況下可以同時執(zhí)行5個batch job。代碼設(shè)置 在代碼中通過sparkConf設(shè)置:
sparkConf.set("spark.streaming.concurrentJobs", "5");
或者System.setProperty("spark.streaming.concurrentJobs", "5");
scheduler mode的使用建議
在配置多個concurrentJob時,多個批次job被同時提交到集群中,也就需要更多的計算資源;當(dāng)沒有更多的計算資源(Executor)被分配個該streaming job時,可將schedul 調(diào)整為FAIR(公平調(diào)度)來達(dá)到被提交的多個job可公平的共享計算資源。 當(dāng)調(diào)整為公平調(diào)度時,job可以共享計算資源,而job的提交仍然是有時間順序的(雖然時間間隔很?。?,容易造成task在executor間分配的傾斜,拉長job的整體執(zhí)行時間。 當(dāng)使用fifo調(diào)度方式,先到的job優(yōu)先獲得計算資源,當(dāng)executor數(shù)目不足時,job會等待executor被釋放,task數(shù)目反而不易傾斜。 在實際使用時,如果executor數(shù)目足夠,建議使用FIFO模式,如在concurrentJob為默認(rèn)配置時,executor分配數(shù)目為m,則當(dāng)concurrentJobs配置為n時,executor建議分配為 n*m。
關(guān)于怎么進(jìn)行spark.streaming.concurrentJobs參數(shù)解密的分析就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
新聞標(biāo)題:怎么進(jìn)行spark.streaming.concurrentJobs參數(shù)解密的分析
網(wǎng)站鏈接:http://www.dlmjj.cn/article/ghphdd.html