日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
NIO之多線程協(xié)作處理數(shù)據(jù)讀寫

本文轉載自微信公眾號「源碼學徒」,作者皇甫嗷嗷叫 。轉載本文請聯(lián)系源碼學徒公眾號。

成都創(chuàng)新互聯(lián)公司,專注為中小企業(yè)提供官網(wǎng)建設、營銷型網(wǎng)站制作、響應式網(wǎng)站、展示型做網(wǎng)站、網(wǎng)站制作等服務,幫助中小企業(yè)通過網(wǎng)站體現(xiàn)價值、有效益。幫助企業(yè)快速建站、解決網(wǎng)站建設與網(wǎng)站營銷推廣問題。

經(jīng)過前面幾章的學習,我們已經(jīng) 能夠掌握了JDK NIO的開發(fā)方式,我們來總結一下NIO開發(fā)的流程:

  1. 創(chuàng)建一個服務端通道 ServerSocketChannel
  2. 創(chuàng)建一個選擇器 Selector
  3. 將服務端通道注冊到選擇器上,并且關注我們感興趣的事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  4. 綁定服務管道的地址 serverSocketChannel.bind(new InetSocketAddress(8989));
  5. 開始進行事件選擇,選擇我們感興趣的事件做對應的操作!

具體的代碼信息請參照第一章:多路復用模型章節(jié),這里不做太多的贅述!

有關多路復用的概念,我們也在第一章進行了分析。多路復用模型能夠最大限度的將一個線程的執(zhí)行能力榨干,一條線程執(zhí)行所有的數(shù)據(jù),包括新連接的接入、數(shù)據(jù)的讀取、計算與回寫,但是假設,我們的數(shù)據(jù)計算及其緩慢,那么該任務的執(zhí)行就勢必影響下一個新鏈接的接入!

傳統(tǒng)NIO單線程模型

單線程的NIO模型

如圖,我們能了解到,單線程情況下,讀事件因為要做一些業(yè)務性操作(數(shù)據(jù)庫連接、圖片、文件下載)等操作,導致線程阻塞再,讀事件的處理上,此時單線程程序無法進行下一次新鏈接的處理!我們對該線程模型進行優(yōu)化,select事件處理封裝為任務,提交到線程池!

NIO多線程模型

上面的這種數(shù)據(jù)結構能夠解決掉因為計算任務耗時過長,導致新鏈接接入阻塞的問題,我們能否再次進行一次優(yōu)化呢?

我們能否創(chuàng)建多個事件選擇器,每個事件選擇器,負責不同的Socket連接,就像下面這種:

NIO多線程優(yōu)化模型

這樣我們就可以每一個Select選擇器負責多個客戶端Socket連接,主線程只需要將客戶端新連接選擇一個選擇器注冊到select選擇器上就可以了!所以我們的架構圖,就變成了下圖這樣:

我們在select選擇器內(nèi)部處理計算任務的時候,也可以將任務封裝為task,提交到線程池里面去,徹底將新連接接入和讀寫事件處理分離開,互不影響!事實上,這也是Netty的核心思想之一,我們可以根據(jù)上面的圖例,自己簡單寫一個:

代碼實現(xiàn)

構建一個事件執(zhí)行器 對應上圖的select選擇器

 
 
 
 
  1. /** 
  2.  * Nio事件處理器 
  3.  * 
  4.  * @author huangfu 
  5.  * @date 
  6.  */ 
  7. public class MyNioEventLoop implements Runnable { 
  8.     static final ByteBuffer ALLOCATE = ByteBuffer.allocate(128); 
  9.     private final Selector selector; 
  10.     private final LinkedBlockingQueue linkedBlockingQueue; 
  11.     public MyNioEventLoop(Selector selector) { 
  12.         this.selector = selector; 
  13.         linkedBlockingQueue = new LinkedBlockingQueue<>(); 
  14.     } 
  15.  
  16.     public Selector getSelector() { 
  17.         return selector; 
  18.     } 
  19.  
  20.     public LinkedBlockingQueue getLinkedBlockingQueue() { 
  21.         return linkedBlockingQueue; 
  22.     } 
  23.  
  24.     //忽略  hashCode和eques 
  25.  
  26.     /** 
  27.      * 任務處理器 
  28.      */ 
  29.     @Override 
  30.     public void run() { 
  31.         while (!Thread.currentThread().isInterrupted()) { 
  32.             try { 
  33.                 //進行事件選擇  這里我們只處理讀事件 
  34.                 if (selector.select() > 0) { 
  35.                     Set selectionKeys = selector.selectedKeys(); 
  36.                     Iterator iterator = selectionKeys.iterator(); 
  37.                     //處理讀事件 
  38.                     while (iterator.hasNext()) { 
  39.                         SelectionKey next = iterator.next(); 
  40.                         iterator.remove(); 
  41.                         if (next.isReadable()) { 
  42.                             SocketChannel channel = (SocketChannel) next.channel(); 
  43.                             int read = channel.read(ALLOCATE); 
  44.                             if(read > 0) { 
  45.                                 System.out.printf("線程%s【%s】發(fā)來消-息:",Thread.currentThread().getName(), channel.getRemoteAddress()); 
  46.                                 System.out.println(new String(ALLOCATE.array(), StandardCharsets.UTF_8)); 
  47.                             }else if(read == -1) { 
  48.                                 System.out.println("連接斷開"); 
  49.                                 channel.close(); 
  50.                             } 
  51.                             ALLOCATE.clear(); 
  52.                         } 
  53.                     } 
  54.                     selectionKeys.clear(); 
  55.                 }else { 
  56.                     //處理異步任務  進行注冊 
  57.                     while (!linkedBlockingQueue.isEmpty()) { 
  58.                         Runnable take = linkedBlockingQueue.take(); 
  59.                         //異步事件執(zhí)行 
  60.                         take.run(); 
  61.                     } 
  62.                 } 
  63.             } catch (IOException | InterruptedException e) { 
  64.                 e.printStackTrace(); 
  65.             } 
  66.         } 
  67.     } 

構建一個選擇器組

 
 
 
 
  1. /** 
  2.  * 選擇器組 
  3.  * 
  4.  * @author huangfu 
  5.  * @date 2021年3月12日09:44:37 
  6.  */ 
  7. public class SelectorGroup { 
  8.     private final List SELECTOR_GROUP = new ArrayList<>(8); 
  9.     private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); 
  10.     private final static AtomicInteger IDX = new AtomicInteger(); 
  11.  
  12.     /** 
  13.      * 初始化選擇器 
  14.      * @param count 處理器數(shù)量 
  15.      * @throws IOException 異常欣喜 
  16.      */ 
  17.     public SelectorGroup(int count) throws IOException { 
  18.  
  19.         for (int i = 0; i < count; i++) { 
  20.             Selector open = Selector.open(); 
  21.             MyNioEventLoop myNioEventLoop = new MyNioEventLoop(open); 
  22.             SELECTOR_GROUP.add(myNioEventLoop); 
  23.         } 
  24.     } 
  25.  
  26.     public SelectorGroup() throws IOException { 
  27.         this(AVAILABLE_PROCESSORS << 1); 
  28.     } 
  29.  
  30.     /** 
  31.      * 輪詢獲取一個選擇器 
  32.      * @return 返回一個選擇器 
  33.      */ 
  34.     public MyNioEventLoop next(){ 
  35.         int andIncrement = IDX.getAndIncrement(); 
  36.         int length = SELECTOR_GROUP.size(); 
  37.  
  38.         return SELECTOR_GROUP.get(Math.abs(andIncrement % length)); 
  39.     } 

構建一個執(zhí)行器記錄器

 
 
 
 
  1. /** 
  2.  * @author huangfu 
  3.  * @date 
  4.  */ 
  5. public class ThreadContext { 
  6.     /** 
  7.      * 記錄當前使用過的選擇器 
  8.      */ 
  9.     public static final Set RUN_SELECT = new HashSet<>(); 

構建一個新連接接入選擇器

 
 
 
 
  1. /** 
  2.  * 連接器 
  3.  * 
  4.  * @author huangfu 
  5.  * @date 2021年3月12日10:15:37 
  6.  */ 
  7. public class Acceptor implements Runnable { 
  8.     private final ServerSocketChannel serverSocketChannel; 
  9.     private final SelectorGroup selectorGroup; 
  10.  
  11.     public Acceptor(ServerSocketChannel serverSocketChannel, SelectorGroup selectorGroup) { 
  12.         this.serverSocketChannel = serverSocketChannel; 
  13.         this.selectorGroup = selectorGroup; 
  14.     } 
  15.  
  16.  
  17.     @Override 
  18.     public void run() { 
  19.         try { 
  20.             SocketChannel socketChannel = serverSocketChannel.accept(); 
  21.             MyNioEventLoop next = selectorGroup.next(); 
  22.  
  23.             //向隊列追加一個注冊任務 
  24.             next.getLinkedBlockingQueue().offer(() -> { 
  25.                 try { 
  26.                     //客戶端注冊為非阻塞 
  27.                     socketChannel.configureBlocking(false); 
  28.                     //注冊到選擇器 關注一個讀事件 
  29.                     socketChannel.register(next.getSelector(), SelectionKey.OP_READ); 
  30.                 } catch (Exception e) { 
  31.                     e.printStackTrace(); 
  32.                 } 
  33.             }); 
  34.             //喚醒對應的任務,讓其處理異步任務 
  35.             next.getSelector().wakeup(); 
  36.  
  37.  
  38.             System.out.println("檢測到連接:" + socketChannel.getRemoteAddress()); 
  39.             //當當前選擇器已經(jīng)被使用過了  就不再使用了,直接注冊就行了 
  40.             if (ThreadContext.RUN_SELECT.add(next)) { 
  41.                 //啟動任務 
  42.                 new Thread(next).start(); 
  43.             } 
  44.  
  45.  
  46.         } catch (IOException e) { 
  47.             e.printStackTrace(); 
  48.         } 
  49.     } 

創(chuàng)建啟動器

 
 
 
 
  1. /** 
  2.  * @author huangfu 
  3.  * @date 
  4.  */ 
  5. public class TestMain { 
  6.  
  7.     public static void main(String[] args) throws IOException { 
  8.         //創(chuàng)建一個選擇器組   傳遞選擇器組的大小 決定使用多少選擇器來實現(xiàn) 
  9.         SelectorGroup selectorGroup = new SelectorGroup(2); 
  10.         //開啟一個服務端管道 
  11.         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); 
  12.         //開啟一個服務端專用的選擇器 
  13.         Selector selector = Selector.open(); 
  14.         //設置非阻塞 
  15.         serverSocketChannel.configureBlocking(false); 
  16.         //創(chuàng)建一個連接器 
  17.         Acceptor acceptor = new Acceptor(serverSocketChannel, selectorGroup); 
  18.         //將服務端通道注冊到服務端選擇器上  這里會綁定一個新連接接入器 
  19.         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, acceptor); 
  20.         //綁定端口 
  21.         serverSocketChannel.bind(new InetSocketAddress(8989)); 
  22.         //啟動處理器 
  23.         new Reactor(selector).run(); 
  24.     } 

總結

單線程下的NIO存在性能瓶頸,當某一計算過程緩慢的時候會阻塞住整個線程,導致影響其他事件的處理!

為了解決這一缺陷,我們提出了使用異步線程的方式去操作任務,將耗時較長的業(yè)務,封裝為一個異步任務,提交到線程池執(zhí)行!

為了使業(yè)務操作和新連接接入完全分離開,我們做了另外一重優(yōu)化,我們封裝了一個選擇器組,輪詢的方式獲取選擇器,每一個選擇器都能夠處理多個新連接, socket連接->selector選擇器 = 多 -> 1,在每一個選擇器里面又可以使用線程池來處理任務,進一步提高吞吐量!


新聞標題:NIO之多線程協(xié)作處理數(shù)據(jù)讀寫
當前URL:http://www.dlmjj.cn/article/coosocd.html