新聞中心
大家好,我是君哥。

今天來聊一聊 RocketMQ 的灰度方案。
灰度發(fā)布是指在黑與白之間,平滑過渡的一種發(fā)布方式。在大流量的系統(tǒng)中,如果一次升級改造范圍比較大,或者影響內容不太確定,一般會采用切量的方式進行升級,這樣可以減少生產(chǎn)變更帶來的影響。
如上圖,對 ServiceA 這個服務進行升級,采用灰度發(fā)布,先升級 Server5,一周后如果沒有問題,升級 Server4 和 Server 3,再運行一周沒有問題,把剩下兩個節(jié)點都升級。
上面的案例是一個 RPC 的調用。但如果使用消息隊列該怎么做呢?使用消息隊列,并不能使用網(wǎng)關來進行流量轉發(fā)。這里需要分不同場景進行分析。
1、只升級消費者
這是最簡單的情況,比如只有消費者修改了消費邏輯,就是 RPC 調用的情況類似,我們只要把消費者進行灰度發(fā)布就可以。如下圖:
2、生產(chǎn)者也升級
下面是一個訂單的實體類,我們新加了一個屬性,訂單生成時間:
public class Order {
private Long id;
private Long userId;
private Long productId;
private Integer count;
private BigDecimal payAmount;
/**訂單狀態(tài):0:創(chuàng)建中;1:已完結*/
private Integer status;
/**新加屬性,訂單生成時間*/
private String createTime;
}消費端的改造是需要對 createTime 這個屬性進行處理。
(1)消費端過濾
在生產(chǎn)者的 Order 類中增加 createTime 屬性,如果我們直接使用 createTime 屬性來過濾,消費者并不能實現(xiàn)灰度,因為所有的消費者都可能會拉取到帶有 createTime 屬性的消息。
RocketMQ 中 Message 的定義如下:
public class Message implements Serializable {
private String topic;
private int flag;
private Map properties;
private byte[] body;
private String transactionId;
} 可以在 properties 屬性中增加一個灰度標識,比如生產(chǎn)者發(fā)送消息的時候封裝如下:
Message msg = buildMessage(topic);
msg.putUserProperty("gray", "true");
注意:也可以在 SendMessageHook 這個鉤子函數(shù)中定義。通過這種方式可以在消費端新增加一個灰度 Consumer Group,用來對灰度消息則進行消費。如下圖:
對于灰度 Consumer Group 判斷到 gray 屬性是 true 時進行消費,而對于普通 Consumer Group,判斷到 gray 屬性不等于 true 時再進行消費。這里可以借助 RocketMQ 客戶端的 FilterMessageHook,代碼如下:
defaultMQPushConsumerImpl.registerFilterMessageHook(new FilterMessageHook() {
@Override
public String hookName(){
return "filterHook";
}
@Override
public void filterMessage(FilterMessageContext context){
List messages = context.getMsgList();
context.setMsgList(messages.stream().filter(m -> StringUtils.equals(m.getProperty("gray"),"true"))
.collect(Collectors.toList()));
}
}); ?不過這樣會有兩個問題,灰度和正常的兩個 Consumer Group 相當于是廣播組:
- 兩個組都要對所有的消息進行拉取,比如本來使用灰度發(fā)布計劃切 10% 的流量,但實際上全部流量都切過去了,只是根據(jù)屬性做了判斷。這讓消費端整體承擔了兩倍的壓力;
- 因為兩個消費者組都要去 Broker 拉取消息,Broker 的壓力也增加了一倍。
(2)Broker 過濾
使用 tag 過濾
如果一個 Consumer 不訂閱一個 Topic 中的全部消息,可以通過 Tag 來過濾。比如一個 Consumer 訂閱了 TopicA 這個 Topic 中的 Tag1 和 Tag2 這兩個 tag,那這個 Consumer 的訂閱關系如下圖:
SubscriptionData 這個對象封裝了 Topic、tag 以及所訂閱 tag 的 hashcode 集合。
Consumer 發(fā)送拉取消息請求時,會把訂閱關系傳給 Broker(Broker 解析成 SubscriptionData 對象),Broker 使用 consumequeue 獲取消息時,首先判斷判斷最后 8 個字節(jié)的 tag hashcode 是否在 SubscriptionData 的 codeSet 中,如果不在就跳過,如果存在把消息返回給 Consumer。如下圖:
這樣可以在灰度 Producer 發(fā)送消息時加上 Tag,如下代碼:
Message msg = new Message();
msg.setBody("Test");
msg.setTopic("Topic");
msg.setTags("Gray");
而在灰度消費者訂閱 Gray 這個 tag。這樣就避免了 2.1 節(jié)中消息全量拉取的問題。
使用 SQL92 過濾
使用 SQL92 過濾,可以應對更加復雜的場景,不僅可以過濾 Tag,還可以過濾 UserProperty。
比如下面是一個生產(chǎn)者的代碼:
Message msg = new Message();
msg.setTopic("testTopic");
msg.setTags("tag1");
msg.putUserProperty("gray","true");
這樣消費者初始化的時候,可以定義使用 SQL92 過濾,代碼如下:
consumer.subscribe("testTopic",
MessageSelector.bySql("(TAGS is not null and TAGS in TAGS='''''tag1''''')" +
"and (gray is not null gray='true')"));下面是 bySql 的源代碼:
public static MessageSelector bySql(String sql){
return new MessageSelector(ExpressionType.SQL92, sql);
}3、總結
本文介紹了 RocketMQ 灰度消息的使用方法,場景比較簡單。對于全鏈路的復雜灰度場景,可以參考使用阿里的微服引擎 MSE。
網(wǎng)站欄目:面試官:使用RocketMQ怎么進行灰度發(fā)布?
URL標題:http://www.dlmjj.cn/article/dpedeeg.html


咨詢
建站咨詢
