新聞中心
按照《Unix網(wǎng)絡編程》的劃分,IO模型可以分為:阻塞IO、非阻塞IO、IO復用、信號驅動IO和異步IO,按照POSIX標準來劃分只分為兩類:同步IO和異步IO。如何區(qū)分呢?首先一個IO操作其實分成了兩個步驟:發(fā)起IO請求和實際的IO操作,同步IO和異步IO的區(qū)別就在于第二個步驟是否阻塞,如果實際的IO讀寫阻塞請求進程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO服用、信號驅動IO都是同步IO,如果不阻塞,而是操作系統(tǒng)幫你做完IO操作再將結果返回給你,那么就是異步IO。阻塞IO和非阻塞IO的區(qū)別在于第一步,發(fā)起IO請求是否會被阻塞,如果阻塞直到完成那么就是傳統(tǒng)的阻塞IO,如果不阻塞,那么就是非阻塞IO。

為保德等地區(qū)用戶提供了全套網(wǎng)頁設計制作服務,及保德網(wǎng)站建設行業(yè)解決方案。主營業(yè)務為網(wǎng)站設計、網(wǎng)站建設、保德網(wǎng)站設計,以傳統(tǒng)方式定制建設網(wǎng)站,并提供域名空間備案等一條龍服務,秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務。我們深信只要達到每一位用戶的要求,就會得到認可,從而選擇與我們長期合作。這樣,我們也可以走得更遠!
Java nio 2.0的主要改進就是引入了異步IO(包括文件和網(wǎng)絡),這里主要介紹下異步網(wǎng)絡IO API的使用以及框架的設計,以TCP服務端為例。首先看下為了支持AIO引入的新的類和接口:
java.nio.channels.AsynchronousChannel
標記一個channel支持異步IO操作。
java.nio.channels.AsynchronousServerSocketChannel
ServerSocket的aio版本,創(chuàng)建TCP服務端,綁定地址,監(jiān)聽端口等。
java.nio.channels.AsynchronousSocketChannel
面向流的異步socket channel,表示一個連接。
java.nio.channels.AsynchronousChannelGroup
異步channel的分組管理,目的是為了資源共享。一個AsynchronousChannelGroup綁定一個線程池,這個線程池執(zhí)行兩個任務:處理IO事件和派發(fā)CompletionHandler。AsynchronousServerSocketChannel創(chuàng)建的時候可以傳入一個 AsynchronousChannelGroup,那么通過AsynchronousServerSocketChannel創(chuàng)建的 AsynchronousSocketChannel將同屬于一個組,共享資源。
java.nio.channels.CompletionHandler
異步IO操作結果的回調接口,用于定義在IO操作完成后所作的回調工作。AIO的API允許兩種方式來處理異步操作的結果:返回的Future模式或者注冊CompletionHandler,我更推薦用CompletionHandler的方式,這些handler的調用是由 AsynchronousChannelGroup的線程池派發(fā)的。顯然,線程池的大小是性能的關鍵因素。AsynchronousChannelGroup允許綁定不同的線程池,通過三個靜態(tài)方法來創(chuàng)建:
- public static AsynchronousChannelGroup withFixedThreadPool(int nThreads,
- ThreadFactory threadFactory)
- throws IOException
- public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor,
- int initialSize)
- public static AsynchronousChannelGroup withThreadPool(ExecutorService executor)
- throws IOException
需要根據(jù)具體應用相應調整,從框架角度出發(fā),需要暴露這樣的配置選項給用戶。
在介紹完了aio引入的TCP的主要接口和類之后,我們來設想下一個aio框架應該怎么設計。參考非阻塞nio框架的設計,一般都是采用Reactor模式,Reacot負責事件的注冊、select、事件的派發(fā);相應地,異步IO有個Proactor模式,Proactor負責 CompletionHandler的派發(fā),查看一個典型的IO寫操作的流程來看兩者的區(qū)別:
Reactor: send(msg) -> 消息隊列是否為空,如果為空 -> 向Reactor注冊OP_WRITE,然后返回 -> Reactor select -> 觸發(fā)Writable,通知用戶線程去處理 ->先注銷Writable(很多人遇到的cpu 100%的問題就在于沒有注銷),處理Writeable,如果沒有完全寫入,繼續(xù)注冊OP_WRITE。注意到,寫入的工作還是用戶線程在處理。
Proactor: send(msg) -> 消息隊列是否為空,如果為空,發(fā)起read異步調用,并注冊CompletionHandler,然后返回。 -> 操作系統(tǒng)負責將你的消息寫入,并返回結果(寫入的字節(jié)數(shù))給Proactor -> Proactor派發(fā)CompletionHandler??梢姡瑢懭氲墓ぷ魇遣僮飨到y(tǒng)在處理,無需用戶線程參與。事實上在aio的API 中,AsynchronousChannelGroup就扮演了Proactor的角色。
CompletionHandler有三個方法,分別對應于處理成功、失敗、被取消(通過返回的Future)情況下的回調處理:
- public interface CompletionHandler
{ - void completed(V result, A attachment);
- void failed(Throwable exc, A attachment);
- void cancelled(A attachment);
- }
其中的泛型參數(shù)V表示IO調用的結果,而A是發(fā)起調用時傳入的attchment。
在初步介紹完aio引入的類和接口后,我們看看一個典型的tcp服務端是怎么啟動的,怎么接受連接并處理讀和寫,這里引用的代碼都是yanf4j 的aio分支中的代碼,可以從svn checkout,svn地址: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio
第一步,創(chuàng)建一個AsynchronousServerSocketChannel,創(chuàng)建之前先創(chuàng)建一個 AsynchronousChannelGroup,上文提到AsynchronousServerSocketChannel可以綁定一個 AsynchronousChannelGroup,那么通過這個AsynchronousServerSocketChannel建立的連接都將同屬于一個AsynchronousChannelGroup并共享資源:
- this.asynchronousChannelGroup = AsynchronousChannelGroup
- .withCachedThreadPool(Executors.newCachedThreadPool(),
- this.threadPoolSize);
然后初始化一個AsynchronousServerSocketChannel,通過open方法:
- this.serverSocketChannel = AsynchronousServerSocketChannel
- .open(this.asynchronousChannelGroup);
通過nio 2.0引入的SocketOption類設置一些TCP選項:
- this.serverSocketChannel
- .setOption(
- StandardSocketOption.SO_REUSEADDR,true);
- this.serverSocketChannel
- .setOption(
- StandardSocketOption.SO_RCVBUF,16*1024);
綁定本地地址:
- this.serverSocketChannel
- .bind(new InetSocketAddress("localhost",8080), 100);
其中的100用于指定等待連接的隊列大小(backlog)。完了嗎?還沒有,最重要的監(jiān)聽工作還沒開始,監(jiān)聽端口是為了等待連接上來以便accept產生一個AsynchronousSocketChannel來表示一個新建立的連接,因此需要發(fā)起一個accept調用,調用是異步的,操作系統(tǒng)將在連接建立后,將最后的結果——AsynchronousSocketChannel返回給你
- public void pendingAccept() {
- if (this.started && this.serverSocketChannel.isOpen()) {
- this.acceptFuture = this.serverSocketChannel.accept(null,
- new AcceptCompletionHandler());
- } else {
- throw new IllegalStateException("Controller has been closed");
- }
- }
注意,重復的accept調用將會拋出PendingAcceptException,后文提到的read和write也是如此。accept方法的第一個參數(shù)是你想傳給CompletionHandler的attchment,第二個參數(shù)就是注冊的用于回調的CompletionHandler,最后返回結果Future
- private final class AcceptCompletionHandler implements
- CompletionHandler
{ - @Override
- public void cancelled(Object attachment) {
- logger.warn("Accept operation was canceled");
- }
- @Override
- public void completed(AsynchronousSocketChannel socketChannel,
- Object attachment) {
- try {
- logger.debug("Accept connection from "
- + socketChannel.getRemoteAddress());
- configureChannel(socketChannel);
- AioSessionConfig sessionConfig = buildSessionConfig(socketChannel);
- Session session = new AioTCPSession(sessionConfig,
- AioTCPController.this.configuration
- .getSessionReadBufferSize(),
- AioTCPController.this.sessionTimeout);
- session.start();
- registerSession(session);
- } catch (Exception e) {
- e.printStackTrace();
- logger.error("Accept error", e);
- notifyException(e);
- } finally {
- pendingAccept();
- }
- }
- @Override
- public void failed(Throwable exc, Object attachment) {
- logger.error("Accept error", exc);
- try {
- notifyException(exc);
- } finally {
- pendingAccept();
- }
- }
- }
注意到了吧,我們在failed和completed方法中在最后都調用了pendingAccept來繼續(xù)發(fā)起accept調用,等待新的連接上來。有的同學可能要說了,這樣搞是不是遞歸調用,會不會堆棧溢出?實際上不會,因為發(fā)起accept調用的線程與CompletionHandler回調的線程并非同一個,不是一個上下文中,兩者之間沒有耦合關系。要注意到,CompletionHandler的回調共用的是 AsynchronousChannelGroup綁定的線程池,因此千萬別在CompletionHandler回調方法中調用阻塞或者長時間的操作,例如sleep,回調方法最好能支持超時,防止線程池耗盡。
連接建立后,怎么讀和寫呢?回憶下在nonblocking nio框架中,連接建立后的第一件事是干什么?注冊OP_READ事件等待socket可讀。異步IO也同樣如此,連接建立后馬上發(fā)起一個異步read調用,等待socket可讀,這個是Session.start方法中所做的事情:
- public class AioTCPSession {
- protected void start0() {
- pendingRead();
- }
- protected final void pendingRead() {
- if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {
- if (!this.readBuffer.hasRemaining()) {
- this.readBuffer = ByteBufferUtils
- .increaseBufferCapatity(this.readBuffer);
- }
- this.readFuture = this.asynchronousSocketChannel.read(
- this.readBuffer, this, this.readCompletionHandler);
- } else {
- throw new IllegalStateException(
- "Session Or Channel has been closed");
- }
- }
- }
AsynchronousSocketChannel的read調用與AsynchronousServerSocketChannel的accept調用類似,同樣是非阻塞的,返回結果也是一個Future,但是寫的結果是整數(shù),表示寫入了多少字節(jié),因此read調用返回的是 Future
- public final class ReadCompletionHandler implements
- CompletionHandler
{ - private static final Logger log = LoggerFactory
- .getLogger(ReadCompletionHandler.class);
- protected final AioTCPController controller;
- public ReadCompletionHandler(AioTCPController controller) {
- this.controller = controller;
- }
- @Override
- public void cancelled(AbstractAioSession session) {
- log.warn("Session(" + session.getRemoteSocketAddress()
- + ") read operation was canceled");
- }
- @Override
- public void completed(Integer result, AbstractAioSession session) {
- if (log.isDebugEnabled())
- log.debug("Session(" + session.getRemoteSocketAddress()
- + ") read +" + result + " bytes");
- if (result < 0) {
- session.close();
- return;
- }
- try {
- if (result > 0) {
- session.updateTimeStamp();
- session.getReadBuffer().flip();
- session.decode();
- session.getReadBuffer().compact();
- }
- } finally {
- try {
- session.pendingRead();
- } catch (IOException e) {
- session.onException(e);
- session.close();
- }
- }
- controller.checkSessionTimeout();
- }
- @Override
- public void failed(Throwable exc, AbstractAioSession session) {
- log.error("Session read error", exc);
- session.onException(exc);
- session.close();
- }
- }
如果IO讀失敗,會返回失敗產生的異常,這種情況下我們就主動關閉連接,通過session.close()方法,這個方法干了兩件事情:關閉channel和取消read調用:
- if (null != this.readFuture) {
- this.readFuture.cancel(true);
- }
- this.asynchronousSocketChannel.close();
在讀成功的情況下,我們還需要判斷結果result是否小于0,如果小于0就表示對端關閉了,這種情況下我們也主動關閉連接并返回。如果讀到一定字節(jié),也就是result大于0的情況下,我們就嘗試從讀緩沖區(qū)中decode出消息,并派發(fā)給業(yè)務處理器的回調方法,最終通過pendingRead繼續(xù)發(fā)起read調用等待socket的下一次可讀。可見,我們并不需要自己去調用channel來進行IO讀,而是操作系統(tǒng)幫你直接讀到了緩沖區(qū),然后給你一個結果表示讀入了多少字節(jié),你處理這個結果即可。而nonblocking IO框架中,是reactor通知用戶線程socket可讀了,然后用戶線程自己去調用read進行實際讀操作。這里還有個需要注意的地方,就是decode出來的消息的派發(fā)給業(yè)務處理器工作最好交給一個線程池來處理,避免阻塞group綁定的線程池。
IO寫的操作與此類似,不過通常寫的話我們會在session中關聯(lián)一個緩沖隊列來處理,沒有完全寫入或者等待寫入的消息都存放在隊列中,隊列為空的情況下發(fā)起write調用:
- protected void write0(WriteMessage message) {
- boolean needWrite = false;
- synchronized (this.writeQueue) {
- needWrite = this.writeQueue.isEmpty();
- this.writeQueue.offer(message);
- }
- if (needWrite) {
- pendingWrite(message);
- }
- }
- protected final void pendingWrite(WriteMessage message) {
- message = preprocessWriteMessage(message);
- if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {
- this.asynchronousSocketChannel.write(message.getWriteBuffer(),
- this, this.writeCompletionHandler);
- } else {
- throw new IllegalStateException(
- "Session Or Channel has been closed");
- }
- }
write調用返回的結果與read一樣是一個Future
- @Override
- public void completed(Integer result, AbstractAioSession session) {
- if (log.isDebugEnabled())
- log.debug("Session(" + session.getRemoteSocketAddress()
- + ") writen " + result + " bytes");
- WriteMessage writeMessage;
- Queue
writeQueue = session.getWriteQueue(); - synchronized (writeQueue) {
- writeMessage = writeQueue.peek();
- if (writeMessage.getWriteBuffer() == null
- || !writeMessage.getWriteBuffer().hasRemaining()) {
- writeQueue.remove();
- if (writeMessage.getWriteFuture() != null) {
- writeMessage.getWriteFuture().setResult(Boolean.TRUE);
- }
- try {
- session.getHandler().onMessageSent(session,
- writeMessage.getMessage());
- } catch (Exception e) {
- session.onException(e);
- }
- writeMessage = writeQueue.peek();
- }
- }
- if (writeMessage != null) {
- try {
- session.pendingWrite(writeMessage);
- } catch (IOException e) {
- session.onException(e);
- session.close();
- }
- }
- }
compete方法中的result就是實際寫入的字節(jié)數(shù),然后我們判斷消息的緩沖區(qū)是否還有剩余,如果沒有就將消息從隊列中移除,如果隊列中還有消息,那么繼續(xù)發(fā)起write調用。
重復一下,這里引用的代碼都是yanf4j aio分支中的源碼,感興趣的朋友可以直接check out出來看看: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio。
在引入了aio之后,java對于網(wǎng)絡層的支持已經非常完善,該有的都有了,java也已經成為服務器開發(fā)的首選語言之一。java的弱項在于對內存的管理上,由于這一切都交給了GC,因此在高性能的網(wǎng)絡服務器上還是Cpp的天下。java這種單一堆模型比之erlang的進程內堆模型還是有差距,很難做到高效的垃圾回收和細粒度的內存管理。
這里僅僅是介紹了aio開發(fā)的核心流程,對于一個網(wǎng)絡框架來說,還需要考慮超時的處理、緩沖buffer的處理、業(yè)務層和網(wǎng)絡層的切分、可擴展性、性能的可調性以及一定的通用性要求。
原文鏈接:http://lxy2330.iteye.com/blog/1122849
【編輯推薦】
- Java NIO開發(fā)實例
- Java NIO 聊天室實例
- 多線程NIO客戶端實例
- 用nio實現(xiàn)Echo服務
- Java NIO 深入研究
當前名稱:JavaNIO2AIO開發(fā)核心流程
文章出自:http://www.dlmjj.cn/article/cojipge.html


咨詢
建站咨詢
