日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
使用Java多線程實(shí)現(xiàn)任務(wù)分發(fā)

多線程下載由來已久,如 FlashGet、NetAnts 等工具,它們都是依懶于 HTTP 協(xié)議的支持(Range 字段指定請求內(nèi)容范圍),首先能讀取出請求內(nèi)容 (即欲下載的文件) 的大小,劃分出若干區(qū)塊,把區(qū)塊分段分發(fā)給每個線程去下載,線程從本段起始處下載數(shù)據(jù)及至段尾,多個線程下載的內(nèi)容最終會寫入到同一個文件中。

只研究有用的,工作中的需求:要把多個任務(wù)分派給Java的多個線程去執(zhí)行,這其中就會有一個任務(wù)列表指派到線程的策略思考:已知:1. 一個待執(zhí)行的任務(wù)列表,2. 指定要啟動的線程數(shù);問題是:每個線程實(shí)際要執(zhí)行哪些任務(wù)。

使用Java多線程實(shí)現(xiàn)這種任務(wù)分發(fā)的策略是:任務(wù)列表連續(xù)按線程數(shù)分段,先保證每線程平均能分配到的任務(wù)數(shù),余下的任務(wù)從前至后依次附加到線程中--只是數(shù)量上,實(shí)際每個線程執(zhí)行的任務(wù)都還是連續(xù)的。如果出現(xiàn)那種僧多(線程) 粥(任務(wù)) 少的情況,實(shí)際啟動的線程數(shù)就等于任務(wù)數(shù),一挑一。這里只實(shí)現(xiàn)了每個線程各掃自家門前雪,動作快的完成后眼見別的線程再累都是愛莫能助。

實(shí)現(xiàn)及演示代碼如下:由三個類實(shí)現(xiàn),寫在了一個 Java 文件中:TaskDistributor 為任務(wù)分發(fā)器,Task 為待執(zhí)行的任務(wù),WorkThread 為自定的工作線程。代碼中運(yùn)用了命令模式,如若能配以監(jiān)聽器,用上觀察者模式來控制 UI 顯示就更絕妙不過了,就能實(shí)現(xiàn)像下載中的區(qū)塊著色跳躍的動感了,在此定義下一步的著眼點(diǎn)了。

代碼中有較為詳細(xì)的注釋,看這些注釋和執(zhí)行結(jié)果就很容易理解的。main() 是測試方法

 
 
 
  1. package com.unmi.common;   
  2.   import java.util.ArrayList;   
  3.   import java.util.List;   
  4.   /**   
  5.   * 指派任務(wù)列表給線程的分發(fā)器   
  6.   * @author Unmi   
  7.   * QQ: 1125535 Email: fantasia@sina.com   
  8.   * MSN: kypfos@msn.com 2008-03-25   
  9.   */   
  10.   public class TaskDistributor {   
  11.   /**   
  12.   * 測試方法   
  13.   * @param args   
  14.   */   
  15.   public static void main(String[] args) {   
  16.   //初始化要執(zhí)行的任務(wù)列表   
  17.   List taskList = new ArrayList();   
  18.   for (int i = 0; i < 108; i++) {   
  19.   taskList.add(new Task(i));   
  20.   }   
  21.   //設(shè)定要啟動的工作線程數(shù)為 5 個   
  22.   int threadCount = 5;   
  23.   List[] taskListPerThread = distributeTasks(taskList, threadCount);   
  24.   System.out.println("實(shí)際要啟動的工作線程數(shù):"+taskListPerThread.length);   
  25.   for (int i = 0; i < taskListPerThread.length; i++) {   
  26.   Thread workThread = new WorkThread(taskListPerThread[i],i);   
  27.   workThread.start();   
  28.   }   
  29.   }   
  30.   /**   
  31.   * 把 List 中的任務(wù)分配給每個線程,先平均分配,剩于的依次附加給前面的線程   
  32.   * 返回的數(shù)組有多少個元素 (List) 就表明將啟動多少個工作線程   
  33.   * @param taskList 待分派的任務(wù)列表   
  34.   * @param threadCount 線程數(shù)   
  35.   * @return 列表的數(shù)組,每個元素中存有該線程要執(zhí)行的任務(wù)列表   
  36.   */   
  37.   public static List[] distributeTasks(List taskList, int threadCount) {   
  38.   // 每個線程至少要執(zhí)行的任務(wù)數(shù),假如不為零則表示每個線程都會分配到任務(wù)   
  39.   int minTaskCount = taskList.size() / threadCount;   
  40.   // 平均分配后還剩下的任務(wù)數(shù),不為零則還有任務(wù)依個附加到前面的線程中   
  41.   int remainTaskCount = taskList.size() % threadCount;   
  42.   // 實(shí)際要啟動的線程數(shù),如果工作線程比任務(wù)還多   
  43.   // 自然只需要啟動與任務(wù)相同個數(shù)的工作線程,一對一的執(zhí)行   
  44.   // 畢竟不打算實(shí)現(xiàn)了線程池,所以用不著預(yù)先初始化好休眠的線程   
  45.   int actualThreadCount = minTaskCount > 0 ? threadCount : remainTaskCount;   
  46.   // 要啟動的線程數(shù)組,以及每個線程要執(zhí)行的任務(wù)列表   
  47.   List[] taskListPerThread = new List[actualThreadCount];   
  48.   int taskIndex = 0;   
  49.   //平均分配后多余任務(wù),每附加給一個線程后的剩余數(shù),重新聲明與 remainTaskCount   
  50.   //相同的變量,不然會在執(zhí)行中改變 remainTaskCount 原有值,產(chǎn)生麻煩   
  51.   int remainIndces = remainTaskCount;   
  52.   for (int i = 0; i < taskListPerThread.length; i++) {   
  53.   taskListPerThread[i] = new ArrayList();   
  54.   // 如果大于零,線程要分配到基本的任務(wù)   
  55.   if (minTaskCount > 0) {   
  56.   for (int j = taskIndex; j < minTaskCount + taskIndex; j++) {   
  57.   taskListPerThread[i].add(taskList.get(j));   
  58.   }   
  59.   taskIndex += minTaskCount;   
  60.   }   
  61.   // 假如還有剩下的,則補(bǔ)一個到這個線程中   
  62.   if (remainIndces > 0) {   
  63.   taskListPerThread[i].add(taskList.get(taskIndex++));   
  64.   remainIndces--;   
  65.   }   
  66.   }   
  67.   // 打印任務(wù)的分配情況   
  68.   for (int i = 0; i < taskListPerThread.length; i++) {   
  69.   System.out.println("線程 " + i + " 的任務(wù)數(shù):" +    
  70.  
  71.   taskListPerThread[i].size() + " 區(qū)間["   
  72.   + taskListPerThread[i].get(0).getTaskId() + ","   
  73.   + taskListPerThread[i].get(taskListPerThread[i].size() - 1).getTaskId() + "]");   
  74.   }   
  75.   return taskListPerThread;   
  76.   }   
  77.   }   
  78.   /**   
  79.   * 要執(zhí)行的任務(wù),可在執(zhí)行時改變它的某個狀態(tài)或調(diào)用它的某個操作   
  80.   * 例如任務(wù)有三個狀態(tài),就緒,運(yùn)行,完成,默認(rèn)為就緒態(tài)   
  81.   * 要進(jìn)一步完善,可為 Task 加上狀態(tài)變遷的監(jiān)聽器,因之決定UI的顯示   
  82.   */   
  83.   class Task {   
  84.   public static final int READY = 0;   
  85.   public static final int RUNNING = 1;   
  86.   public static final int FINISHED = 2;   
  87.   private int status;   
  88.   //聲明一個任務(wù)的自有業(yè)務(wù)含義的變量,用于標(biāo)識任務(wù)   
  89.   private int taskId;   
  90.   //任務(wù)的初始化方法   
  91.   public Task(int taskId){   
  92.   this.status = READY;   
  93.   this.taskId = taskId;   
  94.   }   
  95.   /**   
  96.   * 執(zhí)行任務(wù)   
  97.   */   
  98.   public void execute() {   
  99.   // 設(shè)置狀態(tài)為運(yùn)行中   
  100.   setStatus(Task.RUNNING);   
  101.   System.out.println("當(dāng)前線程 ID 是:" + Thread.currentThread().getName()   
  102.   +" | 任務(wù) ID 是:"+this.taskId);   
  103.   // 附加一個延時   
  104.   try {   
  105.   Thread.sleep(1000);   
  106.   } catch (InterruptedException e) {   
  107.   e.printStackTrace();   
  108.   }   
  109.   // 執(zhí)行完成,改狀態(tài)為完成   
  110.   setStatus(FINISHED);   
  111.   }   
  112.   public void setStatus(int status) {   
  113.   this.status = status;   
  114.   }   
  115.   public int getTaskId() {   
  116.   return taskId;   
  117.   }   
  118.   }   
  119.   /**   
  120.   * 自定義的工作線程,持有分派給它執(zhí)行的任務(wù)列表   
  121.   */   
  122.   class WorkThread extends Thread {   
  123.   //本線程待執(zhí)行的任務(wù)列表,你也可以指為任務(wù)索引的起始值   
  124.   private List taskList = null;   
  125.   private int threadId;   
  126.   /**   
  127.   * 構(gòu)造工作線程,為其指派任務(wù)列表,及命名線程 ID   
  128.   * @param taskList 欲執(zhí)行的任務(wù)列表   
  129.   * @param threadId 線程 ID   
  130.   */   
  131.   public WorkThread(List taskList,int threadId) {   
  132.   this.taskList = taskList;   
  133.   this.threadId = threadId;   
  134.   }   
  135.   /**   
  136.   * 執(zhí)行被指派的所有任務(wù)   
  137.   */   
  138.   public void run() {   
  139.   for (Task task : taskList) {   
  140.   task.execute();   
  141.   }   
  142.   }   
  143.   } 

執(zhí)行結(jié)果如下,注意觀察每個Java多線程分配到的任務(wù)數(shù)量及區(qū)間。直到所有的線程完成了所分配到的任務(wù)后程序結(jié)束:

 
 
 
  1. 線程 0 的任務(wù)數(shù):22 區(qū)間[0,21]   
  2. 線程 1 的任務(wù)數(shù):22 區(qū)間[22,43]   
  3. 線程 2 的任務(wù)數(shù):22 區(qū)間[44,65]   
  4. 線程 3 的任務(wù)數(shù):21 區(qū)間[66,86]   
  5. 線程 4 的任務(wù)數(shù):21 區(qū)間[87,107]   
  6. 實(shí)際要啟動的工作線程數(shù):5   
  7. 當(dāng)前線程 ID 是:Thread-0 | 任務(wù) ID 是:0   
  8. 當(dāng)前線程 ID 是:Thread-1 | 任務(wù) ID 是:22   
  9. 當(dāng)前線程 ID 是:Thread-2 | 任務(wù) ID 是:44   
  10. 當(dāng)前線程 ID 是:Thread-3 | 任務(wù) ID 是:66   
  11. 當(dāng)前線程 ID 是:Thread-4 | 任務(wù) ID 是:87   
  12. 當(dāng)前線程 ID 是:Thread-0 | 任務(wù) ID 是:1   
  13. 當(dāng)前線程 ID 是:Thread-1 | 任務(wù) ID 是:23   
  14. 當(dāng)前線程 ID 是:Thread-2 | 任務(wù) ID 是:45 

上面坦白來只算是基本功夫,貼出來還真見笑了。還有更為復(fù)雜的功能.

像Java多線程的下載工具的確更充分利用了網(wǎng)絡(luò)資源,而且像 FlashGet、NetAnts 都實(shí)現(xiàn)了:假如某個線程下載完了欲先所分配段的內(nèi)容之后,會幫其他線程下載未完成數(shù)據(jù),直到任務(wù)完成;或某一下載線程的未完成段區(qū)間已經(jīng)很小了,用不著別人來幫忙時,這就涉及到任務(wù)的進(jìn)一步分配。再如,以上兩個工具都能動態(tài)增加、減小或中止線程,越說越復(fù)雜了,它們原本比這復(fù)雜多了,這些實(shí)現(xiàn)可能定義各種隊(duì)列來實(shí)現(xiàn),如未完成任務(wù)隊(duì)列、下載中任務(wù)隊(duì)列和已完成隊(duì)列等。


新聞標(biāo)題:使用Java多線程實(shí)現(xiàn)任務(wù)分發(fā)
分享URL:http://www.dlmjj.cn/article/dpedhii.html