日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Go并發(fā)編程中的經(jīng)驗(yàn)教訓(xùn)

通過學(xué)習(xí)如何定位并發(fā)處理的陷阱來避免未來處理這些問題時的困境。

樂平網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián),樂平網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為樂平數(shù)千家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\成都外貿(mào)網(wǎng)站建設(shè)公司要多少錢,請找那個售后服務(wù)好的樂平做網(wǎng)站的公司定做!

在復(fù)雜的分布式系統(tǒng)進(jìn)行任務(wù)處理時,你通常會需要進(jìn)行并發(fā)的操作。在 Mode.net 公司,我們每天都要和實(shí)時、快速和靈活的軟件打交道。而沒有一個高度并發(fā)的系統(tǒng),就不可能構(gòu)建一個毫秒級的動態(tài)地路由數(shù)據(jù)包的全球?qū)S镁W(wǎng)絡(luò)。這個動態(tài)路由是基于網(wǎng)絡(luò)狀態(tài)的,盡管這個過程需要考慮眾多因素,但我們的重點(diǎn)是鏈路指標(biāo)。在我們的環(huán)境中,鏈路指標(biāo)可以是任何跟網(wǎng)絡(luò)鏈接的狀態(tài)和當(dāng)前屬性(如鏈接延遲)有關(guān)的任何內(nèi)容。

并發(fā)探測鏈接監(jiān)控

我們的動態(tài)路由算法 H.A.L.O.(逐跳自適應(yīng)鏈路狀態(tài)最佳路由Hop-by-Hop Adaptive Link-State Optimal Routing)部分依賴于鏈路指標(biāo)來計(jì)算路由表。這些指標(biāo)由位于每個 PoP(存活節(jié)點(diǎn)Point of Presence)上的獨(dú)立組件收集。PoP 是表示我們的網(wǎng)絡(luò)中單個路由實(shí)體的機(jī)器,通過鏈路連接并分布在我們的網(wǎng)絡(luò)拓?fù)渲械母鱾€位置。某個組件使用網(wǎng)絡(luò)數(shù)據(jù)包探測周圍的機(jī)器,周圍的機(jī)器回復(fù)數(shù)據(jù)包給前者。從接收到的探測包中可以獲得鏈路延遲。由于每個 PoP 都有不止一個臨近節(jié)點(diǎn),所以這種探測任務(wù)實(shí)質(zhì)上是并發(fā)的:我們需要實(shí)時測量每個臨近連接點(diǎn)的延遲。我們不能串行地處理;為了計(jì)算這個指標(biāo),必須盡快處理每個探測。

latency computation graph

序列號和重置:一個重新排列場景

我們的探測組件互相發(fā)送和接收數(shù)據(jù)包,并依靠序列號進(jìn)行數(shù)據(jù)包處理。這旨在避免處理重復(fù)的包或順序被打亂的包。我們的第一個實(shí)現(xiàn)依靠特殊的序列號 0 來重置序列號。這個數(shù)字僅在組件初始化時使用。主要的問題是我們考慮了遞增的序列號總是從 0 開始。在該組件重啟后,包的順序可能會重新排列,某個包的序列號可能會輕易地被替換成重置之前使用過的值。這意味著,后繼的包都會被忽略掉,直到排到重置之前用到的序列值。

UDP 握手和有限狀態(tài)機(jī)

這里的問題是該組件重啟前后的序列號是否一致。有幾種方法可以解決這個問題,經(jīng)過討論,我們選擇了實(shí)現(xiàn)一個帶有清晰狀態(tài)定義的三步握手協(xié)議。這個握手過程在初始化時通過鏈接建立會話。這樣可以確保節(jié)點(diǎn)通過同一個會話進(jìn)行通信且使用了適當(dāng)?shù)男蛄刑枴?/p>

為了正確實(shí)現(xiàn)這個過程,我們必須定義一個有清晰狀態(tài)和過渡的有限狀態(tài)機(jī)。這樣我們就可以正確管理握手過程中的所有極端情況。

finite state machine diagram

會話 ID 由握手的初始化程序生成。一個完整的交換順序如下:

  1. 發(fā)送者發(fā)送一個 SYN(ID) 數(shù)據(jù)包。
  2. 接收者存儲接收到的 ID 并發(fā)送一個 SYN-ACK(ID)
  3. 發(fā)送者接收到 SYN-ACK(ID) 并發(fā)送一個 ACK(ID)。它還發(fā)送一個從序列號 0 開始的數(shù)據(jù)包。
  4. 接收者檢查最后接收到的 ID,如果 ID 匹配,則接受 ACK(ID)。它還開始接受序列號為 0 的數(shù)據(jù)包。

處理狀態(tài)超時

基本上,每種狀態(tài)下你都需要處理最多三種類型的事件:鏈接事件、數(shù)據(jù)包事件和超時事件。這些事件會并發(fā)地出現(xiàn),因此你必須正確處理并發(fā)。

  • 鏈接事件包括網(wǎng)絡(luò)連接或網(wǎng)絡(luò)斷開的變化,相應(yīng)的初始化一個鏈接會話或斷開一個已建立的會話。
  • 數(shù)據(jù)包事件是控制數(shù)據(jù)包(SYN/SYN-ACK/ACK)或只是探測響應(yīng)。
  • 超時事件在當(dāng)前會話狀態(tài)的預(yù)定超時時間到期后觸發(fā)。

這里面臨的最主要的問題是如何處理并發(fā)的超時到期和其他事件。這里很容易陷入死鎖和資源競爭的陷阱。

第一種方法

本項(xiàng)目使用的語言是 Golang。它確實(shí)提供了原生的同步機(jī)制,如自帶的通道和鎖,并且能夠使用輕量級線程來進(jìn)行并發(fā)處理。

gopher 們聚眾狂歡

首先,你可以設(shè)計(jì)兩個分別表示我們的會話和超時處理程序的結(jié)構(gòu)體。

 
 
 
 
  1. type Session struct {  
  2.   State SessionState  
  3.   Id SessionId  
  4.   RemoteIp string  
  5. }
  6.  
  7. type TimeoutHandler struct {  
  8.   callback func(Session)  
  9.   session Session  
  10.   duration int  
  11.   timer *timer.Timer  
  12. }

Session 標(biāo)識連接會話,內(nèi)有表示會話 ID、臨近的連接點(diǎn)的 IP 和當(dāng)前會話狀態(tài)的字段。

TimeoutHandler 包含回調(diào)函數(shù)、對應(yīng)的會話、持續(xù)時間和指向調(diào)度計(jì)時器的指針。

每一個臨近連接點(diǎn)的會話都包含一個保存調(diào)度 TimeoutHandler 的全局映射。

 
 
 
 
  1. SessionTimeout map[Session]*TimeoutHandler

下面方法注冊和取消超時:

 
 
 
 
  1. // schedules the timeout callback function.  
  2. func (timeout* TimeoutHandler) Register() {  
  3.   timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time.Second, func() {  
  4.     timeout.callback(timeout.session)  
  5.   })  
  6. }
  7.  
  8. func (timeout* TimeoutHandler) Cancel() {  
  9.   if timeout.timer == nil {  
  10.     return  
  11.   }  
  12.   timeout.timer.Stop()  
  13. }

你可以使用類似下面的方法來創(chuàng)建和存儲超時:

 
 
 
 
  1. func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {  
  2.   if sessionTimeout[session] == nil {  
  3.     sessionTimeout[session] := new(TimeoutHandler)  
  4.   }  
  5.    
  6.   timeout = sessionTimeout[session]  
  7.   timeout.session = session  
  8.   timeout.callback = callback  
  9.   timeout.duration = duration  
  10.   return timeout  
  11. }

超時處理程序創(chuàng)建后,會在經(jīng)過了設(shè)置的 duration 時間(秒)后執(zhí)行回調(diào)函數(shù)。然而,有些事件會使你重新調(diào)度一個超時處理程序(與 SYN 狀態(tài)時的處理一樣,每 3 秒一次)。

為此,你可以讓回調(diào)函數(shù)重新調(diào)度一次超時:

 
 
 
 
  1. func synCallback(session Session) {  
  2.   sendSynPacket(session)
  3.  
  4.   // reschedules the same callback.  
  5.   newTimeout := NewTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)  
  6.   newTimeout.Register()
  7.  
  8.   sessionTimeout[state] = newTimeout  
  9. }

這次回調(diào)在新的超時處理程序中重新調(diào)度自己,并更新全局映射 sessionTimeout。

數(shù)據(jù)競爭和引用

你的解決方案已經(jīng)有了??梢酝ㄟ^檢查計(jì)時器到期后超時回調(diào)是否執(zhí)行來進(jìn)行一個簡單的測試。為此,注冊一個超時,休眠 duration 秒,然后檢查是否執(zhí)行了回調(diào)的處理。執(zhí)行這個測試后,最好取消預(yù)定的超時時間(因?yàn)樗鼤匦抡{(diào)度),這樣才不會在下次測試時產(chǎn)生副作用。

令人驚訝的是,這個簡單的測試發(fā)現(xiàn)了這個解決方案中的一個問題。使用 cancel 方法來取消超時并沒有正確處理。以下順序的事件會導(dǎo)致數(shù)據(jù)資源競爭:

  1. 你有一個已調(diào)度的超時處理程序。
  2. 線程 1:
    • 你接收到一個控制數(shù)據(jù)包,現(xiàn)在你要取消已注冊的超時并切換到下一個會話狀態(tài)(如發(fā)送 SYN 后接收到一個 SYN-ACK
    • 你調(diào)用了 timeout.Cancel(),這個函數(shù)調(diào)用了 timer.Stop()。(請注意,Golang 計(jì)時器的停止不會終止一個已過期的計(jì)時器。)
  3. 線程 2:
    • 在取消調(diào)用之前,計(jì)時器已過期,回調(diào)即將執(zhí)行。
    • 執(zhí)行回調(diào),它調(diào)度一次新的超時并更新全局映射。
  4. 線程 1:
    • 切換到新的會話狀態(tài)并注冊新的超時,更新全局映射。

兩個線程并發(fā)地更新超時映射。最終結(jié)果是你無法取消注冊的超時,然后你也會丟失對線程 2 重新調(diào)度的超時的引用。這導(dǎo)致處理程序在一段時間內(nèi)持續(xù)執(zhí)行和重新調(diào)度,出現(xiàn)非預(yù)期行為。

鎖也解決不了問題

使用鎖也不能完全解決問題。如果你在處理所有事件和執(zhí)行回調(diào)之前加鎖,它仍然不能阻止一個過期的回調(diào)運(yùn)行:

 
 
 
 
  1. func (timeout* TimeoutHandler) Register() {  
  2.   timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time._Second_, func() {  
  3.     stateLock.Lock()  
  4.     defer stateLock.Unlock()
  5.  
  6.     timeout.callback(timeout.session)  
  7.   })  
  8. }

現(xiàn)在的區(qū)別就是全局映射的更新是同步的,但是這還是不能阻止在你調(diào)用 timeout.Cancel() 后回調(diào)的執(zhí)行 —— 這種情況出現(xiàn)在調(diào)度計(jì)時器過期了但是還沒有拿到鎖的時候。你還是會丟失一個已注冊的超時的引用。

使用取消通道

你可以使用取消通道,而不必依賴不能阻止到期的計(jì)時器執(zhí)行的 golang 函數(shù) timer.Stop()。

這是一個略有不同的方法。現(xiàn)在你可以不用再通過回調(diào)進(jìn)行遞歸地重新調(diào)度;而是注冊一個死循環(huán),這個循環(huán)接收到取消信號或超時事件時終止。

新的 Register() 產(chǎn)生一個新的 go 線程,這個線程在超時后執(zhí)行你的回調(diào),并在前一個超時執(zhí)行后調(diào)度新的超時。返回給調(diào)用方一個取消通道,用來控制循環(huán)的終止。

 
 
 
 
  1. func (timeout *TimeoutHandler) Register() chan struct{} {  
  2.   cancelChan := make(chan struct{})  
  3.    
  4.   go func () {  
  5.     select {  
  6.     case _ = <- cancelChan:  
  7.       return  
  8.     case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):  
  9.       func () {  
  10.         stateLock.Lock()  
  11.         defer stateLock.Unlock()
  12.  
  13.         timeout.callback(timeout.session)  
  14.       } ()  
  15.     }  
  16.   } ()
  17.  
  18.   return cancelChan  
  19. }
  20.  
  21. func (timeout* TimeoutHandler) Cancel() {  
  22.   if timeout.cancelChan == nil {  
  23.     return  
  24.   }  
  25.   timeout.cancelChan <- struct{}{}  
  26. }

這個方法給你注冊的所有超時提供了取消通道。一個取消調(diào)用向通道發(fā)送一個空結(jié)構(gòu)體并觸發(fā)取消操作。然而,這并不能解決前面的問題;可能在你通過通道取消之前以及超時線程拿到鎖之前,超時時間就已經(jīng)到了。

這里的解決方案是,在拿到鎖之后,檢查一下超時范圍內(nèi)的取消通道。

 
 
 
 
  1.   case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):  
  2.     func () {  
  3.       stateLock.Lock()  
  4.       defer stateLock.Unlock()  
  5.      
  6.       select {  
  7.       case _ = <- handler.cancelChan:  
  8.         return  
  9.       default:  
  10.         timeout.callback(timeout.session)  
  11.       }  
  12.     } ()  
  13.   }

最終,這可以確保在拿到鎖之后執(zhí)行回調(diào),不會觸發(fā)取消操作。

小心死鎖

這個解決方案看起來有效;但是還是有個隱患:死鎖。

請閱讀上面的代碼,試著自己找到它??紤]下描述的所有函數(shù)的并發(fā)調(diào)用。

這里的問題在取消通道本身。我們創(chuàng)建的是無緩沖通道,即發(fā)送的是阻塞調(diào)用。當(dāng)你在一個超時處理程序中調(diào)用取消函數(shù)時,只有在該處理程序被取消后才能繼續(xù)處理。問題出現(xiàn)在,當(dāng)你有多個調(diào)用請求到同一個取消通道時,這時一個取消請求只被處理一次。當(dāng)多個事件同時取消同一個超時處理程序時,如連接斷開或控制包事件,很容易出現(xiàn)這種情況。這會導(dǎo)致死鎖,可能會使應(yīng)用程序停機(jī)。

有人在聽嗎?

(已獲得 Trevor Forrey 授權(quán)。)

這里的解決方案是創(chuàng)建通道時指定緩存大小至少為 1,這樣向通道發(fā)送數(shù)據(jù)就不會阻塞,也顯式地使發(fā)送變成非阻塞的,避免了并發(fā)調(diào)用。這樣可以確保取消操作只發(fā)送一次,并且不會阻塞后續(xù)的取消調(diào)用。

 
 
 
 
  1. func (timeout* TimeoutHandler) Cancel() {  
  2.   if timeout.cancelChan == nil {  
  3.     return  
  4.   }  
  5.    
  6.   select {  
  7.   case timeout.cancelChan <- struct{}{}:  
  8.   default:  
  9.     // can’t send on the channel, someone has already requested the cancellation.  
  10.   }  
  11. }

總結(jié)

在實(shí)踐中你學(xué)到了并發(fā)操作時出現(xiàn)的常見錯誤。由于其不確定性,即使進(jìn)行大量的測試,也不容易發(fā)現(xiàn)這些問題。下面是我們在最初的實(shí)現(xiàn)中遇到的三個主要問題:

在非同步的情況下更新共享數(shù)據(jù)

這似乎是個很明顯的問題,但如果并發(fā)更新發(fā)生在不同的位置,就很難發(fā)現(xiàn)。結(jié)果就是數(shù)據(jù)競爭,由于一個更新會覆蓋另一個,因此對同一數(shù)據(jù)的多次更新中會有某些更新丟失。在我們的案例中,我們是在同時更新同一個共享映射里的調(diào)度超時引用。(有趣的是,如果 Go 檢測到在同一個映射對象上的并發(fā)讀寫,會拋出致命錯誤 — 你可以嘗試下運(yùn)行 Go 的數(shù)據(jù)競爭檢測器)。這最終會導(dǎo)致丟失超時引用,且無法取消給定的超時。當(dāng)有必要時,永遠(yuǎn)不要忘記使用鎖。

不要忘記同步 gopher 們的工作

缺少條件檢查

在不能僅依賴鎖的獨(dú)占性的情況下,就需要進(jìn)行條件檢查。我們遇到的場景稍微有點(diǎn)不一樣,但是核心思想跟條件變量是一樣的。假設(shè)有個一個生產(chǎn)者和多個消費(fèi)者使用一個共享隊(duì)列的經(jīng)典場景,生產(chǎn)者可以將一個元素添加到隊(duì)列并喚醒所有消費(fèi)者。這個喚醒調(diào)用意味著隊(duì)列中的數(shù)據(jù)是可訪問的,并且由于隊(duì)列是共享的,消費(fèi)者必須通過鎖來進(jìn)行同步訪問。每個消費(fèi)者都可能拿到鎖;然而,你仍然需要檢查隊(duì)列中是否有元素。因?yàn)樵谀隳玫芥i的瞬間并不知道隊(duì)列的狀態(tài),所以還是需要進(jìn)行條件檢查。

在我們的例子中,超時處理程序收到了計(jì)時器到期時發(fā)出的“喚醒”調(diào)用,但是它仍需要檢查是否已向其發(fā)送了取消信號,然后才能繼續(xù)執(zhí)行回調(diào)。

如果你要喚醒多個 gopher,可能就需要進(jìn)行條件檢查

死鎖

當(dāng)一個線程被卡住,無限期地等待一個喚醒信號,但是這個信號永遠(yuǎn)不會到達(dá)時,就會發(fā)生這種情況。死鎖可以通過讓你的整個程序停機(jī)來徹底殺死你的應(yīng)用。

在我們的案例中,這種情況的發(fā)生是由于多次發(fā)送請求到一個非緩沖且阻塞的通道。這意味著向通道發(fā)送數(shù)據(jù)只有在從這個通道接收完數(shù)據(jù)后才能返回。我們的超時線程循環(huán)迅速從取消通道接收信號;然而,在接收到第一個信號后,它將跳出循環(huán),并且再也不會從這個通道讀取數(shù)據(jù)。其他的調(diào)用會一直被卡住。為避免這種情況,你需要仔細(xì)檢查代碼,謹(jǐn)慎處理阻塞調(diào)用,并確保不會發(fā)生線程饑餓。我們例子中的解決方法是使取消調(diào)用成為非阻塞調(diào)用 — 我們不需要阻塞調(diào)用。


新聞名稱:Go并發(fā)編程中的經(jīng)驗(yàn)教訓(xùn)
轉(zhuǎn)載來于:http://www.dlmjj.cn/article/djsgedj.html