新聞中心
前言
本次繼續(xù)分享 RabbitMQ Client pulish -- 發(fā)送消息,先將 之前分享過的 RabbitMQ 客戶端源碼 - Connection 和 RabbitMQ 客戶端源碼 - Channel 和 發(fā)布消息 - Pulish Message 做個(gè)小總結(jié)(還是基于之前的 Java Client Connecting to RabbitMQ Demo )。

10余年的平塘網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。成都全網(wǎng)營銷的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整平塘建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)公司從事“平塘網(wǎng)站設(shè)計(jì)”,“平塘網(wǎng)站推廣”以來,每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。
RabbitMQ pulisher 小總結(jié)
由圖可知 RabbitMQ 發(fā)布消息流程:ConnectionFactory --> Connection --> Channel --> Pulish Message。
Pulish Message 交互抓包
老套路 -- 分享源碼之前先 抓包看看(「助于快速理解」)。
Basic Publish & Ack
抓包可以看到:pulisher(RabbitMQ 消息發(fā)送者) 與 Broker(RabbitMQ Broker)打開 Channel 后,又發(fā)起了 Confirm.Select/Select-Ok -- 通知 Broker 接收到發(fā)布消息需要確認(rèn),因此也就有了后續(xù)的 Basic.Pulish/Ack。
梳理完交互流程 我們開始進(jìn)入今天的主題 Pulish Message。
Pulish Message 源碼分析
「發(fā)布消息總?cè)肟?-- ChannelN.basicPublish()」。
/** Public API - {@inheritDoc} */
@Override
public void basicPublish(String exchange, String routingKey,
boolean mandatory,
BasicProperties props, byte[] body)
throws IOException
{
basicPublish(exchange, routingKey, mandatory, false, props, body);
}
/** Public API - {@inheritDoc} */
@Override
public void basicPublish(String exchange, String routingKey,
boolean mandatory, boolean immediate,
BasicProperties props, byte[] body)
throws IOException
{
// Pulisher 配置了 `Confirm.Select` nextPublishSeqNo 設(shè)置從 1 開始
// 將未確認(rèn)的消息 放入 unconfirmedSet,并 自增加一
if (nextPublishSeqNo > 0) {
unconfirmedSet.add(getNextPublishSeqNo());
nextPublishSeqNo++;
}
BasicProperties useProps = props;
if (props == null) {
useProps = MessageProperties.MINIMAL_BASIC;
}
// 構(gòu)造 AMQCommand 并傳輸
transmit(new AMQCommand(new Basic.Publish.Builder()
.exchange(exchange)
.routingKey(routingKey)
.mandatory(mandatory)
.immediate(immediate)
.build(),
useProps, body));
// 用于指標(biāo)統(tǒng)計(jì)和監(jiān)控,默認(rèn)是 NoOpMetricsCollector,需要配置才會(huì)可以使用 提供的 MicrometerMetricsCollector 和 StandardMetricsCollector(引入對(duì)應(yīng)的包和配置 開箱即可食用~)
metricsCollector.basicPublish(this);
}「構(gòu)造 AMQCommand」
值得一提,RabbitMQ Client 應(yīng)用消息的最小單位是 Frame (幀,在Connection篇提到過),F(xiàn)rame 主要由 type 類型、channel 通道、payload 消息內(nèi)容字節(jié)、accumulator 寫出數(shù)據(jù)、NON_BODY_SIZE 構(gòu)成。
「Frame結(jié)構(gòu)」
public class Frame {
/** Frame type code */
// FRAME_HEARTBEAT :心跳, FRAME_METHOD: 方法, FRAME_HEADER : 頭部信息, FRAME_BODY 內(nèi)容主題
public final int type;
/** Frame channel number, 0-65535 */
// channel 序列號(hào)
public final int channel;
/** Frame payload bytes (for inbound frames) */
// 消息內(nèi)容字節(jié)
private final byte[] payload;
/** Frame payload (for outbound frames) */
// 寫出數(shù)據(jù)
private final ByteArrayOutputStream accumulator;
private static final int NON_BODY_SIZE = 1 /* type */ + 2 /* channel */ + 4 /* payload size */ + 1 /* end character */;
...
}AMQP 0-9-1 特定的 「Command」 讀取,是從一系列幀中累積 方法、頭部和正文。
/**
* Construct a command with a specified method, header and body.
* @param method the wrapped method
* @param contentHeader the wrapped content header
* @param body the message body data
*/
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body) {
this.assembler = new CommandAssembler((Method) method, contentHeader, body);
}
// AMQP 0-9-1 特定的Command,構(gòu)造 方法、頭部和正文
public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body) {
this.method = method;
this.contentHeader = contentHeader;
this.bodyN = new ArrayList(2);
this.bodyLength = 0;
this.remainingBodyBytes = 0;
appendBodyFragment(body);
if (method == null) {
this.state = CAState.EXPECTING_METHOD;
} else if (contentHeader == null) {
this.state = method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
} else {
this.remainingBodyBytes = contentHeader.getBodySize() - this.bodyLength;
updateContentBodyState();
}
}
「傳輸 AMQCommand -- Channel.transmit()」
public void transmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
// 確認(rèn) channel 是否打開(邏輯比較簡單:判斷 shutdownCause 為空即是打開)
ensureIsOpen();
quiescingTransmit(c);
}
}
public void quiescingTransmit(AMQCommand c) throws IOException {
// 防止并發(fā)同時(shí)使用 同一個(gè)channel
synchronized (_channelMutex) {
// 判斷 該消息是否 攜帶content,如果有 需要判斷該 channel是否是阻塞(如果channel state為 `FLOW` 即為 阻塞 _blockContent = true)
if (c.getMethod().hasContent()) {
while (_blockContent) {
try {
_channelMutex.wait();
} catch (InterruptedException ignored) {}
// 防止 從阻塞中被喚醒時(shí),channel 已經(jīng)關(guān)閉(挺好的一個(gè) 多線程操作的案例)
ensureIsOpen();
}
}
c.transmit(this);
}
}「AMQCommand.transmit」
/**
* Sends this command down the named channel on the channel's
* connection, possibly in multiple frames.
* @param channel the channel on which to transmit the command
* @throws IOException if an error is encountered
*/
public void transmit(AMQChannel channel) throws IOException {
// 每個(gè) channel 都有序列號(hào) 從 0開始,(0是特殊的channel)
int channelNumber = channel.getChannelNumber();
AMQConnection connection = channel.getConnection();
synchronized (assembler) {
// 方法:FRAME_HEARTBEAT :心跳, FRAME_METHOD: 方法, FRAME_HEADER : 頭部信息, FRAME_BODY 內(nèi)容主題
Method m = this.assembler.getMethod();
if (m.hasContent()) {
byte[] body = this.assembler.getContentBody();
// FRAME_HEADER : 頭部信息
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
int frameMax = connection.getFrameMax();
boolean cappedFrameMax = frameMax > 0;
int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
if (cappedFrameMax && headerFrame.size() > frameMax) {
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
throw new IllegalArgumentException(msg);
}
// 1. 寫 channelNumber幀 FRAME_METHOD
connection.writeFrame(m.toFrame(channelNumber));
// 2. 寫 頭部信息幀 AMQP.FRAME_HEADER
connection.writeFrame(headerFrame);
// 3. 如果 body過多,會(huì)拆成多個(gè)幀 AMQP.FRAME_BODY
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
int remaining = body.length - offset;
int fragmentLength = (remaining < bodyPayloadMax) ? remaining
: bodyPayloadMax;
Frame frame = Frame.fromBodyFragment(channelNumber, body,
offset, fragmentLength);
connection.writeFrame(frame);
}
} else {
// 1. 寫 channelNumber幀 FRAME_METHOD
connection.writeFrame(m.toFrame(channelNumber));
}
}
// 最后刷新 輸出緩沖區(qū)
connection.flush();
}
「最后分析下 connection.writeFrame(frame)」
/**
* Public API - sends a frame directly to the broker.
*/
public void writeFrame(Frame f) throws IOException {
_frameHandler.writeFrame(f);
// lastActivityTime
_heartbeatSender.signalActivity();
}
@Override
public void writeFrame(Frame frame) throws IOException {
synchronized (_outputStream) {
frame.writeTo(_outputStream);
}
}
/**
* Public API - writes this Frame to the given DataOutputStream
*/
public void writeTo(DataOutputStream os) throws IOException {
// 1. 寫type 類型
os.writeByte(type);
// 2. 寫channel 序列號(hào)
os.writeShort(channel);
if (accumulator != null) {
// 3. 寫出數(shù)據(jù)大小
os.writeInt(accumulator.size());
// 4. 輸出數(shù)據(jù)
accumulator.writeTo(os);
} else {
// 3. 寫消息內(nèi)容字節(jié)大小
os.writeInt(payload.length);
// 4. 寫消息內(nèi)容
os.write(payload);
}
// 5. 幀結(jié)束標(biāo)志位
os.write(AMQP.FRAME_END);
}
最后
希望可以讓你們 對(duì)于 RabbitMQ Client 與 RabbitMQ Broker 根據(jù) AMQP協(xié)議 發(fā)布消息 有個(gè)清晰的認(rèn)識(shí),并有助于你們熟悉ChannelN.basicPublish 源碼,當(dāng)然其中還有很多細(xì)節(jié)源碼需要讀者慢慢品味。
新聞標(biāo)題:RabbitMQ 客戶端源碼系列 - Pulish Message
文章URL:http://www.dlmjj.cn/article/ccsggsg.html


咨詢
建站咨詢
