日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
如何使用Go建開發(fā)高負載WebSocket服務器

嗨,大家好! 我的名字是Sergey Kamardin,我是Mail.Ru的工程師。

成都創(chuàng)新互聯(lián)-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設、高性價比通遼網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式通遼網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設找我們,業(yè)務覆蓋通遼地區(qū)。費用合理售后完善,10余年實體公司更值得信賴。

介紹

首先介紹我們的故事的上下文,應該介紹幾點我們?yōu)槭裁葱枰@個服務器。

Mail.Ru有很多有狀態(tài)的系統(tǒng)。 用戶電子郵件存儲是其中之一。 跟蹤系統(tǒng)中的狀態(tài)變化和系統(tǒng)事件有幾種方法。 這主要是通過定期系統(tǒng)輪詢或關于其狀態(tài)變化的系統(tǒng)通知。

兩種方式都有利弊。 但是當涉及郵件時,用戶收到新郵件的速度越快越好。

郵件輪詢涉及每秒大約50,000個HTTP查詢,其中60%返回304狀態(tài),這意味著郵箱沒有變化。

因此,為了減少服務器上的負載并加快郵件傳遞給用戶,決定通過編寫發(fā)布-訂閱服務器,一方面將接收有關狀態(tài)更改的通知,另一方面則會收到這種通知的訂閱。

先前

現(xiàn)在

***個方案顯示了以前的樣子。 瀏覽器定期輪詢API,并查詢有關Storage(郵箱服務)的更改。

第二個方案描述了新架構。 瀏覽器與通知API建立WebSocket連接,通知API是Bus服務器的客戶端。收到新的電子郵件后,Storage會向Bus(1)發(fā)送一條通知,由Bus發(fā)送到訂閱者。 API確定連接以發(fā)送接收到的通知,并將其發(fā)送到用戶的瀏覽器(3)。

所以今天我們將討論API或WebSocket服務器。 我們的服務器將有大約300萬個在線連接。

實現(xiàn)方式

讓我們看看如何使用Go函數(shù)實現(xiàn)服務器的某些部分,而無需任何優(yōu)化。

在進行net/http ,我們來談談我們如何發(fā)送和接收數(shù)據(jù)。 站在WebSocket協(xié)議(例如JSON對象) 之上的數(shù)據(jù)在下文中將被稱為分組 。

我們開始實現(xiàn)包含通過WebSocket連接發(fā)送和接收這些數(shù)據(jù)包的Channel結構。

channel 結構

 
 
 
 
  1. // Packet represents application level data. 
  2. type Packet struct { 
  3.     ... 
  4.  
  5. // Channel wraps user connection. 
  6. type Channel struct { 
  7.     conn net.Conn    // WebSocket connection. 
  8.     send chan Packet // Outgoing packets queue. 
  9.  
  10. func NewChannel(conn net.Conn) *Channel { 
  11.     c := &Channel{ 
  12.         conn: conn, 
  13.         send: make(chan Packet, N), 
  14.     } 
  15.  
  16.     go c.reader() 
  17.     go c.writer() 
  18.  
  19.     return c 

 

注意這里有reader和writer連個goroutines。 每個goroutine都需要自己的內存棧, 根據(jù)操作系統(tǒng)和Go版本可能具有2到8 KB的初始大小。

在300萬個在線連接的時候,我們將需要24 GB的內存 (堆棧為4 KB)用于維持所有連接。 這還沒有計算為Channel結構分配的內存,傳出的數(shù)據(jù)包ch.send和其他內部字段消耗的內存。

 
 
 
 
  1. I/O goroutines 

我們來看看“reader”的實現(xiàn):

 
 
 
 
  1. func (c *Channel) reader() { 
  2.     // We make a buffered read to reduce read syscalls. 
  3.     buf := bufio.NewReader(c.conn) 
  4.  
  5.     for { 
  6.         pkt, _ := readPacket(buf) 
  7.         c.handle(pkt) 
  8.     } 

 

這里我們使用bufio.Reader來減少read() syscalls的數(shù)量,并讀取與buf緩沖區(qū)大小一樣的數(shù)量。 在***循環(huán)中,我們期待新數(shù)據(jù)的到來。 請記?。?預計新數(shù)據(jù)將會來臨。 我們稍后會回來。

我們將離開傳入數(shù)據(jù)包的解析和處理,因為對我們將要討論的優(yōu)化不重要。 但是, buf現(xiàn)在值得我們注意:默認情況下,它是4 KB,這意味著我們需要另外12 GB內存。 “writer”有類似的情況:

 
 
 
 
  1. func (c *Channel) writer() { 
  2.     // We make buffered write to reduce write syscalls.  
  3.     buf := bufio.NewWriter(c.conn) 
  4.  
  5.     for pkt := range c.send { 
  6.         _ := writePacket(buf, pkt) 
  7.         buf.Flush() 
  8.     } 

 

我們遍歷c.send ,并將它們寫入緩沖區(qū)。細心讀者已經猜到的,我們的300萬個連接還將消耗12 GB的內存。

HTTP

我們已經有一個簡單的Channel實現(xiàn),現(xiàn)在我們需要一個WebSocket連接才能使用。

注意:如果您不知道WebSocket如何工作??蛻舳送ㄟ^稱為升級的特殊HTTP機制切換到WebSocket協(xié)議。 在成功處理升級請求后,服務器和客戶端使用TCP連接來交換二進制WebSocket幀。 這是連接中的框架結構的描述。

 
 
 
 
  1. import ( 
  2.     "net/http" 
  3.     "some/websocket" 
  4.  
  5. http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) { 
  6.     conn, _ := websocket.Upgrade(r, w) 
  7.     ch := NewChannel(conn) 
  8.     //... 
  9. }) 

 

請注意, http.ResponseWriter為bufio.Reader和bufio.Writer (使用4 KB緩沖區(qū))進行內存分配,用于*http.Request初始化和進一步的響應寫入。

無論使用什么WebSocket庫,在成功響應升級請求后, 服務器在responseWriter.Hijack()調用之后,連同TCP連接一起接收 I/O緩沖區(qū)。

提示:在某些情況下, go:linkname 可用于 通過調用 net/http.putBufio{Reader,Writer} 將緩沖區(qū)返回到 net/http 內 的 sync.Pool 。

因此,我們需要另外24 GB的內存來維持300萬個鏈接。

所以,我們的程序即使什么都沒做,也需要72G內存。

優(yōu)化

我們來回顧介紹部分中談到的內容,并記住用戶連接的行為。 切換到WebSocket之后,客戶端發(fā)送一個包含相關事件的數(shù)據(jù)包,換句話說就是訂閱事件。 然后(不考慮諸如ping/pong等技術信息),客戶端可能在整個連接壽***不發(fā)送任何其他信息。

連接壽命可能是幾秒到幾天。

所以在最多的時候,我們的Channel.reader()和Channel.writer()正在等待接收或發(fā)送數(shù)據(jù)的處理。 每個都有4 KB的I/O緩沖區(qū)。

現(xiàn)在很明顯,某些事情可以做得更好,不是嗎?

Netpoll

你還記得bufio.Reader.Read()內部,Channel.reader()實現(xiàn)了在沒有新數(shù)據(jù)的時候conn.read()會被鎖。如果連接中有數(shù)據(jù),Go運行時“喚醒”我們的goroutine并允許它讀取下一個數(shù)據(jù)包。 之后,goroutine再次鎖定,期待新的數(shù)據(jù)。 讓我們看看Go運行時如何理解goroutine必須被“喚醒”。 如果我們看看conn.Read()實現(xiàn) ,我們將在其中看到net.netFD.Read()調用 :

 
 
 
 
  1. // net/fd_unix.go 
  2.  
  3. func (fd *netFD) Read(p []byte) (n int, err error) { 
  4.     //... 
  5.     for { 
  6.         n, err = syscall.Read(fd.sysfd, p) 
  7.         if err != nil { 
  8.             n = 0 
  9.             if err == syscall.EAGAIN { 
  10.                 if err = fd.pd.waitRead(); err == nil { 
  11.                     continue 
  12.                 } 
  13.             } 
  14.         } 
  15.         //... 
  16.         break 
  17.     } 
  18.     //... 

 

Go在非阻塞模式下使用套接字。 EAGAIN表示,套接字中沒有數(shù)據(jù),并且在從空套接字讀取時不會被鎖定,操作系統(tǒng)將控制權返還給我們。

我們從連接文件描述符中看到一個read()系統(tǒng)調用。 如果讀取返回EAGAIN錯誤 ,則運行時會使pollDesc.waitRead()調用 :

 
 
 
 
  1. // net/fd_poll_runtime.go 
  2.  
  3. func (pd *pollDesc) waitRead() error { 
  4.    return pd.wait('r') 
  5.  
  6. func (pd *pollDesc) wait(mode int) error { 
  7.    res := runtime_pollWait(pd.runtimeCtx, mode) 
  8.    //... 

 

如果我們深入挖掘 ,我們將看到netpoll是使用Linux中的epoll和BSD中的kqueue來實現(xiàn)的。 為什么不使用相同的方法來進行連接? 我們可以分配一個讀緩沖區(qū),只有在真正有必要時才使用goroutine:當套接字中有真實可讀的數(shù)據(jù)時。

在github.com/golang/go上, 導出netpoll函數(shù)有問題 。

擺脫goroutines

假設我們有Go的netpoll實現(xiàn) 。 現(xiàn)在我們可以避免使用內部緩沖區(qū)啟動Channel.reader() goroutine,并在連接中訂閱可讀數(shù)據(jù)的事件:

 
 
 
 
  1. ch := NewChannel(conn) 
  2.  
  3. // Make conn to be observed by netpoll instance. 
  4. poller.Start(conn, netpoll.EventRead, func() { 
  5.     // We spawn goroutine here to prevent poller wait loop 
  6.     // to become locked during receiving packet from ch. 
  7.     go Receive(ch) 
  8. }) 
  9.  
  10. // Receive reads a packet from conn and handles it somehow. 
  11. func (ch *Channel) Receive() { 
  12.     buf := bufio.NewReader(ch.conn) 
  13.     pkt := readPacket(buf) 
  14.     c.handle(pkt) 

 

使用Channel.writer()更容易,因為只有當我們要發(fā)送數(shù)據(jù)包時,我們才能運行goroutine并分配緩沖區(qū):

 
 
 
 
  1. func (ch *Channel) Send(p Packet) { 
  2.     if c.noWriterYet() { 
  3.         go ch.writer() 
  4.     } 
  5.     ch.send <- p 

 

請注意,當操作系統(tǒng)在 write() 系統(tǒng)調用時返回 EAGAIN 時,我們不處理這種情況 。 對于這種情況,我們傾向于Go運行時那樣處理。 如果需要,它可以以相同的方式來處理。

從ch.send (一個或幾個)讀出傳出的數(shù)據(jù)包后,writer將完成其操作并釋放goroutine棧和發(fā)送緩沖區(qū)。

***! 通過擺脫兩個連續(xù)運行的goroutine中的堆棧和I/O緩沖區(qū),我們節(jié)省了48 GB 。

資源控制

大量的連接不僅涉及高內存消耗。 在開發(fā)服務器時,我們會經歷重復的競爭條件和死鎖,常常是所謂的自動DDoS,這種情況是當應用程序客戶端肆意嘗試連接到服務器,從而破壞服務器。

例如,如果由于某些原因我們突然無法處理ping/pong消息,但是空閑連接的處理程序會關閉這樣的連接(假設連接斷開,因此沒有提供數(shù)據(jù)),客戶端會不斷嘗試連接,而不是等待事件。

如果鎖定或超載的服務器剛剛停止接受新連接,并且負載均衡器(例如,nginx)將請求都傳遞給下一個服務器實例,那壓力將是巨大的。

此外,無論服務器負載如何,如果所有客戶端突然想要以任何原因發(fā)送數(shù)據(jù)包(大概是由于錯誤原因),則先前節(jié)省的48 GB將再次使用,因為我們將實際恢復到初始狀態(tài)goroutine和并對每個連接分配緩沖區(qū)。

Goroutine池

我們可以使用goroutine池來限制同時處理的數(shù)據(jù)包數(shù)量。 這是一個go routine池的簡單實現(xiàn):

 
 
 
 
  1. package gopool 
  2.  
  3. func New(size int) *Pool { 
  4.     return &Pool{ 
  5.         work: make(chan func()), 
  6.         sem:  make(chan struct{}, size), 
  7.     } 
  8.  
  9. func (p *Pool) Schedule(task func()) error { 
  10.     select { 
  11.     case p.work <- task: 
  12.     case p.sem <- struct{}{}: 
  13.         go p.worker(task) 
  14.     } 
  15.  
  16. func (p *Pool) worker(task func()) { 
  17.     defer func() { <-p.sem } 
  18.     for { 
  19.         task() 
  20.         task = <-p.work 
  21.     } 

現(xiàn)在我們的netpoll代碼如下:

 
 
 
 
  1. pool := gopool.New(128) 
  2.  
  3. poller.Start(conn, netpoll.EventRead, func() { 
  4.     // We will block poller wait loop when 
  5.     // all pool workers are busy. 
  6.     pool.Schedule(func() { 
  7.         Receive(ch) 
  8.     }) 
  9. }) 

所以現(xiàn)在我們讀取數(shù)據(jù)包可以在池中使用了空閑的goroutine。

同樣,我們將更改Send() :

 
 
 
 
  1. pool := gopool.New(128) 
  2.  
  3. func (ch *Channel) Send(p Packet) { 
  4.     if c.noWriterYet() { 
  5.         pool.Schedule(ch.writer) 
  6.     } 
  7.     ch.send <- p 

 

而不是go ch.writer() ,我們想寫一個重用的goroutine。 因此,對于N goroutines池,我們可以保證在N請求同時處理并且到達N + 1我們不會分配N + 1緩沖區(qū)進行讀取。 goroutine池還允許我們限制新連接的Accept()和Upgrade() ,并避免大多數(shù)情況下被DDoS打垮。

零拷貝升級

讓我們從WebSocket協(xié)議中偏離一點。 如前所述,客戶端使用HTTP升級請求切換到WebSocket協(xié)議。 協(xié)議是樣子:

 
 
 
 
  1. GET /ws HTTP/1.1 
  2. Host: mail.ru 
  3. Connection: Upgrade 
  4. Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA== 
  5. Sec-Websocket-Version: 13 
  6. Upgrade: websocket 
  7.  
  8. HTTP/1.1 101 Switching Protocols 
  9. Connection: Upgrade 
  10. Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4= 
  11. Upgrade: websocket 

 

也就是說,在我們的例子中,我們需要HTTP請求和header才能切換到WebSocket協(xié)議。 這個知識點和http.Request的內部實現(xiàn)表明我們可以做優(yōu)化。我們會在處理HTTP請求時拋棄不必要的內存分配和復制,并放棄標準的net/http服務器。

例如, http.Request 包含一個具有相同名稱的頭文件類型的字段,它通過將數(shù)據(jù)從連接復制到值字符串而無條件填充所有請求頭。 想像一下這個字段中可以保留多少額外的數(shù)據(jù),例如大型Cookie頭。

但是要做什么呢?

WebSocket實現(xiàn)

不幸的是,在我們的服務器優(yōu)化時存在的所有庫都允許我們對標準的net/http服務器進行升級。 此外,所有庫都不能使用所有上述讀寫優(yōu)化。 為使這些優(yōu)化能夠正常工作,我們必須使用一個相當?shù)图墑e的API來處理WebSocket。 要重用緩沖區(qū),我們需要procotol函數(shù)看起來像這樣:

 
 
 
 
  1. func ReadFrame(io.Reader) (Frame, error)  
  2. func WriteFrame(io.Writer, Frame) error 

如果我們有一個這樣的API的庫,我們可以從連接中讀取數(shù)據(jù)包,如下所示(數(shù)據(jù)包寫入看起來差不多):

 
 
 
 
  1. // getReadBuf, putReadBuf are intended to  
  2. // reuse *bufio.Reader (with sync.Pool for example). 
  3. func getReadBuf(io.Reader) *bufio.Reader 
  4. func putReadBuf(*bufio.Reader) 
  5.  
  6. // readPacket must be called when data could be read from conn. 
  7. func readPacket(conn io.Reader) error { 
  8.     buf := getReadBuf() 
  9.     defer putReadBuf(buf) 
  10.  
  11.     buf.Reset(conn) 
  12.     frame, _ := ReadFrame(buf) 
  13.     parsePacket(frame.Payload) 
  14.     //... 

 

簡而言之,現(xiàn)在是制作我們自己庫的時候了。

 
 
 
 
  1. github.com/gobwas/ws 

為了避免將協(xié)議操作邏輯強加給用戶,我們編寫了WS庫。 所有讀寫方法都接受標準的io.Reader和io.Writer接口,可以使用或不使用緩沖或任何其他I/O包裝器。

除了來自標準net/http升級請求之外, ws支持零拷貝升級 ,升級請求的處理和切換到WebSocket,而無需內存分配或復制。 ws.Upgrade()接受io.ReadWriter ( net.Conn實現(xiàn)了這個接口)。 換句話說,我們可以使用標準的net.Listen()并將接收到的連接從ln.Accept()立即傳遞給ws.Upgrade() 。 該庫可以復制任何請求數(shù)據(jù)以供將來在應用程序中使用(例如, Cookie以驗證會話)。

以下是升級請求處理的基準 :標準net/http服務器與net.Listen()加零拷貝升級:

 
 
 
 
  1. BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/op  
  2. BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op 

切換到ws和零拷貝升級節(jié)省了另外24 GB內存 - 這是由net/http處理程序請求處理時為I/O緩沖區(qū)分配的空間。

概要

讓我們結合代碼告訴你我們做的優(yōu)化。

  • 讀取內部緩沖區(qū)的goroutine是非常昂貴的。 解決方案 :netpoll(epoll,kqueue); 重用緩沖區(qū)。
  • 寫入內部緩沖區(qū)的goroutine是非常昂貴的。 解決方案 :必要時啟動goroutine; 重用緩沖區(qū)。
  • DDOS,netpoll將無法工作。 解決方案 :重新使用數(shù)量限制的goroutines。
  • net/http不是處理升級到WebSocket的最快方法。 解決方案 :在連接上使用零拷貝升級。

這就是服務器代碼的樣子:

 
 
 
 
  1. import ( 
  2.     "net" 
  3.     "github.com/gobwas/ws" 
  4.  
  5. ln, _ := net.Listen("tcp", ":8080") 
  6.  
  7. for { 
  8.     // Try to accept incoming connection inside free pool worker. 
  9.     // If there no free workers for 1ms, do not accept anything and try later. 
  10.     // This will help us to prevent many self-ddos or out of resource limit cases. 
  11.     err := pool.ScheduleTimeout(time.Millisecond, func() { 
  12.         conn := ln.Accept() 
  13.         _ = ws.Upgrade(conn) 
  14.  
  15.         // Wrap WebSocket connection with our Channel struct. 
  16.         // This will help us to handle/send our app's packets. 
  17.         ch := NewChannel(conn) 
  18.  
  19.         // Wait for incoming bytes from connection. 
  20.         poller.Start(conn, netpoll.EventRead, func() { 
  21.             // Do not cross the resource limits. 
  22.             pool.Schedule(func() { 
  23.                 // Read and handle incoming packet(s). 
  24.                 ch.Recevie() 
  25.             }) 
  26.         }) 
  27.     }) 
  28.     if err != nil {    
  29.         time.Sleep(time.Millisecond) 
  30.     } 

結論

過早優(yōu)化是萬惡之源。 Donald Knuth

當然,上述優(yōu)化是有意義的,但并非所有情況都如此。 例如,如果可用資源(內存,CPU)和在線連接數(shù)之間的比例相當高(服務器很閑),則優(yōu)化可能沒有任何意義。 但是,您可以從哪里需要改進以及改進內容中受益匪淺。


網(wǎng)頁題目:如何使用Go建開發(fā)高負載WebSocket服務器
新聞來源:http://www.dlmjj.cn/article/coopieh.html