日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
圖解 Kafka 網(wǎng)絡(luò)層實(shí)現(xiàn)機(jī)制(一)

今天我們就來(lái)聊聊 Kafka 是如何對(duì) Java NIO 進(jìn)行封裝的,本系列總共分為3篇,主要剖析以下幾個(gè)問(wèn)題:

創(chuàng)新互聯(lián)2013年開(kāi)創(chuàng)至今,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目成都做網(wǎng)站、網(wǎng)站設(shè)計(jì)、外貿(mào)營(yíng)銷網(wǎng)站建設(shè)網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個(gè)夢(mèng)想脫穎而出為使命,1280元織金做網(wǎng)站,已為上家服務(wù),為織金各地企業(yè)和個(gè)人服務(wù),聯(lián)系電話:13518219792

  1. 針對(duì) Java NIO 的 SocketChannel,kafka 是如何封裝統(tǒng)一的傳輸層來(lái)實(shí)現(xiàn)最基礎(chǔ)的網(wǎng)絡(luò)連接以及讀寫操作的?
  2. 剖析 KafkaChannel 是如何對(duì)傳輸層、讀寫 buffer 操作進(jìn)行封裝的?
  3. 剖析工業(yè)級(jí) NIO 實(shí)戰(zhàn):如何基于位運(yùn)算來(lái)控制事件的監(jiān)聽(tīng)以及拆包、粘包是如何實(shí)現(xiàn)的?
  4. 剖析 Kafka 是如何封裝 Selector 多路復(fù)用器的?
  5. 剖析 Kafka 封裝的 Selector 是如何初始化并與 Broker 進(jìn)行連接以及網(wǎng)絡(luò)讀寫的?
  6. 剖析 Kafka 網(wǎng)絡(luò)發(fā)送消息和接收響應(yīng)的整個(gè)過(guò)程是怎樣的?

本篇只討論前3個(gè)問(wèn)題,剩余的放到后2篇中。

認(rèn)真讀完這篇文章,我相信你會(huì)對(duì) Kafka 封裝 Java NIO 源碼有更加深刻的理解。

這篇文章干貨很多,希望你可以耐心讀完。

一、總體概述

??上篇??剖析了「生產(chǎn)者元數(shù)據(jù)的拉取和管理的全過(guò)程」,此時(shí)發(fā)送消息的時(shí)候就有了元數(shù)據(jù),但是還沒(méi)有進(jìn)行網(wǎng)絡(luò)通信,而網(wǎng)絡(luò)通信是一個(gè)相對(duì)復(fù)雜的過(guò)程,對(duì)于 Java 系統(tǒng)來(lái)說(shuō)網(wǎng)絡(luò)通信一般會(huì)采用 NIO 庫(kù)來(lái)實(shí)現(xiàn),所以 Kafka 對(duì) Java NIO 封裝了統(tǒng)一的框架,來(lái)實(shí)現(xiàn)多路復(fù)用的網(wǎng)絡(luò) I/O 操作。

為了方便大家理解,所有的源碼只保留骨干。

二、Kafka 對(duì) Java NIO 的封裝

如果大家對(duì) Java NIO 不了解的話,可以看下這個(gè)文檔,這里就不過(guò)多介紹了。

https://pdai.tech/md/java/io/java-io-nio.html。

我們來(lái)看看 Kafka 對(duì) Java NIO 組件做了哪些封裝? 這里先說(shuō)下結(jié)果,后面會(huì)深度剖析。

  1. TransportLayer:它是一個(gè)接口,封裝了底層 NIO 的 SocketChannel。
  2. NetworkReceive:封裝了 NIO 的 ByteBuffer 中的讀 Buffer,對(duì)網(wǎng)絡(luò)編程中的粘包、拆包經(jīng)典實(shí)現(xiàn)。
  3. NetworkSend:封裝了 NIO 的 ByteBuffer 中的寫 Buffer。
  4. KafkaChannel:對(duì) TransportLayer、NetworkReceive、NetworkSend 進(jìn)一步封裝,屏蔽了底層的實(shí)現(xiàn)細(xì)節(jié),對(duì)上層更友好。
  5. KafkaSelector:封裝了 NIO 的 Selector 多路復(fù)用器組件。

接下來(lái)我們挨個(gè)對(duì)上面組件進(jìn)行剖析。

三、TransportLayer 封裝過(guò)程

TransportLayer 接口是對(duì) NIO 中 「SocketChannel」 的封裝。它的實(shí)現(xiàn)類總共有 2 個(gè):

  1. PlaintextTransportLayer:明文網(wǎng)絡(luò)傳輸實(shí)現(xiàn)。
  2. SslTransportLayer:SSL 加密網(wǎng)絡(luò)傳輸實(shí)現(xiàn)。

本篇只剖析 PlaintextTransportLayer 的實(shí)現(xiàn)。

github 源碼地址如下:

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java。

public class PlaintextTransportLayer implements TransportLayer {
// java nio 中 SelectionKey 事件
private final SelectionKey key;
// java nio 中的SocketChannel
private final SocketChannel socketChannel;
// 安全相關(guān)
private final Principal principal = KafkaPrincipal.ANONYMOUS;
// 初始化
public PlaintextTransportLayer(SelectionKey key) throws IOException {
// 對(duì) NIO 中 SelectionKey 類的對(duì)象引用
this.key = key;
// 對(duì) NIO 中 SocketChannel 類的對(duì)象引用
this.socketChannel = (SocketChannel) key.channel();
}
}

從上面代碼可以看出,該類就是對(duì)底層 NIO 的 socketChannel 封裝引用。將構(gòu)造函數(shù)的 SelectionKey 類對(duì)象賦值給 key,然后從 key 中取出對(duì)應(yīng)的 SocketChannel 賦值給 socketChannel,這樣就完成了初始化工作。

接下來(lái),我們看看幾個(gè)重要方法是如何使用這2個(gè) NIO 組件的。

1、finishConnect()

@Override
// 判斷網(wǎng)絡(luò)連接是否完成
public boolean finishConnect() throws IOException {
// 1. 調(diào)用socketChannel的finishConnect方法,返回該連接是否已經(jīng)連接完成
boolean connected = socketChannel.finishConnect();
// 2. 如果網(wǎng)絡(luò)連接完成以后就刪除對(duì)OP_CONNECT事件的監(jiān)聽(tīng),同時(shí)添加對(duì)OP_READ事件的監(jiān)聽(tīng)
if (connected)
// 事件操作
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
// 3. 最后返回網(wǎng)絡(luò)連接
return connected;
}

該方法主要用來(lái)判斷網(wǎng)絡(luò)連接是否完成,如果完成就關(guān)注 「OP_READ」 事件,并取消 「OP_CONNECT」 事件。

  1. 首先調(diào)用 socketChannel 通道的 finishConnect() 判斷連接是否完成。
  2. 如果網(wǎng)絡(luò)連接完成以后就刪除對(duì) OP_CONNECT 事件的監(jiān)聽(tīng),同時(shí)添加對(duì) OP_READ 事件的監(jiān)聽(tīng),因?yàn)檫B接完成后就可能接收數(shù)據(jù)了。
  3. 最后返回網(wǎng)絡(luò)連接 connected。

二進(jìn)制位運(yùn)算事件監(jiān)聽(tīng)

這里通過(guò)「二進(jìn)制位運(yùn)算」巧妙的解決了網(wǎng)絡(luò)事件的監(jiān)聽(tīng)操作,實(shí)現(xiàn)非常經(jīng)典。

通過(guò) socketChannel 在 Selector 多路復(fù)用器注冊(cè)事件返回 SelectionKey ,SelectionKey 的類型包括:

  1. OP_READ:可讀事件,值為:1<<0 == 1 == 00000001。
  2. OP_WRITE:可寫事件,值為:1<<2 == 4 == 00000100。
  3. OP_CONNECT:客戶端連接服務(wù)端的事件,一般為創(chuàng)建 SocketChannel 客戶端 channel,值為:1<<3 == 8 ==00001000。
  4. OP_ACCEPT:服務(wù)端接收客戶端連接的事件,一般為創(chuàng)建 ServerSocketChannel 服務(wù)端 channel,值為:1<<4 == 16 == 00010000。
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);

首先"~"符號(hào)代表按位取反,"&"代表按位取與,通過(guò) key.interestOps() 獲取當(dāng)前的事件,然后和 OP_CONNECT事件取反「11110111」 后按位與操作。

所以,"& ~xx" 代表刪除 xx 事件,有就刪除,沒(méi)有就不變;而 "| xx" 代表將 xx 事件添加進(jìn)去。

2、read()

@Override
public int read(ByteBuffer dst) throws IOException {
// 調(diào)用 NIO 的通道實(shí)現(xiàn)數(shù)據(jù)的讀取
return socketChannel.read(dst);
}

該方法主要用來(lái)把 socketChannel 里面的數(shù)據(jù)讀取緩沖區(qū) ByteBuffer 里,通過(guò)調(diào)用 socketChannel.read() 實(shí)現(xiàn)。

3、write()

@Override
public int write(ByteBuffer src) throws IOException {
return socketChannel.write(src);
}

該方法主要用來(lái)把緩沖區(qū) ByteBuffer 的數(shù)據(jù)寫到 SocketChannel 里,通過(guò)調(diào)用 socketChannel.write() 實(shí)現(xiàn)。

大家都知道在網(wǎng)絡(luò)編程中,一次讀寫操作并一定能把數(shù)據(jù)讀寫完,所以就需要判斷是否讀寫完成,勢(shì)必會(huì)涉及數(shù)據(jù)的「拆包」、「粘包」操作。 這些操作比較繁瑣,因此 Kafka 將 ByteBuffer 的讀寫操作進(jìn)行重新封裝,分別對(duì)應(yīng) NetworkReceive 讀操作、NetworkSend 寫操作,對(duì)于上層調(diào)用無(wú)需判斷是否讀寫完成,更加友好。

接下來(lái)我們就來(lái)分別剖析下這2個(gè)類的實(shí)現(xiàn)。

四、NetworkReceive 封裝過(guò)程

public class NetworkReceive implements Receive {
....
// 空 ByteBuffer
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
private final String source;
// 存儲(chǔ)響應(yīng)消息數(shù)據(jù)長(zhǎng)度
private final ByteBuffer size;
// 響應(yīng)消息數(shù)據(jù)的最大長(zhǎng)度
private final int maxSize;
// ByteBuffer 內(nèi)存池
private final MemoryPool memoryPool;
// 已讀取字節(jié)大小
private int requestedBufferSize = -1;
// 存儲(chǔ)響應(yīng)消息數(shù)據(jù)體
private ByteBuffer buffer;
// 初始化構(gòu)造函數(shù)
public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
this.source = source;
// 分配4個(gè)字節(jié)大小的數(shù)據(jù)長(zhǎng)度
this.size = ByteBuffer.allocate(4);
this.buffer = null;
// 能接收消息的最大長(zhǎng)度
this.maxSize = maxSize;
this.memoryPool = memoryPool;
}
}
  1. EMPTY_BUFFER:空 Buffer,值為 ByteBuffer.allocate(0)。
  2. source:final類型,用來(lái)確定對(duì)應(yīng) channel id。
  3. size:final類型,存儲(chǔ)響應(yīng)消息數(shù)據(jù)長(zhǎng)度,大小為4字節(jié)。
  4. maxSize:final類型,接收響應(yīng)消息數(shù)據(jù)的最大長(zhǎng)度。
  5. memoryPool:final類型,ByteBuffer 內(nèi)存池。
  6. requestedBufferSize:已讀取字節(jié)大小。
  7. buffer:存儲(chǔ)響應(yīng)消息數(shù)據(jù)體。

從屬性可以看出,包含2個(gè) ByteBuffer,分別是 size 和 buffer。這里重點(diǎn)說(shuō)下源碼中的size字段的初始化。通過(guò)長(zhǎng)度編碼方式實(shí)現(xiàn),上來(lái)就先分配了4字節(jié)大小的 ByteBuffer 來(lái)存儲(chǔ)響應(yīng)消息數(shù)據(jù)長(zhǎng)度,即32位,與 Java int 占用相同的字節(jié)數(shù),完全滿足表示消息長(zhǎng)度的值。

介紹完字段后,我們來(lái)深度剖析下該類的幾個(gè)重要的方法。

1、readFrom()

public long readFrom(ScatteringByteChannel channel) throws IOException {
// 讀取數(shù)據(jù)總大小
int read = 0;
// 1.判斷響應(yīng)消息數(shù)據(jù)長(zhǎng)度的 ByteBuffer 是否讀完
if (size.hasRemaining()) {
// 2.還有剩余,直接讀取消息數(shù)據(jù)的長(zhǎng)度
int bytesRead = channel.read(size);
if (bytesRead < 0)
throw new EOFException();
// 3.每次讀取后,累加到總讀取數(shù)據(jù)大小里
read += bytesRead;
// 4.判斷響應(yīng)消息數(shù)據(jù)長(zhǎng)度的緩存是否讀完了
if (!size.hasRemaining()) {
// 5.重置position
size.rewind();
// 6.讀取響應(yīng)消息數(shù)據(jù)長(zhǎng)度
int receiveSize = size.getInt();
// 7.如果有異常就拋出
if (receiveSize < 0)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
if (maxSize != UNLIMITED && receiveSize > maxSize)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
// 8.將讀到數(shù)據(jù)長(zhǎng)度賦值已讀取字節(jié)大小,即數(shù)據(jù)體的大小
requestedBufferSize = receiveSize;
if (receiveSize == 0) {
buffer = EMPTY_BUFFER;
}
}
}
// 9.如果數(shù)據(jù)體buffer還沒(méi)有分配,且響應(yīng)消息數(shù)據(jù)頭已讀完
if (buffer == null && requestedBufferSize != -1) {
// 10.分配requestedBufferSize字節(jié)大小的內(nèi)存空間給數(shù)據(jù)體buffer
buffer = memoryPool.tryAllocate(requestedBufferSize);
if (buffer == null)
log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
}
// 11.判斷buffer是否分配成功
if (buffer != null) {
// 12.把channel里的數(shù)據(jù)讀到buffer中
int bytesRead = channel.read(buffer);
if (bytesRead < 0)
throw new EOFException();
// 13.累計(jì)讀取數(shù)據(jù)總大小
read += bytesRead;
}
// 14. 返回總大小
return read;
}

該方法主要用來(lái)把對(duì)應(yīng) channel 中的數(shù)據(jù)讀到 ByteBuffer 中,包括響應(yīng)消息數(shù)據(jù)長(zhǎng)度的 size 和響應(yīng)消息數(shù)據(jù)體長(zhǎng)度的 buffer,可能會(huì)被多次調(diào)用,每次都需要判斷 size 和 buffer 的狀態(tài)并讀取。

在讀取時(shí),先讀取4字節(jié)到 size 中,再根據(jù) size 的大小為 buffer 分配內(nèi)存,然后讀滿整個(gè) buffer 時(shí)就表示讀取完成了。

通過(guò)短短的30行左右代碼就解決了工業(yè)級(jí)「拆包」 、「粘包」問(wèn)題,相當(dāng)?shù)慕?jīng)典。

如果要解決「粘包」問(wèn)題,就是在每個(gè)響應(yīng)數(shù)據(jù)中間插入一個(gè)特殊的字節(jié)大小的「分隔符」,這里就在響應(yīng)消息體前面插入4個(gè)字節(jié),代表響應(yīng)消息自己本身的數(shù)據(jù)大小,如下圖所示:

具體「拆包」的操作步驟如下:

  1. 調(diào)用 size.hasRemaining() 返回position 至 limit 之間的字節(jié)大小來(lái)判斷響應(yīng)消息數(shù)據(jù)長(zhǎng)度的 ByteBuffer 是否讀完。
  2. 當(dāng)未讀完則通過(guò)調(diào)用 NIO 的方法 channel.read(size),直接把讀取4字節(jié)的響應(yīng)消息數(shù)據(jù)的長(zhǎng)度寫入到 ByteBuffer size 中,如果已經(jīng)讀取到了4字節(jié),此時(shí) position=4,與  limit  相同,表示 ByteBuffer size 已經(jīng)讀滿了。
  3. 每次讀取后,累加到總讀取數(shù)據(jù)大小里
  4. 再次判斷響應(yīng)消息數(shù)據(jù)長(zhǎng)度的緩存是否讀完了。
  5. 如果讀完了,先重置 position 位置為0,此時(shí)就可以從 ByteBuffer 中讀取數(shù)據(jù)了,然后調(diào)用 size.getInt() 從 ByteBuffer 當(dāng)前 position 位置讀取4個(gè)字節(jié),并轉(zhuǎn)化成int 類型數(shù)值賦給 receiveSize,即響應(yīng)體的長(zhǎng)度。
  6. 如果有異常就拋出,包括響應(yīng)數(shù)據(jù)體的長(zhǎng)度無(wú)效或者大于最大長(zhǎng)度等。
  7. 將讀到響應(yīng)數(shù)據(jù)長(zhǎng)度賦值 requestedBufferSize,即數(shù)據(jù)體的大小。
  8. 如果響應(yīng)數(shù)據(jù)體 buffer 還沒(méi)有分配,且響應(yīng)數(shù)據(jù)頭已讀完,分配 requestedBufferSize 字節(jié)大小的內(nèi)存空間給數(shù)據(jù)體 buffer。
  9. 如果 buffer 分配成功,表示 size 已讀完,此時(shí)直接把 channel 里的響應(yīng)數(shù)據(jù)讀到跟它大小一致的 ByteBuffer 中,再次累計(jì)讀取數(shù)據(jù)總大小。
  10. 最后返回?cái)?shù)據(jù)總大小。

2、complete()

@Override
public boolean complete() {
// 響應(yīng)消息頭已讀完 && 響應(yīng)消息體已讀完
return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
}

該方法主要用來(lái)判斷是否都讀取完成,即響應(yīng)頭大小和響應(yīng)體大小都讀取完。

3、size()

// 返回大小
public int size() {
return payload().limit() + size.limit();
}
public ByteBuffer payload() {
return this.buffer;
}

該方法主要用來(lái)返回響應(yīng)頭和響應(yīng)體還有多少數(shù)據(jù)需要讀出。

此時(shí)已經(jīng)剖析完讀 Buffer 的封裝,接下來(lái)我們看看寫 Buffer。

五、NetworkSend 封裝過(guò)程

github 源碼地址如下:

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/Send.java。

調(diào)用關(guān)系圖如下:

1、Send 接口

我們先看一下接口 Send 都定義了哪些方法。

public interface Send {
// 要把數(shù)據(jù)寫入目標(biāo)的 channel id
String destination();
// 要發(fā)送的數(shù)據(jù)是否發(fā)送完了
boolean completed();
// 把數(shù)據(jù)寫到對(duì)應(yīng) channel 中
long writeTo(GatheringByteChannel channel) throws IOException;
// 發(fā)送數(shù)據(jù)的大小
long size();
}

Send 作為要發(fā)送數(shù)據(jù)的接口, 子類 ByteBufferSend 實(shí)現(xiàn) complete() 方法用于判斷是否已經(jīng)發(fā)送完成,實(shí)現(xiàn) writeTo() 方法來(lái)實(shí)現(xiàn)寫入數(shù)據(jù)到Channel中。

2、ByteBufferSend 類

ByteBufferSend 類實(shí)現(xiàn)了 Send 接口,即實(shí)現(xiàn)了數(shù)據(jù)從 ByteBuffer 數(shù)組發(fā)送到 channel:

public class ByteBufferSend implements Send {
private final String destination;
// 總共要寫多少字節(jié)數(shù)據(jù)
private final int size;
// 用于寫入channel里的ByteBuffer數(shù)組,說(shuō)明kafka一次最大傳輸字節(jié)是有限定的
protected final ByteBuffer[] buffers;
// 總共還剩多少字節(jié)沒(méi)有寫完
private int remaining;
private boolean pending = false;

public ByteBufferSend(String destination, ByteBuffer... buffers) {
this.destination = destination;
this.buffers = buffers;
for (ByteBuffer buffer : buffers)
remaining += buffer.remaining();
// 計(jì)算需要寫入字節(jié)的總和
this.size = remaining;
}
}

我們來(lái)看下這個(gè)類中的幾個(gè)重要字段:

  1. destination:數(shù)據(jù)寫入的目標(biāo) channel id。
  2. size:總共需要往 channel 里寫多少字節(jié)數(shù)據(jù)。
  3. buffers:ByteBuffer數(shù)組類型,用來(lái)存儲(chǔ)要寫入 channel 里的數(shù)據(jù)。
  4. remaining:ByteBuffer數(shù)組所有的ByteBuffer 還剩多少字節(jié)沒(méi)有寫完。

介紹完字段后,我們來(lái)深度剖析下該類的幾個(gè)重要的方法。

(1)writeTo()

@Override
// 將字節(jié)流數(shù)據(jù)寫入到channel中
public long writeTo(GatheringByteChannel channel) throws IOException {
// 1.調(diào)用nio底層write方法把buffers寫入傳輸層返回寫入的字節(jié)數(shù)
long written = channel.write(buffers);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
// 2.計(jì)算還剩多少字節(jié)沒(méi)有寫入傳輸層
remaining -= written;
// 每次發(fā)送 都檢查是否
pending = TransportLayers.hasPendingWrites(channel);
return written;
}

該方法主要用來(lái)把 buffers 數(shù)組寫入到 SocketChannel里,因?yàn)樵诰W(wǎng)絡(luò)編程中,寫一次不一定可以完全把數(shù)據(jù)都寫成功,所以調(diào)用底層 channel.write(buffers) 方法會(huì)返回「已經(jīng)寫入成功多少字節(jié)」的返回值,這樣調(diào)用一次后就知道已經(jīng)寫入多少字節(jié)了。

(2)some other

@Override
public String destination() {
// 返回對(duì)應(yīng)的channel id
return destination;
}
@Override
public boolean completed() {
// 判斷是否完成 即沒(méi)有剩余&pending=false
return remaining <= 0 && !pending;
}
/**
* always returns false as there will be not be any
* pending writes since we directly write to socketChannel.
*/
@Override
public boolean hasPendingWrites() {
// 在PLAINTEXT下 pending 始終為 false
return false;
}
@Override
public long size() {
// 返回寫入字節(jié)的總和
return this.size;
}

3、NetworkSend 類

NetworkSend 類繼承了 ByteBufferSend 類,真正用來(lái)寫 Buffer。

public class NetworkSend extends ByteBufferSend {
// 實(shí)例化
public NetworkSend(String destination, ByteBuffer buffer) {
// 調(diào)用父類的方法初始化
super(destination, sizeBuffer(buffer.remaining()), buffer);
}
// 用來(lái)構(gòu)造4個(gè)字節(jié)的 sizeBuffer
private static ByteBuffer sizeBuffer(int size) {
// 先分配一個(gè)4個(gè)字節(jié)的ByteBuffer
ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
// 寫入size長(zhǎng)度值
sizeBuffer.putInt(size);
// 重置 position
sizeBuffer.rewind();
// 返回 sizeBuffer
return sizeBuffer;
}
}

該類相對(duì)簡(jiǎn)單些,就是構(gòu)建一個(gè)發(fā)往 channel 對(duì)應(yīng)的節(jié)點(diǎn) id 的消息數(shù)據(jù),它的實(shí)例化過(guò)程如下:

  1. 先分配一個(gè)4個(gè)字節(jié)的 ByteBuffer 的變量 sizeBuffer,再把要發(fā)送的數(shù)據(jù)長(zhǎng)度賦值給 sizeBuffer。
  2. 此時(shí) sizeBuffer 的響應(yīng)頭字節(jié)數(shù)和 sizeBuffer 的響應(yīng)數(shù)據(jù)就都有了。
  3. 然后調(diào)用父類 ByteBufferSend 的方法進(jìn)行初始化。

另外 ByteBuffer[] 為兩個(gè) buffer,可以理解為一個(gè)消息頭 buffer 即 size,一個(gè)消息體 buffer。消息頭 buffer 的長(zhǎng)度為4byte,存放的是消息體 buffer 的長(zhǎng)度。而消息體 buffer 是上層傳入的業(yè)務(wù)數(shù)據(jù),所以 send 就是持有一個(gè)待發(fā)送的 ByteBuffer。

接下來(lái)我們來(lái)看看 KafkaChannel 是如何對(duì)上面幾個(gè)類進(jìn)行封裝的。

六、KafkaChannel 封裝過(guò)程

github 源碼地址如下:

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java。

public class KafkaChannel implements AutoCloseable {
....
// 節(jié)點(diǎn) id
private final String id;
// 傳輸層對(duì)象
private final TransportLayer transportLayer;
....
// 最大能接收請(qǐng)求的字節(jié)數(shù)
private final int maxReceiveSize;
// 內(nèi)存池,用來(lái)分配指定大小的 ByteBuffer
private final MemoryPool memoryPool;
// NetworkReceive 類的實(shí)例
private NetworkReceive receive;
// NetworkSend 類的實(shí)例
private Send send;
// 是否關(guān)閉連接
private boolean disconnected;
....
// 連接狀態(tài)
private ChannelState state;
// 需要連接的遠(yuǎn)端地址
private SocketAddress remoteAddress;
// 初始化
public KafkaChannel(String id, TransportLayer transportLayer, Supplier authenticatorCreator,int maxReceiveSize, MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) {
this.id = id;
this.transportLayer = transportLayer;
this.authenticatorCreator = authenticatorCreator;
this.authenticator = authenticatorCreator.get();
this.networkThreadTimeNanos = 0L;
this.maxReceiveSize = maxReceiveSize;
this.memoryPool = memoryPool;
this.metadataRegistry = metadataRegistry;
this.disconnected = false;
this.muteState = ChannelMuteState.NOT_MUTED;
this.state = ChannelState.NOT_CONNECTED;
}
}

我們來(lái)看下這個(gè)類中的幾個(gè)重要字段:

  1. id:channel 對(duì)應(yīng)的節(jié)點(diǎn) id。
  2. transportLayer:傳輸層對(duì)象。
  3. maxReceiveSize:最大能接收請(qǐng)求的字節(jié)數(shù)。
  4. memoryPool:內(nèi)存池,用來(lái)分配指定大小的 ByteBuffer。
  5. receive:NetworkReceive 類的實(shí)例。
  6. send:NetworkSend 類的實(shí)例。
  7. disconnected:是否關(guān)閉連接。
  8. state:KafkaChannel 的狀態(tài)。
  9. remoteAddress:需要連接的遠(yuǎn)端地址。

從屬性可以看出,有3個(gè)最重要的成員變量:TransportLayer、NetworkReceive、Send。KafkaChannel 通過(guò) TransportLayer 進(jìn)行讀寫操作,NetworkReceive 用來(lái)讀取,Send 用來(lái)寫出。

為了封裝普通和加密的Channel「TransportLayer根據(jù)網(wǎng)絡(luò)協(xié)議的不同,提供不同的子類」而對(duì)于 KafkaChannel 提供統(tǒng)一的接口,「這是策略模式很好的應(yīng)用」。

  1. 每個(gè) NetworkReceive 代表一個(gè)單獨(dú)的響應(yīng),KafkaChannel 讀取的數(shù)據(jù)會(huì)存儲(chǔ)到 NetworkReceive 中,當(dāng) NetworkReceive 讀滿,一個(gè)請(qǐng)求就完整讀取了。
  2. 每個(gè) Send 代表一個(gè)單獨(dú)的請(qǐng)求,需要寫出時(shí)只需賦值此變量,之后調(diào)用 write() 方法將其中的數(shù)據(jù)寫出。

介紹完字段后,我們來(lái)深度剖析下其網(wǎng)絡(luò)讀寫操作是如何實(shí)現(xiàn)的?

1、setSend()

public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
// 設(shè)置要發(fā)送消息的字段
this.send = send;
// 調(diào)用傳輸層增加寫事件
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
// PlaintextTransportLayer 類方法
@Override
public void addInterestOps(int ops) {
//通過(guò) key.interestOps() | ops 來(lái)添加事件
key.interestOps(key.interestOps() | ops);
}

該方法主要用來(lái)預(yù)發(fā)送,即在發(fā)送網(wǎng)絡(luò)請(qǐng)求前,將需要發(fā)送的ByteBuffer 數(shù)據(jù)保存到 KafkaChannel 的 send 中,然后調(diào)用傳輸層方法增加對(duì)這個(gè) channel 上「OP_WRITE」事件的關(guān)注。當(dāng)真正執(zhí)行發(fā)送的時(shí)候,會(huì)從 send 中讀取數(shù)據(jù)。

2、write()

public long write() throws IOException {
// 判斷 send 是否為空,如果為空表示已經(jīng)發(fā)送完畢了
if (send == null)
return 0;
midWrite = true;
// 調(diào)用ByteBufferSend.writeTo把數(shù)據(jù)真正發(fā)送出去
return send.writeTo(transportLayer);
}

該方法主要用來(lái)把保存在 send 上的數(shù)據(jù)真正發(fā)送出去。

  1. 首先判斷要發(fā)送的 send 是否為空,如果為空則表示在 KafkaChannel 的 Buffer 的數(shù)據(jù)都發(fā)送完畢了。
  2. 如果不為空就調(diào)用ByteBufferSend.writeTo() 方法通過(guò)網(wǎng)絡(luò) I/O 操作將數(shù)據(jù)發(fā)送出去。

3、read()

public long read() throws IOException {
// 如果receive為空表示數(shù)據(jù)已經(jīng)讀完,需要重新實(shí)例化對(duì)象
if (receive == null) {

本文名稱:圖解 Kafka 網(wǎng)絡(luò)層實(shí)現(xiàn)機(jī)制(一)
分享網(wǎng)址:http://www.dlmjj.cn/article/dhipood.html