日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Golang channel 使用總結(jié)

不同于傳統(tǒng)的多線程并發(fā)模型使用共享內(nèi)存來實現(xiàn)線程間通信的方式,golang 的哲學(xué)是通過 channel 進行協(xié)程 (goroutine) 之間的通信來實現(xiàn)數(shù)據(jù)共享:

Do not communicate by sharing memory; instead, share memory by communicating.

這種方式的優(yōu)點是通過提供原子的通信原語,避免了競態(tài)情形 (race condition) 下復(fù)雜的鎖機制。channel 可以看成一個 FIFO 隊列,對 FIFO 隊列的讀寫都是原子的操作,不需要加鎖。對 channel 的操作行為結(jié)果總結(jié)如下:

操作nil channelclosed channelnot-closed non-nil channel
closepanicpanic成功 close
寫 ch <-一直阻塞panic阻塞或成功寫入數(shù)據(jù)
讀 <- ch一直阻塞讀取對應(yīng)類型零值阻塞或成功讀取數(shù)據(jù)

讀取一個已關(guān)閉的 channel 時,總是能讀取到對應(yīng)類型的零值,為了和讀取非空未關(guān)閉 channel 的行為區(qū)別,可以使用兩個接收值:

 
 
 
 
  1. // ok is false when ch is closed  
  2. v, ok := <-ch 

golang 中大部分類型都是值類型(只有 slice / channel / map 是引用類型),讀/寫類型是值類型的 channel 時,如果元素 size 比較大時,應(yīng)該使用指針代替,避免頻繁的內(nèi)存拷貝開銷。

內(nèi)部實現(xiàn)

如圖所示,在 channel 的內(nèi)部實現(xiàn)中(具體定義在 $GOROOT/src/runtime/chan.go 里),維護了 3 個隊列:

  • 讀等待協(xié)程隊列 recvq,維護了阻塞在讀此 channel 的協(xié)程列表
  •  寫等待協(xié)程隊列 sendq,維護了阻塞在寫此 channel 的協(xié)程列表
  •  緩沖數(shù)據(jù)隊列 buf,用環(huán)形隊列實現(xiàn),不帶緩沖的 channel 此隊列 size 則為 0

img

當(dāng)協(xié)程嘗試從未關(guān)閉的 channel 中讀取數(shù)據(jù)時,內(nèi)部的操作如下:

    1.  當(dāng) buf 非空時,此時 recvq 必為空,buf 彈出一個元素給讀協(xié)程,讀協(xié)程獲得數(shù)據(jù)后繼續(xù)執(zhí)行,此時若 sendq 非空,則從 sendq 中彈出一個寫協(xié)程轉(zhuǎn)入 running 狀態(tài),待寫數(shù)據(jù)入隊列 buf ,此時讀取操作 <- ch 未阻塞;

    2.  當(dāng) buf 為空但 sendq 非空時(不帶緩沖的 channel),則從 sendq 中彈出一個寫協(xié)程轉(zhuǎn)入 running 狀態(tài),待寫數(shù)據(jù)直接傳遞給讀協(xié)程,讀協(xié)程繼續(xù)執(zhí)行,此時讀取操作 <- ch 未阻塞;

    3.  當(dāng) buf 為空并且 sendq 也為空時,讀協(xié)程入隊列 recvq 并轉(zhuǎn)入 blocking 狀態(tài),當(dāng)后續(xù)有其他協(xié)程往 channel 寫數(shù)據(jù)時,讀協(xié)程才會重新轉(zhuǎn)入 running 狀態(tài),此時讀取操作 <- ch 阻塞。

類似的,當(dāng)協(xié)程嘗試往未關(guān)閉的 channel 中寫入數(shù)據(jù)時,內(nèi)部的操作如下:

  1.  當(dāng)隊列 recvq 非空時,此時隊列 buf 必為空,從 recvq 彈出一個讀協(xié)程接收待寫數(shù)據(jù),此讀協(xié)程此時結(jié)束阻塞并轉(zhuǎn)入 running 狀態(tài),寫協(xié)程繼續(xù)執(zhí)行,此時寫入操作 ch <- 未阻塞;
  2.  當(dāng)隊列 recvq 為空但 buf 未滿時,此時 sendq 必為空,寫協(xié)程的待寫數(shù)據(jù)入 buf 然后繼續(xù)執(zhí)行,此時寫入操作 ch <- 未阻塞;
  3.  當(dāng)隊列 recvq 為空并且 buf 為滿時,此時寫協(xié)程入隊列 sendq 并轉(zhuǎn)入 blokcing 狀態(tài),當(dāng)后續(xù)有其他協(xié)程從 channel 中讀數(shù)據(jù)時,寫協(xié)程才會重新轉(zhuǎn)入 running 狀態(tài),此時寫入操作 ch <- 阻塞。

當(dāng)關(guān)閉 non-nil channel 時,內(nèi)部的操作如下:

  1.   當(dāng)隊列 recvq 非空時,此時 buf 必為空,recvq 中的所有協(xié)程都將收到對應(yīng)類型的零值然后結(jié)束阻塞狀態(tài);
  2.   當(dāng)隊列 sendq 非空時,此時 buf 必為滿,sendq 中的所有協(xié)程都會產(chǎn)生 panic ,在 buf 中數(shù)據(jù)仍然會保留直到被其他協(xié)程讀取。

使用場景

除了常規(guī)的用來在協(xié)程之間傳遞數(shù)據(jù)外,本節(jié)列出了一些特殊的使用 channel 的場景。

futures / promises

golang 雖然沒有直接提供 futrue / promise 模型的操作原語,但通過 goroutine 和 channel 可以實現(xiàn)類似的功能:

 
 
 
 
  1. package main  
  2. import (  
  3.     "io/ioutil"  
  4.     "log"  
  5.     "net/http"  
  6. )  
  7. // RequestFuture, http request promise. 
  8. func RequestFuture(url string) <-chan []byte {  
  9.     c := make(chan []byte, 1) 
  10.     go func() {  
  11.         var body []byte  
  12.         defer func() {  
  13.             c <- body  
  14.         }()  
  15.         res, err := http.Get(url)  
  16.         if err != nil { 
  17.              return  
  18.         }  
  19.         defer res.Body.Close()  
  20.         body, _ = ioutil.ReadAll(res.Body)  
  21.     }() 
  22.     return c  
  23. }  
  24. func main() {  
  25.     future := RequestFuture("https://api.github.com/users/octocat/orgs")  
  26.     body := <-future  
  27.     log.Printf("reponse length: %d", len(body))  

條件變量 (condition variable)

類型于 POSIX 接口中線程通知其他線程某個事件發(fā)生的條件變量,channel 的特性也可以用來當(dāng)成協(xié)程之間同步的條件變量。因為 channel 只是用來通知,所以 channel 中具體的數(shù)據(jù)類型和值并不重要,這種場景一般用 strct {} 作為 channel 的類型。

一對一通知

類似 pthread_cond_signal() 的功能,用來在一個協(xié)程中通知另個某一個協(xié)程事件發(fā)生:

 
 
 
 
  1. package main  
  2. import (  
  3.     "fmt"  
  4.     "time"  
  5. )  
  6. func main() {  
  7.     ch := make(chan struct{})  
  8.     nums := make([]int, 100)  
  9.     go func() {  
  10.         time.Sleep(time.Second)  
  11.         for i := 0; i < len(nums); i++ {  
  12.             nums[i] = i  
  13.         }  
  14.         // send a finish signal  
  15.         ch <- struct{}{}  
  16.     }()  
  17.     // wait for finish signal  
  18.     <-ch  
  19.     fmt.Println(nums)  

廣播通知

類似 pthread_cond_broadcast() 的功能。利用從已關(guān)閉的 channel 讀取數(shù)據(jù)時總是非阻塞的特性,可以實現(xiàn)在一個協(xié)程中向其他多個協(xié)程廣播某個事件發(fā)生的通知:

 
 
 
 
  1. package main  
  2. import (  
  3.     "fmt"  
  4.     "time"  
  5. )  
  6. func main() {  
  7.     N := 10  
  8.     exit := make(chan struct{})  
  9.     done := make(chan struct{}, N)  
  10.     // start N worker goroutines  
  11.     for i := 0; i < N; i++ {  
  12.         go func(n int) {  
  13.             for {  
  14.                 select {  
  15.                 // wait for exit signal  
  16.                 case <-exit:  
  17.                     fmt.Printf("worker goroutine #%d exit\n", n)  
  18.                     done <- struct{}{}  
  19.                     return  
  20.                 case <-time.After(time.Second):  
  21.                     fmt.Printf("worker goroutine #%d is working...\n", n)  
  22.                 }  
  23.             }  
  24.         }(i)  
  25.     }  
  26.     time.Sleep(3 * time.Second)  
  27.     // broadcast exit signal  
  28.     close(exit)  
  29.     // wait for all worker goroutines exit  
  30.     for i := 0; i < N; i++ {  
  31.         <-done  
  32.     }  
  33.     fmt.Println("main goroutine exit")  

信號量

channel 的讀/寫相當(dāng)于信號量的 P / V 操作,下面的示例程序中 channel 相當(dāng)于信號量:

 
 
 
 
  1. package main  
  2. import (  
  3.     "log"  
  4.     "math/rand"  
  5.     "time"  
  6. )  
  7. type Seat int  
  8. type Bar chan Seat  
  9. func (bar Bar) ServeConsumer(customerId int) {  
  10.     log.Print("-> consumer#", customerId, " enters the bar")  
  11.     seat := <-bar // need a seat to drink  
  12.     log.Print("consumer#", customerId, " drinks at seat#", seat)  
  13.     time.Sleep(time.Second * time.Duration(2+rand.Intn(6)))  
  14.     log.Print("<- consumer#", customerId, " frees seat#", seat)  
  15.     bar <- seat // free the seat and leave the bar  
  16. }  
  17. func main() {  
  18.     rand.Seed(time.Now().UnixNano())  
  19.     bar24x7 := make(Bar, 10) // the bar has 10 seats  
  20.     // Place seats in an bar.  
  21.     for seatId := 0; seatId < cap(bar24x7); seatId++ {  
  22.         bar24x7 <- Seat(seatId) // none of the sends will block  
  23.     }  
  24.     // a new consumer try to enter the bar for each second  
  25.     for customerId := 0; ; customerId++ {  
  26.         time.Sleep(time.Second)  
  27.         go bar24x7.ServeConsumer(customerId)  
  28.     }  

互斥量

互斥量相當(dāng)于二元信號里,所以 cap 為 1 的 channel 可以當(dāng)成互斥量使用:

 
 
 
 
  1. package main  
  2. import "fmt"  
  3. func main() {  
  4.     mutex := make(chan struct{}, 1) // the capacity must be one  
  5.     counter := 0  
  6.     increase := func() {  
  7.         mutex <- struct{}{} // lock  
  8.         counter++  
  9.         <-mutex // unlock  
  10.     }  
  11.     increase1000 := func(done chan<- struct{}) {  
  12.         for i := 0; i < 1000; i++ {  
  13.             increase()  
  14.         }  
  15.         done <- struct{}{}  
  16.     }  
  17.     done := make(chan struct{})  
  18.     go increase1000(done)  
  19.     <-done; <-done  
  20.     fmt.Println(counter) // 2000  

關(guān)閉 channel

關(guān)閉不再需要使用的 channel 并不是必須的。跟其他資源比如打開的文件、socket 連接不一樣,這類資源使用完后不關(guān)閉后會造成句柄泄露,channel 使用完后不關(guān)閉也沒有關(guān)系,channel 沒有被任何協(xié)程用到后最終會被 GC 回收。關(guān)閉 channel 一般是用來通知其他協(xié)程某個任務(wù)已經(jīng)完成了。golang 也沒有直接提供判斷 channel 是否已經(jīng)關(guān)閉的接口,雖然可以用其他不太優(yōu)雅的方式自己實現(xiàn)一個:

 
 
 
 
  1. func isClosed(ch chan int) bool {  
  2.     select {  
  3.     case <-ch:  
  4.         return true  
  5.     default:  
  6.     }  
  7.     return false  

不過實現(xiàn)一個這樣的接口也沒什么必要。因為就算通過 isClosed() 得到當(dāng)前 channel 當(dāng)前還未關(guān)閉,如果試圖往 channel 里寫數(shù)據(jù),仍然可能會發(fā)生 panic ,因為在調(diào)用 isClosed() 后,其他協(xié)程可能已經(jīng)把 channel 關(guān)閉了。關(guān)閉 channel 時應(yīng)該注意以下準則:

  •  不要在讀取端關(guān)閉 channel ,因為寫入端無法知道 channel 是否已經(jīng)關(guān)閉,往已關(guān)閉的 channel 寫數(shù)據(jù)會 panic ;
  •  有多個寫入端時,不要再寫入端關(guān)閉 channle ,因為其他寫入端無法知道 channel 是否已經(jīng)關(guān)閉,關(guān)閉已經(jīng)關(guān)閉的 channel 會發(fā)生 panic ;
  •  如果只有一個寫入端,可以在這個寫入端放心關(guān)閉 channel 。

關(guān)閉 channel 粗暴一點的做法是隨意關(guān)閉,如果產(chǎn)生了 panic 就用 recover 避免進程掛掉。稍好一點的方案是使用標(biāo)準庫的 sync 包來做關(guān)閉 channel 時的協(xié)程同步,不過使用起來也稍微復(fù)雜些。下面介紹一種優(yōu)雅些的做法。

一寫多讀

這種場景下這個唯一的寫入端可以關(guān)閉 channel 用來通知讀取端所有數(shù)據(jù)都已經(jīng)寫入完成了。讀取端只需要用 for range 把 channel 中數(shù)據(jù)遍歷完就可以了,當(dāng) channel 關(guān)閉時,for range 仍然會將 channel 緩沖中的數(shù)據(jù)全部遍歷完然后再退出循環(huán):

 
 
 
 
  1. package main  
  2. import (  
  3.     "fmt"  
  4.     "sync"  
  5. )  
  6. func main() {  
  7.     wg := &sync.WaitGroup{}  
  8.     ch := make(chan int, 100)  
  9.     send := func() { 
  10.          for i := 0; i < 100; i++ {  
  11.             ch <- i  
  12.         }  
  13.         // signal sending finish  
  14.         close(ch) 
  15.      }  
  16.     recv := func(id int) {  
  17.         defer wg.Done()  
  18.         for i := range ch {  
  19.             fmt.Printf("receiver #%d get %d\n", id, i)  
  20.         }  
  21.         fmt.Printf("receiver #%d exit\n", id)  
  22.     }  
  23.     wg.Add(3)  
  24.     go recv(0)  
  25.     go recv(1)  
  26.     go recv(2)  
  27.     send()  
  28.     wg.Wait()  

多寫一讀

這種場景下雖然可以用 sync.Once 來解決多個寫入端重復(fù)關(guān)閉 channel 的問題,但更優(yōu)雅的辦法設(shè)置一個額外的 channel ,由讀取端通過關(guān)閉來通知寫入端任務(wù)完成不要再繼續(xù)再寫入數(shù)據(jù)了:

 
 
 
 
  1. package main  
  2. import (  
  3.     "fmt"  
  4.     "sync"  
  5. )  
  6. func main() {  
  7.     wg := &sync.WaitGroup{}  
  8.     ch := make(chan int, 100)  
  9.     done := make(chan struct{})  
  10.     send := func(id int) {  
  11.         defer wg.Done()  
  12.         for i := 0; ; i++ {  
  13.             select {  
  14.             case <-done:  
  15.                 // get exit signal  
  16.                 fmt.Printf("sender #%d exit\n", id)  
  17.                 return  
  18.             case ch <- id*1000 + i:  
  19.             }  
  20.         }  
  21.     }  
  22.     recv := func() {  
  23.         count := 0  
  24.         for i := range ch {  
  25.             fmt.Printf("receiver get %d\n", i)  
  26.             count++  
  27.             if count >= 1000 {  
  28.                 // signal recving finish  
  29.                 close(done)  
  30.                 return  
  31.             }  
  32.         }  
  33.     }  
  34.     wg.Add(3)  
  35.     go send(0)  
  36.     go send(1)  
  37.     go send(2)  
  38.     recv()  
  39.     wg.Wait() 

多寫多讀

這種場景稍微復(fù)雜,和上面的例子一樣,也需要設(shè)置一個額外 channel 用來通知多個寫入端和讀取端。另外需要起一個額外的協(xié)程來通過關(guān)閉這個 channel 來廣播通知:

 
 
 
 
  1. package main  
  2. import (  
  3.     "fmt"  
  4.     "sync"  
  5.     "time"  
  6. func main() {  
  7.     wg := &sync.WaitGroup{}  
  8.     ch := make(chan int, 100)  
  9.     done := make(chan struct{}) 
  10.     send := func(id int) {  
  11.         defer wg.Done()  
  12.         for i := 0; ; i++ {  
  13.             select {  
  14.             case <-done:  
  15.                 // get exit signal  
  16.                 fmt.Printf("sender #%d exit\n", id)  
  17.                 return  
  18.             case ch <- id*1000 + i:  
  19.             }  
  20.         }  
  21.     }  
  22.     recv := func(id int) {  
  23.         defer wg.Done()  
  24.         for {  
  25.             select { 
  26.              case <-done:  
  27.                 // get exit signal  
  28.                 fmt.Printf("receiver #%d exit\n", id)  
  29.                 return  
  30.             case i := <-ch:  
  31.                 fmt.Printf("receiver #%d get %d\n", id, i)  
  32.                 time.Sleep(time.Millisecond)  
  33.             }  
  34.         }  
  35.     }  
  36.     wg.Add(6)  
  37.     go send(0)  
  38.     go send(1)  
  39.     go send(2)  
  40.     go recv(0)  
  41.     go recv(1)  
  42.     go recv(2)   
  43.     time.Sleep(time.Second)  
  44.     // signal finish  
  45.     close(done)  
  46.     // wait all sender and receiver exit  
  47.     wg.Wait()  

總結(jié)

channle 作為 golang 最重要的特性,用起來還是比較爽的。傳統(tǒng)的 C 里要實現(xiàn)類型的功能的話,一般需要用到 socket 或者 FIFO 來實現(xiàn),另外還要考慮數(shù)據(jù)包的完整性與并發(fā)沖突的問題,channel 則屏蔽了這些底層細節(jié),使用者只需要考慮讀寫就可以了。channel 是引用類型,了解一下 channel 底層的機制對更好的使用 channel 還是很用必要的。雖然操作原語簡單,但涉及到阻塞的問題,使用不當(dāng)可能會造成死鎖或者無限制的協(xié)程創(chuàng)建最終導(dǎo)致進程掛掉。

channel 除在可以用來在協(xié)程之間通信外,其阻塞和喚醒協(xié)程的特性也可以用作協(xié)程之間的同步機制,文中也用示例簡單介紹了這種場景下的用法。

關(guān)閉 channel 并不是必須的,只要沒有協(xié)程沒用引用 channel ,最終會被 GC 清理。所以使用的時候要特別注意,不要讓協(xié)程阻塞在 channel 上,這種情況很難檢測到,而且會造成 channel 和阻塞在 channel 的協(xié)程占有的資源無法被 GC 清理最終導(dǎo)致內(nèi)存泄露。

channle 方便 golang 程序使用 CSP 的編程范形,但是 golang 是一種多范形的編程語言,golang 也支持傳統(tǒng)的通過共享內(nèi)存來通信的編程方式。終極的原則是根據(jù)場景選擇合適的編程范型,不要因為 channel 好用而濫用 CSP 。


分享文章:Golang channel 使用總結(jié)
標(biāo)題路徑:http://www.dlmjj.cn/article/coscjjd.html