日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
實(shí)戰(zhàn)干貨:基于Redis6.0部署迷你版本消息隊(duì)列

技術(shù)研究背景

由于目前的研發(fā)團(tuán)隊(duì)處于公司初創(chuàng)階段,尚未有能成熟的運(yùn)維體系,對(duì)于市面上常見的成熟MQ搭建維護(hù)能力不足,但是又希望能有一款輕量級(jí)的消息系統(tǒng)供研發(fā)團(tuán)隊(duì)的成員使用,因此開展了對(duì)該方面相關(guān)的技術(shù)調(diào)研工作。

目前創(chuàng)新互聯(lián)已為千余家的企業(yè)提供了網(wǎng)站建設(shè)、域名、虛擬空間、網(wǎng)站托管、企業(yè)網(wǎng)站設(shè)計(jì)、本溪網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長,共同發(fā)展。

通過相關(guān)的技術(shù)調(diào)研后,決定挑選基于Redis實(shí)現(xiàn)消息系統(tǒng)。

具體技術(shù)選型原因:

  • 團(tuán)隊(duì)內(nèi)部已經(jīng)有搭建相關(guān)的Redis服務(wù),并且具備一定的運(yùn)維能力,可以節(jié)省技術(shù)成本
  • 業(yè)界有較多關(guān)于Redis搭建消息系統(tǒng)方面的技術(shù)文章
  • 目前的系統(tǒng)的整體吞吐量并不高,接入消息系統(tǒng)的主要目的只是為了實(shí)現(xiàn)系統(tǒng)之間的解耦

為了方便讓讀者們從0到1地學(xué)習(xí)這塊內(nèi)容,我將會(huì)從環(huán)節(jié)搭建開始介紹起。

基本環(huán)境的搭建

基于redis6.0.6版本搭建一套簡單的消息隊(duì)列系統(tǒng)。 環(huán)境部署:

docker run -p 6379:6379 --name redis_6_0_6 -d redis:6.0.6
  • 參數(shù)解釋: -d 后臺(tái)啟動(dòng) -p 端口映射 -name 容器名稱

如果本地沒有相關(guān)鏡像,可以嘗試通過搭建下方命令進(jìn)行鏡像的拉?。?

docker pull redis:6.0.6

當(dāng)redis的基礎(chǔ)環(huán)境配置好了之后,接下來便是基于redis內(nèi)置的一些基本功能開發(fā)一款消息隊(duì)列組件了。

下邊我將分三種不同的技術(shù)方案來介紹如何實(shí)現(xiàn)一款輕量級(jí)的消息隊(duì)列。

基于常規(guī)的隊(duì)列結(jié)構(gòu)來實(shí)現(xiàn)消息隊(duì)列

這塊的實(shí)現(xiàn)比較簡單,主要是基于Redis內(nèi)部的List結(jié)構(gòu)來落地的,發(fā)送方將消息從隊(duì)列的左邊寫入,然后消費(fèi)方從隊(duì)列的右邊讀取。

package org.idea.mq.redis.framework.mq.list;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.MsgWrapper;
import org.idea.mq.redis.framework.mq.IMQTemplate;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @Author linhao
* @Date created in 3:09 下午 2022/2/7
*/
@Component
public class RedisListMQTemplate implements IMQTemplate {
@Resource
private IRedisService iRedisService;
@Override
public boolean send(MsgWrapper msgWrapper) {
try {
String json = JSON.toJSONString(msgWrapper.getMsgInfo());
iRedisService.lpush(msgWrapper.getTopic(),json);
return true;
}catch (Exception e){
e.printStackTrace();
}
return false;
}
}

問題思考

這里存在幾個(gè)問題點(diǎn)需要思考下:

多個(gè)服務(wù)之間如何訂閱同一個(gè)消息

這里我建議可以按照系統(tǒng)的項(xiàng)目名稱前綴+業(yè)務(wù)標(biāo)識(shí)來組織。

例如:用戶系統(tǒng)中需要發(fā)布一條 會(huì)員已升級(jí) 的消息給到下游系統(tǒng),此時(shí)可以將這條消息寫入到名為:user-service:member-upgrade-list 的List集合中。

如果訂單系統(tǒng)希望訪問用戶系統(tǒng)的消息,則需要在redis的key里指定user-service:member-upgrade-list關(guān)鍵字。

在這里插入圖片描述

消息的監(jiān)聽機(jī)制如何實(shí)現(xiàn)?

對(duì)于List的消息可以采用輪詢的方式獲取,例如下邊這段案例代碼:

/**
* 輪詢的方式獲取數(shù)據(jù)
*
* @param msgWrapper
*/
private void pollingGet(MsgWrapper msgWrapper) {
while (true) {
String value = iRedisService.rpop(msgWrapper.getTopic());
if (!StringUtils.isEmpty(value)) {
System.out.println(value);
}
//減少訪問壓力,定期睡眠一段時(shí)間
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

但是輪詢的方式比較消耗性能,所以可以嘗試使用Redis的阻塞式彈出指令,例如下邊這種方式來監(jiān)聽消息的觸發(fā)行為:

/**
* 阻塞的方式獲取數(shù)據(jù)
*/
private void blockGet(MsgWrapper msgWrapper) {
while (true) {
List values = iRedisService.brpop(msgWrapper.getTopic());
if (!CollectionUtils.isEmpty(values)) {
values.forEach(value -> {
System.out.println(value);
});
}
}
}

消息的可靠性傳輸如何確保?

在設(shè)計(jì)消息隊(duì)列的時(shí)候,我們非??粗氐木褪窍⒌目煽啃员WC。當(dāng)一條消息發(fā)送到消費(fèi)端之后,如果出現(xiàn)了異常,希望消息能夠?qū)崿F(xiàn)重新發(fā)送的效果。

對(duì)于這種場景的設(shè)計(jì)我們可以嘗試使用 BRPOPLPUSH 這條指令,這條指令可以幫助我們在Redis內(nèi)部將數(shù)據(jù)彈出時(shí)寫入到另一個(gè)備份隊(duì)列中,這樣即使彈出的消息消費(fèi)失敗了,備份隊(duì)列中還有一份備用消息可以使用,而且彈出和寫入備份隊(duì)列操作在Redis內(nèi)部做了封裝,外界調(diào)用可以視作為一個(gè)原子操作。

是否可以支持廣播的模式?

從List集合的實(shí)現(xiàn)原理來看,Redis彈出的元素只能返回給一個(gè)客戶端鏈接,因此無法支持廣播這種效果的實(shí)現(xiàn)。

基于發(fā)布訂閱功能實(shí)現(xiàn)消息隊(duì)列

Redis的內(nèi)部提供了一個(gè)叫做發(fā)布訂閱的功能,通過subscibe命令和publish指令可以幫助我們實(shí)現(xiàn)關(guān)于消息發(fā)布和通知的功能。

使用subscibe/publish命令實(shí)現(xiàn)的效果和List結(jié)構(gòu)最大的不同在于它的傳輸方式:

  • list更多的是實(shí)現(xiàn)點(diǎn)對(duì)點(diǎn)方式的傳輸(P2P方式)
  •  subscibe/publish則是可以實(shí)現(xiàn)廣播的方式和訂閱者進(jìn)行通信

publish部分的案例代碼:

@Override
public boolean publish(String channel, String content) {
try (Jedis jedis = iRedisFactory.getConnection()) {
jedis.publish(channel, content);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

subscibe部分的代碼:

@Override
public boolean subscribe(JedisPubSub jedisPubSub, String... channel) {
try (Jedis jedis = iRedisFactory.getConnection()) {
jedis.subscribe(jedisPubSub, channel);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

監(jiān)聽的部分可以通過額外開啟一個(gè)線程來實(shí)現(xiàn)這部分效果:

@Component
public class RedisSubscribeMQListener implements IMQListener {
@Resource
private IRedisService iRedisService;
class TestChannel extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
super.onMessage(channel, message);
System.out.println("channel " + channel + " 接收到消息:" + message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
}
//所有頻道的消息都監(jiān)聽
@Override
public void onMessageReach(MsgWrapper msgWrapper) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
iRedisService.subscribe(new TestChannel(), msgWrapper.getTopic());
}
});
thread.start();
}
}

要注意,回調(diào)通知的時(shí)候需要注入一個(gè)JedisPubSub的對(duì)象,這個(gè)對(duì)象的內(nèi)部定義了接收消息之后的處理操作。

問題思考

如何保證消息的可靠性傳輸?

通過subscibe/publish處理的消息沒有持久化的特性,一旦出現(xiàn)網(wǎng)絡(luò)中斷,Redis宕機(jī)這類異常的時(shí)候就會(huì)導(dǎo)致消息丟失,而且也沒有較好的機(jī)制取支持消息重復(fù)消費(fèi)的問題。因此可靠性方面較差。

基于Stream實(shí)現(xiàn)消息隊(duì)列

Redis5.0中發(fā)布的Stream類型,也用來實(shí)現(xiàn)典型的消息隊(duì)列。提供了消息的持久化和主備復(fù)制功能,可以讓任何客戶端訪問任何時(shí)刻的數(shù)據(jù),并且能記住每一個(gè)客戶端的訪問位置,還能保證消息不丟失。該Stream類型的出現(xiàn),幾乎滿足了消息隊(duì)列具備的全部內(nèi)容,包括但不限于:

  • 消息ID的序列化生成
  • 消息遍歷
  • 消息的阻塞和非阻塞讀取
  • 消息的分組消費(fèi)
  • 未完成消息的處理
  • 消息隊(duì)列監(jiān)控

關(guān)于Stream的一些基本入門篇章這里不做過多介紹,感興趣的朋友可以去閱讀下這篇文章:

??  https://xie.infoq.cn/article/cdb47caddc5ff49dc09ea58cd ??

下邊的部分我們直接來進(jìn)入關(guān)于Redis XStream相關(guān)的實(shí)戰(zhàn)環(huán)節(jié)。

封裝消息監(jiān)聽功能

首先是定義一個(gè)MQ相關(guān)的接口:

public interface RedisStreamListener {
/**
* 處理正常消息
*/
HandlerResult handleMsg(StreamEntry streamEntry);
}

接著是基于這套接口做消息發(fā)送的實(shí)現(xiàn):

package org.idea.mq.redis.framework.listener;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.HandlerResult;
import org.idea.mq.redis.framework.config.StreamListener;
import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.idea.mq.redis.framework.utils.PayMsg;
import redis.clients.jedis.StreamEntry;
import javax.annotation.Resource;
import java.util.Map;
import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS;
/**
* @Author linhao
* @Date created in 10:07 下午 2022/2/9
*/
@StreamListener(streamName = "order-service:order-payed-stream", groupName = "order-service-group", consumerName = "user-service-consumer")
public class OrderPayedListener implements RedisStreamMQListener {
@Resource
private IRedisService iRedisService;
@Override
public HandlerResult handleMsg(StreamEntry streamEntry) {
Map map = streamEntry.getFields();
String json = map.get("json");
PayMsg payMsg = JSON.parseObject(json, PayMsg.class);
System.out.println("pending payMsg is : " + payMsg);
return SUCCESS;
}
}

自定義消息注解

package org.idea.mq.redis.framework.config;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
/**
* @Author linhao
* @Date created in 10:04 下午 2022/2/9
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface StreamListener {
String streamName() default "";
String groupName() default "";
String consumerName() default "";
}

代碼中有一個(gè)自定義的@StreamListener的注解,該注解的內(nèi)部包含了一個(gè)@Component的注解,可以將使用了該注解的對(duì)象注入到Spring容器中。

為了能將這些個(gè)初始化類進(jìn)行自動(dòng)裝配,還需要加入一個(gè)配置的對(duì)象,代碼如下:

package org.idea.mq.redis.framework.config;
import org.idea.mq.redis.framework.bean.HandlerResult;
import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.StreamPendingEntry;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS;
/**
* @Author linhao
* @Date created in 3:25 下午 2022/2/7
*/
@Configuration
public class StreamListenerConfiguration implements ApplicationListener {
@Resource
private ApplicationContext applicationContext;
@Resource
private IRedisService iRedisService;
private static Logger logger = LoggerFactory.getLogger(StreamListenerConfiguration.class);
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
Map beanMap = applicationContext.getBeansOfType(RedisStreamMQListener.class);
beanMap.values().forEach(redisStreamMQListener -> {
StreamListener StreamListener = redisStreamMQListener.getClass().getAnnotation(StreamListener.class);
ListenerInitWrapper listenerInitWrapper = new ListenerInitWrapper(StreamListener.streamName(), StreamListener.groupName(), StreamListener.consumerName());
Thread handleThread = new Thread(new CoreMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService));
Thread pendingHandleThread = new Thread(new PendingMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService));
handleThread.start();
pendingHandleThread.start();
logger.info("{} load successed ", redisStreamMQListener);
});
}
class PendingMsgHandlerThread implements Runnable {
private ListenerInitWrapper listenerInitWrapper;
private RedisStreamMQListener redisStreamMQListener;
private IRedisService iRedisService;
public PendingMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) {
this.redisStreamMQListener = redisStreamMQListener;
this.listenerInitWrapper = listenerInitWrapper;
this.iRedisService = iRedisService;
}
@Override
public void run() {
String startId = "0-0";
while (true) {
List streamConsumersInfos = iRedisService.xpending(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId), 1);
//如果該集合非空,則觸發(fā)監(jiān)聽行為
if (!CollectionUtils.isEmpty(streamConsumersInfos)) {
for (StreamPendingEntry streamConsumersInfo : streamConsumersInfos) {
StreamEntryID streamEntryID = streamConsumersInfo.getID();
//比當(dāng)前pending的streamId小1
String streamIdStr = streamEntryID.toString();
String[] items = streamIdStr.split("-");
Long timestamp = Long.valueOf(items[0]) - 1;
String beforeId = timestamp + "-" + "0";
List>> result = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(beforeId), 1, listenerInitWrapper.getConsumerName());
for (Map.Entry> streamInfo : result) {
List streamEntries = streamInfo.getValue();
for (StreamEntry streamEntry : streamEntries) {
try {
//業(yè)務(wù)處理
HandlerResult handlerResult = redisStreamMQListener.handleMsg(streamEntry);
if (SUCCESS.equals(handlerResult)) {
startId = streamEntryID.toString();
iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId));
}
} catch (Exception e) {
logger.error("[PendingMsgHandlerThread] e is ", e);
}
}
}
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class CoreMsgHandlerThread implements Runnable {
private ListenerInitWrapper listenerInitWrapper;
private RedisStreamMQListener redisStreamMQListener;
private IRedisService iRedisService;
public CoreMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) {
this.redisStreamMQListener = redisStreamMQListener;
this.listenerInitWrapper = listenerInitWrapper;
this.iRedisService = iRedisService;
}
@Override
public void run() {
while (true) {
List>> streamConsumersInfos = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), StreamEntryID.UNRECEIVED_ENTRY, 1, listenerInitWrapper.getConsumerName());
for (Map.Entry> streamInfo : streamConsumersInfos) {
List streamEntries = streamInfo.getValue();
for (StreamEntry streamEntry : streamEntries) {
//業(yè)務(wù)處理
try {
HandlerResult result = redisStreamMQListener.handleMsg(streamEntry);
if (SUCCESS.equals(result)) {
iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), streamEntry.getID());
}
} catch (Exception e)
分享標(biāo)題:實(shí)戰(zhàn)干貨:基于Redis6.0部署迷你版本消息隊(duì)列
網(wǎng)站網(wǎng)址:http://www.dlmjj.cn/article/djsjcgo.html