日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢(xún)
選擇下列產(chǎn)品馬上在線(xiàn)溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷(xiāo)解決方案
SpringBoot整合RocketMQ事務(wù)/廣播/順序消息

環(huán)境:springboot2.3.9RELEASE + RocketMQ4.8.0

依賴(lài)

 
 
 
 
  1.  
  2.   org.springframework.boot 
  3.     spring-boot-starter-web 
  4.  
  5.  
  6.     org.apache.rocketmq 
  7.     rocketmq-spring-boot-starter 
  8.     2.2.0 
  9.  

配置文件

 
 
 
 
  1. server: 
  2.   port: 8080 
  3. --- 
  4. rocketmq: 
  5.   nameServer: localhost:9876 
  6.   producer: 
  7.     group: demo-mq 

普通消息

發(fā)送

 
 
 
 
  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.      
  4. public void send(String message) { 
  5.   rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); 

接受

 
 
 
 
  1. @RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") 
  2. @Component 
  3. public class ConsumerListener implements RocketMQListener { 
  4.  
  5.     @Override 
  6.     public void onMessage(String message) { 
  7.         System.out.println("接收到消息:" + message) ; 
  8.     } 
  9.  

順序消息

發(fā)送

 
 
 
 
  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.  
  4. public void sendOrder(String topic, String message, String tags, int id) { 
  5.     rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(),  
  6.             "order-" + id, new SendCallback() { 
  7.                 @Override 
  8.                 public void onSuccess(SendResult sendResult) { 
  9.                     System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ; 
  10.                 } 
  11.                 @Override 
  12.                 public void onException(Throwable e) { 
  13.                     e.printStackTrace() ; 
  14.                 } 
  15.             }); 

這里是根據(jù)hashkey將消息發(fā)送到不同的隊(duì)列中

 
 
 
 
  1. @RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group",  
  2.     selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) 
  3. @Component 
  4. public class ConsumerOrderListener implements RocketMQListener { 
  5.  
  6.     @Override 
  7.     public void onMessage(String message) { 
  8.         System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ; 
  9.     } 
  10.  

consumeMode = ConsumeMode.ORDERLY,指明了消息模式為順序模式,一個(gè)隊(duì)列,一個(gè)線(xiàn)程。

結(jié)果

當(dāng)consumeMode = ConsumeMode.CONCURRENTLY執(zhí)行結(jié)果如下:

集群/廣播消息模式

發(fā)送端

 
 
 
 
  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.      
  4. public void send(String topic, String message, String tags) { 
  5.     rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; 

集群消息模式

消費(fèi)端

 
 
 
 
  1. @RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",  
  2.     selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) 
  3. @Component 
  4. public class ConsumerBroadListener implements RocketMQListener { 
  5.  
  6.     @Override 
  7.     public void onMessage(String message) { 
  8.         System.out.println("ConsumerBroadListener1接收到消息:" + message) ; 
  9.     } 
  10.  

messageModel = MessageModel.CLUSTERING

測(cè)試

啟動(dòng)兩個(gè)服務(wù)分別端口是8080,8081

8080服務(wù)

8081服務(wù)

集群消息模式下,每個(gè)服務(wù)分別接收一部分消息,實(shí)現(xiàn)了負(fù)載均衡

廣播消息模式

消費(fèi)端

 
 
 
 
  1. @RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group",  
  2.     selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) 
  3. @Component 
  4. public class ConsumerBroadListener implements RocketMQListener { 
  5.  
  6.     @Override 
  7.     public void onMessage(String message) { 
  8.         System.out.println("ConsumerBroadListener1接收到消息:" + message) ; 
  9.     } 
  10.  

messageModel = MessageModel.BROADCASTING

測(cè)試

啟動(dòng)兩個(gè)服務(wù)分別端口是8080,8081

8080服務(wù)

8081服務(wù)

集群消息模式下,每個(gè)服務(wù)分別都接受了同樣的消息。

事務(wù)消息

RocketMQ事務(wù)的3個(gè)狀態(tài)

TransactionStatus.CommitTransaction:提交事務(wù)消息,消費(fèi)者可以消費(fèi)此消息

TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。

TransactionStatus.Unknown :中間狀態(tài),它代表需要檢查消息隊(duì)列來(lái)確定狀態(tài)。

RocketMQ實(shí)現(xiàn)事務(wù)消息主要分為兩個(gè)階段:正常事務(wù)的發(fā)送及提交、事務(wù)信息的補(bǔ)償流程 整體流程為:

正常事務(wù)發(fā)送與提交階段

1、生產(chǎn)者發(fā)送一個(gè)半消息給MQServer(半消息是指消費(fèi)者暫時(shí)不能消費(fèi)的消息)

2、服務(wù)端響應(yīng)消息寫(xiě)入結(jié)果,半消息發(fā)送成功

3、開(kāi)始執(zhí)行本地事務(wù)

4、根據(jù)本地事務(wù)的執(zhí)行狀態(tài)執(zhí)行Commit或者Rollback操作

事務(wù)信息的補(bǔ)償流程

1、如果MQServer長(zhǎng)時(shí)間沒(méi)收到本地事務(wù)的執(zhí)行狀態(tài)會(huì)向生產(chǎn)者發(fā)起一個(gè)確認(rèn)回查的操作請(qǐng)求

2、生產(chǎn)者收到確認(rèn)回查請(qǐng)求后,檢查本地事務(wù)的執(zhí)行狀態(tài)

3、根據(jù)檢查后的結(jié)果執(zhí)行Commit或者Rollback操作

補(bǔ)償階段主要是用于解決生產(chǎn)者在發(fā)送Commit或者Rollback操作時(shí)發(fā)生超時(shí)或失敗的情況。

發(fā)送端

 
 
 
 
  1. @Resource 
  2. private RocketMQTemplate rocketMQTemplate ; 
  3.      
  4. public void sendTx(String topic, Long id, String tags) { 
  5.     rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload( 
  6.             new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))). 
  7.             setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(),  
  8.             UUID.randomUUID().toString().replaceAll("-", "")) ; 

生產(chǎn)者對(duì)應(yīng)的監(jiān)聽(tīng)器

 
 
 
 
  1. @RocketMQTransactionListener 
  2. public class ProducerTxListener implements RocketMQLocalTransactionListener { 
  3.      
  4.     @Resource 
  5.     private BusinessService bs ; 
  6.  
  7.     @Override 
  8.     public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { 
  9.         // 這里執(zhí)行本地的事務(wù)操作,比如保存數(shù)據(jù)。 
  10.         try { 
  11.             // 創(chuàng)建一個(gè)日志記錄表,將這唯一的ID存入數(shù)據(jù)庫(kù)中,在下面的check方法中可以根據(jù)這個(gè)id查詢(xún)是否有數(shù)據(jù) 
  12.             String id = (String) msg.getHeaders().get("BID") ; 
  13.             Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ; 
  14.             System.out.println("消息內(nèi)容:" + users + "\t參與數(shù)據(jù):" + arg + "\t本次事務(wù)的唯一編號(hào):" + id) ; 
  15.             bs.save(users, new UsersLog(users.getId(), id)) ; 
  16.         } catch (Exception e) { 
  17.             e.printStackTrace() ; 
  18.             return RocketMQLocalTransactionState.ROLLBACK ; 
  19.         } 
  20.         return RocketMQLocalTransactionState.COMMIT ; 
  21.     } 
  22.  
  23.     @Override 
  24.     public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { 
  25.         // 這里檢查本地事務(wù)是否執(zhí)行成功 
  26.         String id = (String) msg.getHeaders().get("BID") ; 
  27.         System.out.println("執(zhí)行查詢(xún)ID為:" + id + " 的數(shù)據(jù)是否存在") ; 
  28.         UsersLog usersLog = bs.queryUsersLog(id) ; 
  29.         if (usersLog == null) { 
  30.             return RocketMQLocalTransactionState.ROLLBACK ; 
  31.         } 
  32.         return RocketMQLocalTransactionState.COMMIT ; 
  33.     } 
  34.  

消費(fèi)端

 
 
 
 
  1. @RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") 
  2. @Component 
  3. public class ConsumerTxListener implements RocketMQListener { 
  4.  
  5.     @Override 
  6.     public void onMessage(Users users) { 
  7.         System.out.println("TX接收到消息:" + users) ; 
  8.     } 
  9.  

Service

 
 
 
 
  1. @Transactional 
  2. public boolean save(Users users, UsersLog usersLog) { 
  3.     usersRepository.save(users) ; 
  4.     usersLogRepository.save(usersLog) ; 
  5.     if (users.getId() == 1) { 
  6.         throw new RuntimeException("數(shù)據(jù)錯(cuò)誤") ; 
  7.     } 
  8.     return true ; 
  9.      
  10. public UsersLog queryUsersLog(String bid) { 
  11.     return usersLogRepository.findByBid(bid) ; 

Controller

 
 
 
 
  1. @GetMapping("/tx/{id}") 
  2. public Object sendTx(@PathVariable("id")Long id) { 
  3.     ps.sendTx("tx-topic", id, "tag10") ; 
  4.     return "send transaction success" ; 

測(cè)試

調(diào)用接口后,控制臺(tái)輸出:

從打印日志看出來(lái)都保存完畢了后 消費(fèi)端才接受到消息。

刪除數(shù)據(jù),再測(cè)試ID為1會(huì)報(bào)錯(cuò)的。

數(shù)據(jù)庫(kù)中沒(méi)有數(shù)據(jù)。

是不是也不是很復(fù)雜,2個(gè)階段來(lái)處理。

完畢!!!


新聞名稱(chēng):SpringBoot整合RocketMQ事務(wù)/廣播/順序消息
網(wǎng)頁(yè)路徑:http://www.dlmjj.cn/article/cddccog.html