日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
一個Demo學會WorkerPool

本文轉(zhuǎn)載自微信公眾號「Golang來啦」,作者Seekload。轉(zhuǎn)載本文請聯(lián)系Golang來啦公眾號。

成都創(chuàng)新互聯(lián)公司是工信部頒發(fā)資質(zhì)IDC服務器商,為用戶提供優(yōu)質(zhì)的服務器托管服務

四哥水平有限,如有翻譯或理解錯誤,煩請幫忙指出,感謝!

今天給大家分享一篇關(guān)于 workPool 的文章,這個平時大家應該用的比較多,一起來看下。

原文如下:

工作池是這樣一個池子,會創(chuàng)建指定數(shù)量的 worker,這些 worker 能獲取任務并處理。允許多個任務同時處理,但是需要維持固定數(shù)量的 worker 避免系統(tǒng)資源被過度使用。

通常有兩種方式創(chuàng)建任務池:

  • 一種是預先創(chuàng)建固定數(shù)量的 worker;
  • 另外一種是當有需要的時候才會創(chuàng)建 worker,當然也會有數(shù)量限制;

本文將與大家一起討論第一種方式。當我們預先知道有許多任務需要同時運行,并且很大概率會用上最大數(shù)量的 worker,通常會采用這種方式。

為了演示,我們先創(chuàng)建 Worker 結(jié)構(gòu)體,它獲取任務并執(zhí)行。

 
 
 
  1. import ( 
  2.  "fmt" 
  3.  
  4. // Worker ... 
  5. type Worker struct { 
  6.  ID       int 
  7.  Name     string 
  8.  StopChan chan bool 
  9.  
  10. // Start ... 
  11. func (w *Worker) Start(jobQueue chan Job) { 
  12.  w.StopChan = make(chan bool) 
  13.  successChan := make(chan bool) 
  14.  
  15.  go func() { 
  16.   successChan <- true 
  17.   for { 
  18.    // take job 
  19.    job := <-jobQueue 
  20.    if job != nil { 
  21.     job.Start(w) 
  22.    } else { 
  23.     fmt.Printf("worker %s to be stopped\n", w.Name) 
  24.     w.StopChan <- true 
  25.     break 
  26.    } 
  27.   } 
  28.  }() 
  29.  
  30.  // wait for the worker to start 
  31.  <-successChan 
  32.  
  33. // Stop ... 
  34. func (w *Worker) Stop() { 
  35.  // wait for the worker to stop, blocking 
  36.  _ = <-w.StopChan 
  37.  fmt.Printf("worker %s stopped\n", w.Name) 

Worker 有一些屬性保存當前的狀態(tài),另外還聲明了兩個方法分別用于啟動、停止 worker。

在 Start() 方法里,創(chuàng)建了兩個 channel 分別用于 worker 的啟動和停止。最重要的是 for 循環(huán)里面,worker 會一直等待獲取 job 并可執(zhí)行的直到任務隊列關(guān)閉。

Job 是包含單個方法 Start() 的接口,所以只要實現(xiàn) Start() 方法就可以有不同類型的 job。

 
 
 
  1. // Job ... 
  2. type Job interface { 
  3.  Start(worker *Worker) error 

一旦 Worker 確定之后,接下來就是創(chuàng)建 pool 來管理 workers。

 
 
 
  1. import ( 
  2.  "fmt" 
  3.  "sync" 
  4.  
  5. // Pool ... 
  6. type Pool struct { 
  7.  Name string 
  8.  
  9.  Size    int 
  10.  Workers []*Worker 
  11.  
  12.  QueueSize int 
  13.  Queue     chan Job 
  14.  
  15. // Initiualize ... 
  16. func (p *Pool) Initialize() { 
  17.  // maintain minimum 1 worker 
  18.  if p.Size < 1 { 
  19.   p.Size = 1 
  20.  } 
  21.  p.Workers = []*Worker{} 
  22.  for i := 1; i <= p.Size; i++ { 
  23.   worker := &Worker{ 
  24.    ID:   i - 1, 
  25.    Name: fmt.Sprintf("%s-worker-%d", p.Name, i-1), 
  26.   } 
  27.   p.Workers = append(p.Workers, worker) 
  28.  } 
  29.  
  30.  // maintain min queue size as 1 
  31.  if p.QueueSize < 1 { 
  32.   p.QueueSize = 1 
  33.  } 
  34.  p.Queue = make(chan Job, p.QueueSize) 
  35.  
  36. // Start ... 
  37. func (p *Pool) Start() { 
  38.  for _, worker := range p.Workers { 
  39.   worker.Start(p.Queue) 
  40.  } 
  41.  fmt.Println("all workers started") 
  42.  
  43. // Stop ... 
  44. func (p *Pool) Stop() { 
  45.  close(p.Queue) // close the queue channel 
  46.  
  47.  var wg sync.WaitGroup 
  48.  for _, worker := range p.Workers { 
  49.   wg.Add(1) 
  50.   go func(w *Worker) { 
  51.    defer wg.Done() 
  52.  
  53.    w.Stop() 
  54.   }(worker) 
  55.  } 
  56.  wg.Wait() 
  57.  fmt.Println("all workers stopped") 

Pool 包含 worker 切片和用于保存 job 的隊列。worker 的數(shù)量在初始化的時候是可以自定義。

關(guān)鍵點在 Stop() 的邏輯,當它被調(diào)用時,會先關(guān)閉 job 隊列,worker 便會從 job 隊列讀到 nil,接著就會關(guān)閉對應的 worker。接著在 for 循環(huán)里,等待 worker 并發(fā)地停止直到最后一個 worker 停止。

為了演示整體邏輯,下面的例子展示了一個僅僅輸出值的 job。

 
 
 
  1. import "fmt" 
  2.  
  3. func main() { 
  4.  pool := &Pool{ 
  5.   Name:      "test", 
  6.   Size:      5, 
  7.   QueueSize: 20, 
  8.  } 
  9.  pool.Initialize() 
  10.  pool.Start() 
  11.         defer pool.Stop() 
  12.  
  13.  for i := 1; i <= 100; i++ { 
  14.   job := &PrintJob{ 
  15.    Index: i, 
  16.   } 
  17.   pool.Queue <- job 
  18.  } 
  19.  
  20. // PrintJob ... 
  21. type PrintJob struct { 
  22.  Index int 
  23.  
  24. func (pj *PrintJob) Start(worker *Worker) error { 
  25.  fmt.Printf("job %s - %d\n", worker.Name, pj.Index) 
  26.  return nil 

如果你看了上面的代碼邏輯,就會發(fā)現(xiàn)很簡單,創(chuàng)建了有 5 個 worker 的工作池并且 job 隊列的大小是 20。

接著,模擬 job 創(chuàng)建和處理過程:一旦 job 被創(chuàng)建就會 push 到任務隊列里,等待著的 worker 便會從隊列里取出 job 并處理。

類似下面這樣的輸出:

 
 
 
  1. all workers started 
  2. job test-worker-3 - 4 
  3. job test-worker-3 - 6 
  4. job test-worker-3 - 7 
  5. job test-worker-3 - 8 
  6. job test-worker-3 - 9 
  7. job test-worker-3 - 10 
  8. job test-worker-3 - 11 
  9. job test-worker-3 - 12 
  10. job test-worker-3 - 13 
  11. job test-worker-3 - 14 
  12. job test-worker-3 - 15 
  13. job test-worker-3 - 16 
  14. job test-worker-3 - 17 
  15. job test-worker-3 - 18 
  16. job test-worker-3 - 19 
  17. job test-worker-3 - 20 
  18. worker test-worker-3 to be stopped 
  19. job test-worker-4 - 5 
  20. job test-worker-0 - 1 
  21. worker test-worker-3 stopped 
  22. job test-worker-2 - 3 
  23. worker test-worker-2 to be stopped 
  24. worker test-worker-2 stopped 
  25. worker test-worker-4 to be stopped 
  26. worker test-worker-4 stopped 
  27. worker test-worker-0 to be stopped 
  28. worker test-worker-0 stopped 
  29. job test-worker-1 - 2 
  30. worker test-worker-1 to be stopped 
  31. worker test-worker-1 stopped 
  32. all workers stopped 

via:https://www.pixelstech.net/article/1611483826-Demo-on-creating-worker-pool-in-GoLang

作者:sonic0002


當前標題:一個Demo學會WorkerPool
文章來源:http://www.dlmjj.cn/article/ccogchh.html