日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
消息服務(wù):項目整合RocketMQ

在《??SpringCloud Alibaba實戰(zhàn)??》專欄前面的文章中,我們實現(xiàn)了用戶微服務(wù)、商品微服務(wù)和訂單微服務(wù)之間的遠程調(diào)用,并且實現(xiàn)了服務(wù)調(diào)用的負載均衡。也基于阿里開源的Sentinel實現(xiàn)了服務(wù)的限流與容錯,并詳細介紹了Sentinel的核心技術(shù)與配置規(guī)則。

成都創(chuàng)新互聯(lián)長期為數(shù)千家客戶提供的網(wǎng)站建設(shè)服務(wù),團隊從業(yè)經(jīng)驗10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為吉陽企業(yè)提供專業(yè)的做網(wǎng)站、成都網(wǎng)站建設(shè),吉陽網(wǎng)站改版等技術(shù)服務(wù)。擁有10多年豐富建站經(jīng)驗和眾多成功案例,為您定制開發(fā)。

簡單介紹了服務(wù)網(wǎng)關(guān),并對SpringCloud Gateway的核心架構(gòu)進行了簡要說明,也在項目中整合了SpringCloud Gateway網(wǎng)關(guān)實現(xiàn)了通過網(wǎng)關(guān)訪問后端微服務(wù)。

同時,也基于SpringCloud Gateway整合Sentinel實現(xiàn)了網(wǎng)關(guān)的限流功能,詳細介紹了SpringCloud Gateway網(wǎng)關(guān)的核心技術(shù)。在鏈路追蹤章節(jié),我們開始簡單介紹了分布式鏈路追蹤技術(shù)與解決方案,隨后在項目中整合Sleuth實現(xiàn)了鏈路追蹤,并使用Sleuth整合ZipKin實現(xiàn)了分布式鏈路追蹤的可視化 。

在消息服務(wù)章節(jié),我們介紹了MQ的使用場景,引入MQ后的注意事項以及MQ的選型對比。接下來,我們就在項目中整合RocketMQ。

本章總覽?

RocketMQ環(huán)境準備?

RocketMQ是阿里開源的消息中間件,目前是Apache下的頂級項目。正式在項目中接入RocketMQ之前,我們需要搭建RocketMQ的環(huán)境。這里呢,我把搭建RocketMQ的基礎(chǔ)環(huán)境分為兩個部分:搭建RocketMQ環(huán)境和搭建RocketMQ控制臺。

「注意:冰河這里都是先下載RocketMQ的源碼和RocketMQ控制臺的源碼,然后對源碼進行編譯后,再搭建的。目的也是讓小伙伴們能夠跟著冰河實現(xiàn)手動編譯RocketMQ的源碼,另外,編譯RocketMQ源碼和控制臺源碼需要JDK1.8+Maven?!?/p>

源碼編譯安裝RocketMQ

(1)到鏈接https://github.com/apache/rocketmq/releases/tag/rocketmq-all-4.9.3下載RocketMQ 4.9.3版本的源碼。下載并解壓后的源碼如下所示。

(2)打開cmd命令行,進入RocketMQ的解壓目錄,我這里是E:\Application\RocketMQ\rocketmq-rocketmq-all-4.9.3目錄,然后在cmd命令行輸入如下命令開始編譯打包。

mvn clean install -Dmaven.test.skip=true -Prelease-all

編譯過程如下所示。

編譯打包成功后,如下圖所示。

(3)編譯成功后,會在RocketMQ解壓目錄下的distribution目錄下的target目錄下生成RocketMQ的安裝包,在我電腦上的目錄就是:E:\Application\RocketMQ\rocketmq-rocketmq-all-4.9.3\distribution\target。如下所示。

這樣,我們就自己下載RocketMQ的源碼,并打包成功了。

注意:這里,為了方便,我還是將RocketMQ部署到我本機Windows操作系統(tǒng)上,小伙伴們也可以將之前的Nacos、Sentinel和這次的RocketMQ都部署在Linux操作系統(tǒng)上,部署方式幾乎與在Windows操作系統(tǒng)一樣,這里,冰河就不再贅述了?!?/p>

(4)將編譯出的安裝包,解壓到電腦的某個目錄下,例如我解壓后的目錄為:E:\Application\microservices\RocketMQ\rocketmq-4.9.3。

(5)在RocketMQ的解壓目錄下的conf目錄下修改broker.conf文件,修改后的文件內(nèi)容如下所示。

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 自動創(chuàng)建Topic
autoCreateTopicEnable=true
# nameServ地址
namesrvAddr=127.0.0.1:9876
# 存儲路徑
storePathRootDir=E:/RocketMQ/data/rocketmq/dataDir
# commitLog路徑
storePathCommitLog=E:/RocketMQ/data/rocketmq/dataDir/commitlog
# 消息隊列存儲路徑
storePathConsumeQueue=E:/RocketMQ/data/rocketmq/dataDir/consumequeue
# 消息索引存儲路徑
storePathIndex=E:/RocketMQ/data/rocketmq/dataDir/index
# checkpoint文件路徑
storeCheckpoint=E:/RocketMQ/data/rocketmq/dataDir/checkpoint
# abort文件存儲路徑
abortFile=E:/RocketMQ/data/rocketmq/dataDir/abort

小伙伴們可以根據(jù)自己的實際情況,自行修改上述文件中配置的目錄地址。

(6)非常重要的一步,在啟動RocketMQ之前,需要配置下ROCKETMQ_HOME環(huán)境變量,否則在啟動RocketMQ的時候,會提示如下錯誤信息。

E:\Application\microservices\RocketMQ\rocketmq-4.9.3\bin>mqnamesrv.cmd
Please set the ROCKETMQ_HOME variable in your environment!

「提示:設(shè)置ROCKETMQ_HOME環(huán)境變量?!?/strong>?

接下來,就在系統(tǒng)環(huán)境變量中,設(shè)置下ROCKETMQ_HOME的環(huán)境變量,如下所示。

(7)配置完RocketMQ的環(huán)境變量后,打開cmd命令行,進入RocketMQ的bin目錄,例如,我電腦的目錄是:E:\Application\microservices\RocketMQ\rocketmq-4.9.3\bin。執(zhí)行??mqnamesrv.cmd??命令啟動NameServer,如下所示。

打印出如下信息,說明RocketMQ的NameServer啟動成功了。

The Name Server boot success. serializeType=JSON

(8)重新打開一個cmd命令行,進入RocketMQ的bin目錄,輸入??mqbroker.cmd -n localhost:9876??命令啟動RocketMQ的Broker服務(wù),如下所示。

打印出如下信息,說明RocketMQ的Broker服務(wù)啟動成功了。

boot success. serializeType=JSON and name server is localhost:9876

測試RocketMQ環(huán)境

RocketMQ內(nèi)置了大量的測試案例,并且這些測試案例可以通過RocketMQ的bin目錄下的tools.cmd命令進行測試。接下來,我們就使用RocketMQ自帶的tools.cmd命令測試RocketMQ的環(huán)境。

(1)啟動生產(chǎn)者程序向RocketMQ發(fā)送消息。

重新打開cmd命令行,進入RocketMQ的bin目錄,在命令行輸入如下命令調(diào)用RocketMQ自帶的生產(chǎn)者程序向RocketMQ發(fā)送消息。

set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Producer

可以看到,執(zhí)行完上述兩條命令后,生產(chǎn)者程序開始向RocketMQ發(fā)送消息。

(2)啟動消費者程序消費RocketMQ中的消息。

重新打開cmd命令行,進入RocketMQ的bin目錄,在命令行輸入如下命令調(diào)用RocketMQ自帶的消費者程序消費RocketMQ中的消息。

set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Consumer

可以看到,執(zhí)行完上述兩條命令后,消費者程序開始消費RocketMQ中的消息。

說明我們使用源碼編譯搭建RocketMQ環(huán)境成功了。

源碼編譯RocketMQ控制臺

這里需要注意的是:RocketMQ控制臺本質(zhì)上是一個SpringBoot程序,啟動后默認監(jiān)聽的端口是8080。RocketMQ的新版控制臺已經(jīng)從RocketMQ的rocketmq-externals項目中分離出來了。也就是說,新版的RocketMQ控制臺已經(jīng)從https://github.com/apache/rocketmq-externals鏈接所示的項目中分離出來,新版控制臺的鏈接地址為:https://github.com/apache/rocketmq-dashboard。

(1)從鏈接https://github.com/apache/rocketmq-dashboard下載新版的RocketMQ控制臺源碼。下載后解壓。

(2)進入到RocketMQ控制臺源碼解壓目錄的src/main/resources目錄下,編輯application.yml文件,修改??namesrvAddrs??地址,去掉多余的namesrvAddrs地址。

application.yml文件中原來的配置如下所示。

rocketmq:
config:
# if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, default localhost:9876
# configure multiple namesrv addresses to manage multiple different clusters
namesrvAddrs:
- 127.0.0.1:9876
- 127.0.0.2:9876

將127.0.0.2:9876刪除或者注釋掉,如下所示。

rocketmq:
config:
# if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, default localhost:9876
# configure multiple namesrv addresses to manage multiple different clusters
namesrvAddrs:
- 127.0.0.1:9876
# - 127.0.0.2:9876

RocketMQ控制臺啟動時默認監(jiān)聽的端口是8080,由于我們項目中訂單微服務(wù)監(jiān)聽的端口也是8080,所以,將RocketMQ控制臺監(jiān)聽的端口修改為10003,修改前的配置如下所示。

server:
port: 8080

修改后的配置如下所示。

server:
port: 10003

(3)修改完application.yml文件后,打開cmd命令行,進入RocketMQ控制臺源碼的根目錄,輸入如下Maven命令開始編譯RocketMQ控制臺的源碼。

mvn clean install -Dmaven.test.skip=true

編譯過程如下所示。

(4)編譯完成后,會在RocketMQ控制臺源碼的根目錄下生成target目錄,如下所示。

進入target目錄后,可以看到生成了rocketmq-dashboard-1.0.1-SNAPSHOT.jar文件,如下所示。

這個jar文件就是RocketMQ控制臺的運行文件。

(5)重新打開cmd命令行,進入rocketmq-dashboard-1.0.1-SNAPSHOT.jar文件所在的命令,在命令行直接輸入如下命令啟動RocketMQ控制臺程序。

java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar

驗證RocketMQ控制臺

在瀏覽器中輸入??http://localhost:10003??后,出現(xiàn)如下畫面說明RocketMQ啟動成功。

界面默認是英文,我們也可以點擊右上角的??changeLanguage??切換語言,切換成中文顯示,如下所示。

選擇主題菜單想后如下所示。

可以看到目前RocketMQ中存在一個名稱為TopicTest的主題,點擊TopicTest主題的狀態(tài)按鈕,如下所示。

會顯示TopicTest主題的消息隊列信息,如下所示。

可以看到,正確顯示出了TopicTest主題的消息隊列信息,說明RocketMQ控制臺啟動成功了。

編碼測試RocketMQ?

我們使用RocketMQ自帶的生產(chǎn)者和消費者程序?qū)崿F(xiàn)了消息的生成與消費,為了讓小伙伴們能夠更加直觀的感受到消息中間件在項目中的作用,接下來,我們自己編碼測試下RocketMQ。

導(dǎo)入RocketMQ依賴

在用戶微服務(wù)shop-user的pom.xml中,添加RocketMQ相關(guān)的依賴,如下所示。


org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.3


org.apache.rocketmq
rocketmq-client
4.5.2

編寫生產(chǎn)者代碼

在用戶微服務(wù)的sec/test/java目錄下新建??io.binghe.shop.rocketmq.test??包,在包下創(chuàng)建RocketMQProducer類,作為RocketMQ的生產(chǎn)者,代碼如下所示。

/**
* @author binghe
* @version 1.0.0
* @description RocketMQ生產(chǎn)者
*/
public class RocketMQProducer {

public static void main(String[] args) throws Exception {
//創(chuàng)建消息生產(chǎn)者
DefaultMQProducer producer = new DefaultMQProducer("bingheProducerGroup");
//設(shè)置NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
//啟動生產(chǎn)者
producer.start();
//構(gòu)建消息對象
Message message = new Message("bingheTopic", "bingheTag", "Hello RocketMQ".getBytes());
System.out.println("生產(chǎn)者發(fā)出的消息為:" + JSONObject.toJSONString(message));
//發(fā)送消息并接收結(jié)果
SendResult sendResult = producer.send(message);
//打印結(jié)果信息
System.out.println("生產(chǎn)者收到的發(fā)送結(jié)果信息為:" + JSONObject.toJSONString(sendResult));
//關(guān)閉生產(chǎn)者
producer.shutdown();
}
}

生產(chǎn)者的代碼比較簡單,這里就不再贅述了。

編寫消費者代碼

在??io.binghe.shop.rocketmq.test??包下新建RocketMQConsumer類,作為RocketMQ的消費者,代碼如下所示。

/**
* @author binghe
* @version 1.0.0
* @description RocketMQ消費者
*/
public class RocketMQConsumer {
public static void main(String[] args) throws Exception {
try{
//創(chuàng)建消息消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("bingheConsumerGroup");
//設(shè)置NameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//訂閱bingheTopic主題
consumer.subscribe("bingheTopic", "*");
//設(shè)置消息監(jiān)聽,當(dāng)收到消息時RocketMQ會回調(diào)消息監(jiān)聽
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//打印消息消費者收到的RocketMQ消息
System.out.println("消費者收到的消息為:" + list);
//返回消息消費成功的標識
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//啟動消費者
consumer.start();
System.out.println("消費者啟動成功");
}catch (Exception e){
e.printStackTrace();
}
}
}

測試消息的生產(chǎn)與消費

(1)為了便于觀察,這里我們先啟動消費者程序RocketMQConsumer,啟動RocketMQConsumer后會在IDEA的控制臺打印如下信息。

消費者啟動成功

說明消費者啟動成功了。

(2)運行生產(chǎn)者程序RocketMQProducer,運行后RocketMQProducer程序控制臺會輸出如下信息。

生產(chǎn)者發(fā)出的消息為:{"body":"SGVsbG8gUm9ja2V0TVE=","delayTimeLevel":0,"flag":0,"properties":{"WAIT":"true","TAGS":"bingheTag"},"tags":"bingheTag","topic":"bingheTopic","waitStoreMsgOK":true}
生產(chǎn)者收到的發(fā)送結(jié)果信息為:{"messageQueue":{"brokerName":"DESKTOP-PSKC7T1","queueId":1,"topic":"bingheTopic"},"msgId":"C0A8006F538418B4AAC25B9EDDAC0000","offsetMsgId":"C0A8B80100002A9F0000000000036B16","queueOffset":2,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

說明生產(chǎn)者程序RocketMQProducer成功將消息發(fā)送到RocketMQ。

(3)接下來,再看下消費者程序RocketMQConsumer的控制臺,如下所示。

消費者收到的消息為:[MessageExt [queueId=1, storeSize=206, queueOffset=2, sysFlag=0, bornTimestamp=1652871538093, bornHost=/192.168.184.1:52915, storeTimestamp=1652871538099, storeHost=/192.168.184.1:10911, msgId=C0A8B80100002A9F0000000000036B16, commitLogOffset=224022, bodyCRC=1774740973, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='bingheTopic', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1652871538103, UNIQ_KEY=C0A8006F538418B4AAC25B9EDDAC0000, CLUSTER=DefaultCluster, WAIT=true, TAGS=bingheTag}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81], transactionId='null'}]]

說明生成者發(fā)送到RocketMQ的消息,被消費者成功消費到了。

項目整合RocketMQ?

我們在項目中模擬一個用戶成功下單后,為用戶發(fā)送通知,通知用戶下單成功的邏輯,具體的流程就是下單成功后將訂單的信息發(fā)送到RocketMQ,然后用戶微服務(wù)訂閱RocketMQ的消息,接收到消息后進行打印。

用戶微服務(wù)整合RocketMQ

(1)編碼測試RocketMQ時,導(dǎo)入了RocketMQ的依賴,這里就不用再次導(dǎo)入了。

(2)在用戶微服務(wù)shop-user的application.yml文件中添加如下RocketMQ的配置。

rocketmq:
name-server: 127.0.0.1:9876

(3)在用戶微服務(wù)shop-user中創(chuàng)建??io.binghe.shop.user.rocketmq??包,在包下創(chuàng)建RocketConsumeListener,實現(xiàn)org.apache.rocketmq.spring.core.RocketMQListener接口,具體代碼如下所示。

/**
* @author binghe
* @version 1.0.0
* @description 監(jiān)聽消費
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "user-group", topic = "order-topic")
public class RocketConsumeListener implements RocketMQListener {
@Override
public void onMessage(Order order) {
log.info("用戶微服務(wù)收到了訂單信息:{}", JSONObject.toJSONString(order));
}
}

其中,RocketConsumeListener類上的@RocketMQMessageListener注解,表示當(dāng)前類是一個RocketMQ的消費者,在@RocketMQMessageListener注解中配置了消費者組為user-group,主題為order-topic。

至此,用戶微服務(wù)整合RocketMQ完畢。

訂單微服務(wù)整合RocketMQ

(1)在訂單微服務(wù)shop-order的pom.xml文件中添加RocketMQ的依賴,如下所示。


org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.3



org.apache.rocketmq
rocketmq-client
4.5.2

(2)在訂單微服務(wù)shop-order的application.yml文件中添加如下配置。

rocketmq:
name-server: 127.0.0.1:9876
producer:
group: order-group

(3)將??io.binghe.shop.order.service.impl.OrderServiceV6Impl???類,復(fù)制一份成??io.binghe.shop.order.service.impl.OrderServiceV7Impl???類,接下來,在??io.binghe.shop.order.service.impl.OrderServiceV7Impl??類中操作。

將??io.binghe.shop.order.service.impl.OrderServiceV7Impl??類上的@Service注解中的名稱修改為orderServiceV7,如下所示。

@Slf4j
@Service("orderServiceV7")
public class OrderServiceV7Impl implements OrderService {
//省略具體代碼
}

(4)在??io.binghe.shop.order.service.impl.OrderServiceV7Impl??類中,注入RocketMQTemplate對象,如下所示。

@Autowired
private RocketMQTemplate rocketMQTemplate;

(5)在??io.binghe.shop.order.service.impl.OrderServiceV7Impl#saveOrder()??方法中,提交訂單成功后將訂單信息寫入RocketMQ,如下所示。

@Override
@Transactional(rollbackFor = Exception.class)
public void saveOrder(OrderParams orderParams) {
//省略上面所有代碼
rocketMQTemplate.convertAndSend("order-topic", order);
}

(6)在??io.binghe.shop.order.controller.OrderController??中,將注入的OrderService的名稱修改成orderServiceV7,如下所示。

@Autowired
@Qualifier(value = "orderServiceV7")
private OrderService orderService;

「注意:訂單微服務(wù)shop-order中,修改后的代碼見源碼工程,冰河在這里不再粘貼完整的源代碼?!?/p>

測試項目整合的RocketMQ

(1)分別啟動Nacos,Sentinel,ZipKin和RocketMQ。

(2)分別啟動用戶微服務(wù)、商品微服務(wù)、訂單微服務(wù)和網(wǎng)關(guān)服務(wù)。

(3)在瀏覽器中輸入??localhost:10001/server-order/order/submit_order?userId=1001&productId=1001&count=1??,如下所示。

(4)查看用戶微服務(wù)shop-user的控制臺,發(fā)現(xiàn)會輸出訂單的信息,如下所示。

2022-05-18 20:37:26.440  INFO [server-user,,,] 18064 --- [MessageThread_1] i.b.s
                                                網(wǎng)頁名稱:消息服務(wù):項目整合RocketMQ                                                
URL地址:http://www.dlmjj.cn/article/cccihed.html