新聞中心
大家好呀,我是樓仔。

創(chuàng)新互聯(lián)是網(wǎng)站建設(shè)技術(shù)企業(yè),為成都企業(yè)提供專業(yè)的成都做網(wǎng)站、網(wǎng)站建設(shè),網(wǎng)站設(shè)計(jì),網(wǎng)站制作,網(wǎng)站改版等技術(shù)服務(wù)。擁有十年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制適合企業(yè)的網(wǎng)站。十年品質(zhì),值得信賴!
RabbitMQ 的文章之前寫過(guò),但是當(dāng)時(shí)給的示例是 Demo 版的,這篇文章主要是結(jié)合之前寫的理論知識(shí),將 RabbitMQ 集成到技術(shù)派項(xiàng)目中。
不 BB,上文章目錄:
下面我們先回顧一下理論知識(shí),如果對(duì)這塊知識(shí)已經(jīng)清楚的同學(xué),可以直接跳到實(shí)戰(zhàn)部分。
1. 消息隊(duì)列
1.1 消息隊(duì)列模式
消息隊(duì)列目前主要 2 種模式,分別為“點(diǎn)對(duì)點(diǎn)模式”和“發(fā)布/訂閱模式”。
點(diǎn)對(duì)點(diǎn)模式
一個(gè)具體的消息只能由一個(gè)消費(fèi)者消費(fèi),多個(gè)生產(chǎn)者可以向同一個(gè)消息隊(duì)列發(fā)送消息,但是一個(gè)消息在被一個(gè)消息者處理的時(shí)候,這個(gè)消息在隊(duì)列上會(huì)被鎖住或者被移除并且其他消費(fèi)者無(wú)法處理該消息。
需要額外注意的是,如果消費(fèi)者處理一個(gè)消息失敗了,消息系統(tǒng)一般會(huì)把這個(gè)消息放回隊(duì)列,這樣其他消費(fèi)者可以繼續(xù)處理。
發(fā)布/訂閱模式
單個(gè)消息可以被多個(gè)訂閱者并發(fā)的獲取和處理。一般來(lái)說(shuō),訂閱有兩種類型:
- 臨時(shí)(ephemeral)訂閱:這種訂閱只有在消費(fèi)者啟動(dòng)并且運(yùn)行的時(shí)候才存在。一旦消費(fèi)者退出,相應(yīng)的訂閱以及尚未處理的消息就會(huì)丟失。
- 持久(durable)訂閱:這種訂閱會(huì)一直存在,除非主動(dòng)去刪除。消費(fèi)者退出后,消息系統(tǒng)會(huì)繼續(xù)維護(hù)該訂閱,并且后續(xù)消息可以被繼續(xù)處理。
1.2 RabbitMQ 特征
- 消息路由(支持):RabbitMQ可以通過(guò)不同的交換器支持不同種類的消息路由;
- 消息有序(不支持):當(dāng)消費(fèi)消息時(shí),如果消費(fèi)失敗,消息會(huì)被放回隊(duì)列,然后重新消費(fèi),這樣會(huì)導(dǎo)致消息無(wú)序;
- 消息時(shí)序(非常好):通過(guò)延時(shí)隊(duì)列,可以指定消息的延時(shí)時(shí)間,過(guò)期時(shí)間TTL等;
- 容錯(cuò)處理(非常好):通過(guò)交付重試和死信交換器(DLX)來(lái)處理消息處理故障;
- 伸縮(一般):伸縮其實(shí)沒(méi)有非常智能,因?yàn)榧词股炜s了,master queue還是只有一個(gè),負(fù)載還是只有這一個(gè)master queue去抗,所以我理解RabbitMQ的伸縮很弱(個(gè)人理解)。
- 持久化(不太好):沒(méi)有消費(fèi)的消息,可以支持持久化,這個(gè)是為了保證機(jī)器宕機(jī)時(shí)消息可以恢復(fù),但是消費(fèi)過(guò)的消息,就會(huì)被馬上刪除,因?yàn)镽abbitMQ設(shè)計(jì)時(shí),就不是為了去存儲(chǔ)歷史數(shù)據(jù)的。
- 消息回溯(支持):因?yàn)橄⒉恢С钟谰帽4妫宰匀痪筒恢С只厮荨?/li>
- 高吞吐(中等):因?yàn)樗械恼?qǐng)求的執(zhí)行,最后都是在master queue,它的這個(gè)設(shè)計(jì),導(dǎo)致單機(jī)性能達(dá)不到十萬(wàn)級(jí)的標(biāo)準(zhǔn)。
2. RabbitMQ 原理初探
RabbitMQ 2007 年發(fā)布,是使用 Erlang 語(yǔ)言開發(fā)的開源消息隊(duì)列系統(tǒng),基于 AMQP 協(xié)議來(lái)實(shí)現(xiàn)。
2.1 基本概念
提到RabbitMQ,就不得不提AMQP協(xié)議。AMQP協(xié)議是具有現(xiàn)代特征的二進(jìn)制協(xié)議。是一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。
先了解一下AMQP協(xié)議中間的幾個(gè)重要概念:
- Server:接收客戶端的連接,實(shí)現(xiàn)AMQP實(shí)體服務(wù)。
- Connection:連接,應(yīng)用程序與Server的網(wǎng)絡(luò)連接,TCP連接。
- Channel:信道,消息讀寫等操作在信道中進(jìn)行??蛻舳丝梢越⒍鄠€(gè)信道,每個(gè)信道代表一個(gè)會(huì)話任務(wù)。
- Message:消息,應(yīng)用程序和服務(wù)器之間傳送的數(shù)據(jù),消息可以非常簡(jiǎn)單,也可以很復(fù)雜。由Properties和Body組成。Properties為外包裝,可以對(duì)消息進(jìn)行修飾,比如消息的優(yōu)先級(jí)、延遲等高級(jí)特性;Body就是消息體內(nèi)容。
- Virtual Host:虛擬主機(jī),用于邏輯隔離。一個(gè)虛擬主機(jī)里面可以有若干個(gè)Exchange和Queue,同一個(gè)虛擬主機(jī)里面不能有相同名稱的Exchange或Queue。
- Exchange:交換器,接收消息,按照路由規(guī)則將消息路由到一個(gè)或者多個(gè)隊(duì)列。如果路由不到,或者返回給生產(chǎn)者,或者直接丟棄。RabbitMQ常用的交換器常用類型有direct、topic、fanout、headers四種,后面詳細(xì)介紹。
- Binding:綁定,交換器和消息隊(duì)列之間的虛擬連接,綁定中可以包含一個(gè)或者多個(gè)RoutingKey。
- RoutingKey:路由鍵,生產(chǎn)者將消息發(fā)送給交換器的時(shí)候,會(huì)發(fā)送一個(gè)RoutingKey,用來(lái)指定路由規(guī)則,這樣交換器就知道把消息發(fā)送到哪個(gè)隊(duì)列。路由鍵通常為一個(gè)“.”分割的字符串,例如“com.rabbitmq”。
- Queue:消息隊(duì)列,用來(lái)保存消息,供消費(fèi)者消費(fèi)。
2.2 工作原理
AMQP 協(xié)議模型由三部分組成:生產(chǎn)者、消費(fèi)者和服務(wù)端,執(zhí)行流程如下:
- 生產(chǎn)者是連接到 Server,建立一個(gè)連接,開啟一個(gè)信道。
- 生產(chǎn)者聲明交換器和隊(duì)列,設(shè)置相關(guān)屬性,并通過(guò)路由鍵將交換器和隊(duì)列進(jìn)行綁定。
- 消費(fèi)者也需要進(jìn)行建立連接,開啟信道等操作,便于接收消息。
- 生產(chǎn)者發(fā)送消息,發(fā)送到服務(wù)端中的虛擬主機(jī)。
- 虛擬主機(jī)中的交換器根據(jù)路由鍵選擇路由規(guī)則,發(fā)送到不同的消息隊(duì)列中。
- 訂閱了消息隊(duì)列的消費(fèi)者就可以獲取到消息,進(jìn)行消費(fèi)。
2.3 常用交換器
RabbitMQ常用的交換器類型有direct、topic、fanout、headers四種:
- Direct Exchange:見(jiàn)文知意,直連交換機(jī)意思是此交換機(jī)需要綁定一個(gè)隊(duì)列,要求該消息與一個(gè)特定的路由鍵完全匹配。簡(jiǎn)單點(diǎn)說(shuō)就是一對(duì)一的,點(diǎn)對(duì)點(diǎn)的發(fā)送。
- Fanout Exchange:這種類型的交換機(jī)需要將隊(duì)列綁定到交換機(jī)上。一個(gè)發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。很像子網(wǎng)廣播,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。簡(jiǎn)單點(diǎn)說(shuō)就是發(fā)布訂閱。
- Topic Exchange:直接翻譯的話叫做主題交換機(jī),如果從用法上面翻譯可能叫通配符交換機(jī)會(huì)更加貼切。這種交換機(jī)是使用通配符去匹配,路由到對(duì)應(yīng)的隊(duì)列。通配符有兩種:"*" 、 "#"。需要注意的是通配符前面必須要加上"."符號(hào)。
- *符號(hào):有且只匹配一個(gè)詞。比如 a.*可以匹配到"a.b"、"a.c",但是匹配不了"a.b.c"。
- #符號(hào):匹配一個(gè)或多個(gè)詞。比如"rabbit.#"既可以匹配到"rabbit.a.b"、"rabbit.a",也可以匹配到"rabbit.a.b.c"。
- Headers Exchange:這種交換機(jī)用的相對(duì)沒(méi)這么多。它跟上面三種有點(diǎn)區(qū)別,它的路由不是用routingKey進(jìn)行路由匹配,而是在匹配請(qǐng)求頭中所帶的鍵值進(jìn)行路由。創(chuàng)建隊(duì)列需要設(shè)置綁定的頭部信息,有兩種模式:全部匹配和部分匹配。如上圖所示,交換機(jī)會(huì)根據(jù)生產(chǎn)者發(fā)送過(guò)來(lái)的頭部信息攜帶的鍵值去匹配隊(duì)列綁定的鍵值,路由到對(duì)應(yīng)的隊(duì)列。
3. RabbitMQ環(huán)境搭建
因?yàn)槲矣玫氖荕ac,所以直接可以參考官網(wǎng):
https://www.rabbitmq.com/install-homebrew.html
需要注意的是,一定需要先執(zhí)行:
brew update然后再執(zhí)行:
brew install rabbitmq
之前沒(méi)有執(zhí)行brew update,直接執(zhí)行brew install rabbitmq時(shí),會(huì)報(bào)各種各樣奇怪的錯(cuò)誤,其中“403 Forbidde”居多。
但是在執(zhí)行“brew install rabbitmq”,會(huì)自動(dòng)安裝其它的程序,如果你使用源碼安裝Rabbitmq,因?yàn)閱?dòng)該服務(wù)依賴erlang環(huán)境,所以你還需手動(dòng)安裝erlang,但是目前官方已經(jīng)一鍵給你搞定,會(huì)自動(dòng)安裝Rabbitmq依賴的所有程序,是不是很棒!
最后執(zhí)行成功的輸出如下:
啟動(dòng)服務(wù):
# 啟動(dòng)方式1:后臺(tái)啟動(dòng)
brew services start rabbitmq
# 啟動(dòng)方式2:當(dāng)前窗口啟動(dòng)
cd /usr/local/Cellar/rabbitmq/3.8.19
rabbitmq-server在瀏覽器輸入:
http://localhost:15672/會(huì)出現(xiàn)RabbitMQ后臺(tái)管理界面(用戶名和密碼都為guest):
通過(guò)brew安裝,一行命令搞定,真香!
4. RabbitMQ 集成
4.1 前置工作
添加賬號(hào):
## 添加賬號(hào)
./rabbitmqctl add_user admin admin
## 添加訪問(wèn)權(quán)限
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 設(shè)置超級(jí)權(quán)限
./rabbitmqctl set_user_tags admin administratorpom 引入依賴:
com.rabbitmq
amqp-client
5.5.1
4.2 代碼實(shí)現(xiàn)
核心代碼
先整一個(gè) ConnectionFactory 單例,每臺(tái)機(jī)器都有自己的 ConnectionFactory,防止每次都初始化(在后面的迭代中,我會(huì)把這個(gè)去掉,整成連接池)。
/**
* @author Louzai
* @date 2023/5/10
*/
public class RabbitmqUtil {
/**
* 每個(gè)key都有自己的工廠
*/
private static Map executors = new ConcurrentHashMap<>();
/**
* 初始化一個(gè)工廠
*
* @param host
* @param port
* @param username
* @param passport
* @param virtualhost
* @return
*/
public static ConnectionFactory init(String host,
Integer port,
String username,
String passport,
String virtualhost) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(passport);
factory.setVirtualHost(virtualhost);
return factory;
}
/**
* 工廠單例,每個(gè)key都有屬于自己的工廠
*
* @param key
* @param host
* @param port
* @param username
* @param passport
* @param virtualhost
* @return
*/
public static ConnectionFactory getOrInitConnectionFactory(String key,
String host,
Integer port,
String username,
String passport,
String virtualhost) {
ConnectionFactory connectionFactory = executors.get(key);
if (null == connectionFactory) {
synchronized (RabbitmqUtil.class) {
connectionFactory = executors.get(key);
if (null == connectionFactory) {
connectionFactory = init(host, port, username, passport, virtualhost);
executors.put(key, connectionFactory);
}
}
}
return connectionFactory;
}
} 獲取 RabbitmqClient:
/**
* @author Louzai
* @date 2023/5/10
*/
@Component
public class RabbitmqClient {
@Autowired
private RabbitmqProperties rabbitmqProperties;
/**
* 創(chuàng)建一個(gè)工廠
* @param key
* @return
*/
public ConnectionFactory getConnectionFactory(String key) {
String host = rabbitmqProperties.getHost();
Integer port = rabbitmqProperties.getPort();
String userName = rabbitmqProperties.getUsername();
String password = rabbitmqProperties.getPassport();
String virtualhost = rabbitmqProperties.getVirtualhost();
return RabbitmqUtil.getOrInitConnectionFactory(key, host, port, userName,password, virtualhost);
}
}重點(diǎn)!敲黑板?。?!這里就是 RabbmitMQ 的核心邏輯了。
我們使用的交換機(jī)類型是 Direct Exchange,此交換機(jī)需要綁定一個(gè)隊(duì)列,要求該消息與一個(gè)特定的路由鍵完全匹配,簡(jiǎn)單點(diǎn)說(shuō)就是一對(duì)一的,點(diǎn)對(duì)點(diǎn)的發(fā)送。
至于為什么不用廣播和主題交換機(jī)模式,因?yàn)榧夹g(shù)派的使用場(chǎng)景就是發(fā)送單個(gè)消息,點(diǎn)到點(diǎn)發(fā)送和消費(fèi)的模式完全可以滿足我們的需求。
下面 3 個(gè)方法都很簡(jiǎn)單:
- 發(fā)送消息:拿到工廠 -> 創(chuàng)建鏈接 -> 創(chuàng)建通道 -> 聲明交換機(jī) -> 發(fā)送消息 -> 關(guān)閉鏈接;
- 消費(fèi)消息:拿到工廠 -> 創(chuàng)建鏈接 -> 創(chuàng)建通道 -> 確定消息隊(duì)列 -> 綁定隊(duì)列到交換機(jī) -> 接受并消費(fèi)消息;
- 消費(fèi)消息永動(dòng)模式:非阻塞模式消費(fèi) RabbitMQ 消息。
@Component
public class RabbitmqServiceImpl implements RabbitmqService {
@Autowired
private RabbitmqClient rabbitmqClient;
@Autowired
private NotifyService notifyService;
@Override
public void publishMsg(String exchange,
BuiltinExchangeType exchangeType,
String toutingKey,
String message) throws IOException, TimeoutException {
ConnectionFactory factory = rabbitmqClient.getConnectionFactory(toutingKey);
// TODO: 這種并發(fā)量起不來(lái),需要改造成連接池
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息通道
Channel channel = connection.createChannel();
// 聲明exchange中的消息為可持久化,不自動(dòng)刪除
channel.exchangeDeclare(exchange, exchangeType, true, false, null);
// 發(fā)布消息
channel.basicPublish(exchange, toutingKey, null, message.getBytes());
System.out.println("Publish msg:" + message);
channel.close();
connection.close();
}
@Override
public void consumerMsg(String exchange,
String queue,
String routingKey) throws IOException, TimeoutException {
ConnectionFactory factory = rabbitmqClient.getConnectionFactory(routingKey);
// TODO: 這種并發(fā)量起不來(lái),需要改造成連接池
//創(chuàng)建連接
Connection connection = factory.newConnection();
//創(chuàng)建消息信道
final Channel channel = connection.createChannel();
//消息隊(duì)列
channel.queueDeclare(queue, true, false, false, null);
//綁定隊(duì)列到交換機(jī)
channel.queueBind(queue, exchange, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer msg:" + message);
// 獲取Rabbitmq消息,并保存到DB
// 說(shuō)明:這里僅作為示例,如果有多種類型的消息,可以根據(jù)消息判定,簡(jiǎn)單的用 if...else 處理,復(fù)雜的用工廠 + 策略模式
notifyService.saveArticleNotify(JsonUtil.toObj(message, UserFootDO.class), NotifyTypeEnum.PRAISE);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 取消自動(dòng)ack
channel.basicConsume(queue, false, consumer);
}
@Override
public void processConsumerMsg() {
System.out.println("Begin to processConsumerMsg.");
Integer stepTotal = 1;
Integer step = 0;
// TODO: 這種方式非常 Low,后續(xù)會(huì)改造成阻塞 I/O 模式
while (true) {
step ++;
try {
System.out.println("processConsumerMsg cycle.");
consumerMsg(CommonConstants.EXCHANGE_NAME_DIRECT, CommonConstants.QUERE_NAME_PRAISE,
CommonConstants.QUERE_KEY_PRAISE);
if (step.equals(stepTotal)) {
Thread.sleep(10000);
step = 0;
}
} catch (Exception e) {
}
}
}
}這里只是給個(gè)示例,如果要真正用到生產(chǎn)環(huán)境,你覺(jué)得有哪些問(wèn)題呢? 你自己先想想,文末再告訴你。
調(diào)用入口
其實(shí)之前我們是通過(guò) Java 的內(nèi)置異步調(diào)用方式,為了方便驗(yàn)證,我把文章點(diǎn)贊的功能遷移到 RabbitMQ 中,只要是點(diǎn)贊,就走 RabbitMQ 模式。
// 點(diǎn)贊消息走 RabbitMQ,其它走 Java 內(nèi)置消息機(jī)制
if (notifyType.equals(NotifyTypeEnum.PRAISE) && rabbitmqProperties.getSwitchFlag()) {
rabbitmqService.publishMsg(
CommonConstants.EXCHANGE_NAME_DIRECT,
BuiltinExchangeType.DIRECT,
CommonConstants.QUERE_KEY_PRAISE,
JsonUtil.toStr(foot));
} else {
Optional.ofNullable(notifyType).ifPresent(notify -> SpringUtil.publishEvent(new NotifyMsgEvent<>(this, notify, foot)));
}那消費(fèi)入口放哪里呢?其實(shí)是在程序啟動(dòng)的時(shí)候,我們就啟動(dòng) RabbitMQ 進(jìn)行消費(fèi),然后整個(gè)進(jìn)程一直在程序中跑。
@Override
public void run(ApplicationArguments args) {
// 設(shè)置類型轉(zhuǎn)換, 主要用于mybatis讀取varchar/json類型數(shù)據(jù)據(jù),并寫入到j(luò)son格式的實(shí)體Entity中
JacksonTypeHandler.setObjectMapper(new ObjectMapper());
// 應(yīng)用啟動(dòng)之后執(zhí)行
GlobalViewConfig config = SpringUtil.getBean(GlobalViewConfig.class);
if (webPort != null) {
config.setHost("http://127.0.0.1:" + webPort);
}
// 啟動(dòng) RabbitMQ 進(jìn)行消費(fèi)
if (rabbitmqProperties.getSwitchFlag()) {
taskExecutor.execute(() -> rabbitmqService.processConsumerMsg());
}
log.info("啟動(dòng)成功,點(diǎn)擊進(jìn)入首頁(yè): {}", config.getHost());
}
4.3 演示一下
我們多次點(diǎn)擊“點(diǎn)贊”按鈕,觸發(fā) RammitMQ 消息發(fā)送。
可以通過(guò)日志,也可以看到發(fā)送和消費(fèi)過(guò)的消息。
我靠!好多沒(méi)有關(guān)閉的鏈接。
還有一堆沒(méi)有關(guān)閉的 channel。
估計(jì)再多跑一會(huì),內(nèi)存全部吃光,機(jī)器就死機(jī)了,怎么破?答案是連接池!
4.4 代碼分支
為了方便大家學(xué)習(xí)功能演變的過(guò)程,每個(gè)模塊都會(huì)單獨(dú)開個(gè)分支,包括后面的升級(jí)版:
- 代碼倉(cāng)庫(kù):https://github.com/itwanger/paicoding
- 代碼分支:feature/add_rabbitmq_20230506
如果需要運(yùn)行 RabbitMQ,下面的配置需要改成 true,因?yàn)榇a默認(rèn)是 false。
5 后記
這篇文章,讓大家知道 RabbitMQ 的基本原理,以及如何去集成 RabbitMQ,但是還不能用到實(shí)際生產(chǎn)環(huán)境,但是這個(gè)確實(shí)是我寫的第一個(gè)版本,存粹是搞著玩的,因?yàn)槔锩娲嬖诘膯?wèn)題還非常多。
我簡(jiǎn)單列舉一下:
- 需要給 Connection 加個(gè)連接池,否則內(nèi)存會(huì)持續(xù)消耗,機(jī)器肯定扛不??;
- 需要對(duì) RabbitMQ 的消費(fèi)方式進(jìn)行改造,因?yàn)?while + sleep 的方式過(guò)于簡(jiǎn)單粗暴;
- 假如消費(fèi)的任務(wù)掛掉了,你需要有重啟 RabbitMQ 的消費(fèi)機(jī)制;
- 假如機(jī)器掛了,重啟后,RabbitMQ 內(nèi)部的消息不能丟失。
如果你對(duì)上面的問(wèn)題也非常感興趣,可以直接基于分支 feature/add_rabbitmq_20230506,然后給我提 PR,技術(shù)嘛,我喜歡邊玩邊學(xué)。
新聞標(biāo)題:從原理到實(shí)戰(zhàn),手把手教你在項(xiàng)目中使用RabbitMQ
當(dāng)前URL:http://www.dlmjj.cn/article/dpicgpi.html


咨詢
建站咨詢
