新聞中心
Redis Stream
Redis Stream 是 Redis 5.0 版本新增加的數(shù)據(jù)結(jié)構(gòu)。

創(chuàng)新互聯(lián)-專業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性價比興文網(wǎng)站開發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫,直接使用。一站式興文網(wǎng)站制作公司更省心,省錢,快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋興文地區(qū)。費用合理售后完善,十載實體公司更值得信賴。
Redis Stream 主要用于消息隊列(MQ,Message Queue),Redis 本身是有一個 Redis 發(fā)布訂閱 (pub/sub) 來實現(xiàn)消息隊列的功能,但它有個缺點就是消息無法持久化,如果出現(xiàn)網(wǎng)絡(luò)斷開、Redis 宕機等,消息就會被丟棄。
簡單來說發(fā)布訂閱 (pub/sub) 可以分發(fā)消息,但無法記錄歷史消息。
而 Redis Stream 提供了消息的持久化和主備復制功能,可以讓任何客戶端訪問任何時刻的數(shù)據(jù),并且能記住每一個客戶端的訪問位置,還能保證消息不丟失。
Redis Stream 的結(jié)構(gòu)如下所示,它有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的 ID 和對應(yīng)的內(nèi)容:
每個 Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時自動創(chuàng)建。
上圖解析:
- Consumer Group :消費組,使用 XGROUP CREATE 命令創(chuàng)建,一個消費組有多個消費者(Consumer)。
- last_delivered_id :游標,每個消費組會有個游標 last_delivered_id,任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動。
- pending_ids :消費者(Consumer)的狀態(tài)變量,作用是維護消費者的未確認的 id。 pending_ids 記錄了當前已經(jīng)被客戶端讀取的消息,但是還沒有 ack (Acknowledge character:確認字符)。
消息隊列相關(guān)命令:
- XADD - 添加消息到末尾
- XTRIM - 對流進行修剪,限制長度
- XDEL - 刪除消息
- XLEN - 獲取流包含的元素數(shù)量,即消息長度
- XRANGE - 獲取消息列表,會自動過濾已經(jīng)刪除的消息
- XREVRANGE - 反向獲取消息列表,ID 從大到小
- XREAD - 以阻塞或非阻塞方式獲取消息列表
消費者組相關(guān)命令:
- XGROUP CREATE - 創(chuàng)建消費者組
- XREADGROUP GROUP - 讀取消費者組中的消息
- XACK - 將消息標記為"已處理"
- XGROUP SETID - 為消費者組設(shè)置新的最后遞送消息ID
- XGROUP DELCONSUMER - 刪除消費者
- XGROUP DESTROY - 刪除消費者組
- XPENDING - 顯示待處理消息的相關(guān)信息
- XCLAIM - 轉(zhuǎn)移消息的歸屬權(quán)
- XINFO - 查看流和消費者組的相關(guān)信息;
- XINFO GROUPS - 打印消費者組的信息;
- XINFO STREAM - 打印流信息
XADD
使用 XADD 向隊列添加消息,如果指定的隊列不存在,則創(chuàng)建一個隊列,XADD 語法格式:
XADD key ID field value [field value ...]
- key :隊列名稱,如果不存在就創(chuàng)建
- ID :消息 id,我們使用 * 表示由 redis 生成,可以自定義,但是要自己保證遞增性。
- field value : 記錄。
實例
redis
> XADD mystream
* name Sara surname OConnor
"1601372323627-0"
redis
> XADD mystream
* field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis
> XLEN mystream
(integer
)
2
redis
> XRANGE mystream - +
1
)
1
)
"1601372323627-0"
2
)
1
)
"name"
2
)
"Sara"
3
)
"surname"
4
)
"OConnor"
2
)
1
)
"1601372323627-1"
2
)
1
)
"field1"
2
)
"value1"
3
)
"field2"
4
)
"value2"
5
)
"field3"
6
)
"value3"
redis
>
XTRIM
使用 XTRIM 對流進行修剪,限制長度, 語法格式:
XTRIM key MAXLEN [~] count
- key :隊列名稱
- MAXLEN :長度
- count :數(shù)量
實例
127.0.0.1:
6379
> XADD mystream
* field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:
6379
> XTRIM mystream MAXLEN
2
(integer
)
0
127.0.0.1:
6379
> XRANGE mystream - +
1
)
1
)
"1601372434568-0"
2
)
1
)
"field1"
2
)
"A"
3
)
"field2"
4
)
"B"
5
)
"field3"
6
)
"C"
7
)
"field4"
8
)
"D"
127.0.0.1:
6379
>
redis
>
XDEL
使用 XDEL 刪除消息,語法格式:
XDEL key ID [ID ...]
- key:隊列名稱
- ID :消息 ID
實例
> XADD mystream
* a
1
1538561698944-
0
> XADD mystream
* b
2
1538561700640-
0
> XADD mystream
* c
3
1538561701744-
0
> XDEL mystream
1538561700640-
0
(integer
)
1
127.0.0.1:
6379
> XRANGE mystream - +
1
)
1
)
1538561698944-
0
2
)
1
)
"a"
2
)
"1"
2
)
1
)
1538561701744-
0
2
)
1
)
"c"
2
)
"3"
XLEN
使用 XLEN 獲取流包含的元素數(shù)量,即消息長度,語法格式:
XLEN key
- key:隊列名稱
實例
redis
> XADD mystream
* item
1
"1601372563177-0"
redis
> XADD mystream
* item
2
"1601372563178-0"
redis
> XADD mystream
* item
3
"1601372563178-1"
redis
> XLEN mystream
(integer
)
3
redis
>
XRANGE
使用 XRANGE 獲取消息列表,會自動過濾已經(jīng)刪除的消息 ,語法格式:
XRANGE key start end [COUNT count]
- key :隊列名
- start :開始值, - 表示最小值
- end :結(jié)束值, + 表示最大值
- count :數(shù)量
實例
redis
> XADD writers
* name Virginia surname Woolf
"1601372577811-0"
redis
> XADD writers
* name Jane surname Austen
"1601372577811-1"
redis
> XADD writers
* name Toni surname Morrison
"1601372577811-2"
redis
> XADD writers
* name Agatha surname Christie
"1601372577812-0"
redis
> XADD writers
* name Ngozi surname Adichie
"1601372577812-1"
redis
> XLEN writers
(integer
)
5
redis
> XRANGE writers - + COUNT
2
1
)
1
)
"1601372577811-0"
2
)
1
)
"name"
2
)
"Virginia"
3
)
"surname"
4
)
"Woolf"
2
)
1
)
"1601372577811-1"
2
)
1
)
"name"
2
)
"Jane"
3
)
"surname"
4
)
"Austen"
redis
>
XREVRANGE
使用 XREVRANGE 獲取消息列表,會自動過濾已經(jīng)刪除的消息 ,語法格式:
XREVRANGE key end start [COUNT count]
- key :隊列名
- end :結(jié)束值, + 表示最大值
- start :開始值, - 表示最小值
- count :數(shù)量
實例
redis
> XADD writers
* name Virginia surname Woolf
"1601372731458-0"
redis
> XADD writers
* name Jane surname Austen
"1601372731459-0"
redis
> XADD writers
* name Toni surname Morrison
"1601372731459-1"
redis
> XADD writers
* name Agatha surname Christie
"1601372731459-2"
redis
> XADD writers
* name Ngozi surname Adichie
"1601372731459-3"
redis
> XLEN writers
(integer
)
5
redis
> XREVRANGE writers + - COUNT
1
1
)
1
)
"1601372731459-3"
2
)
1
)
"name"
2
)
"Ngozi"
3
)
"surname"
4
)
"Adichie"
redis
>
XREAD
使用 XREAD 以阻塞或非阻塞方式獲取消息列表 ,語法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- count :數(shù)量
- milliseconds :可選,阻塞毫秒數(shù),沒有設(shè)置就是非阻塞模式
- key :隊列名
- id :消息 ID
實例
# 從 Stream 頭部讀取兩條消息
> XREAD COUNT
2 STREAMS mystream writers
0-
0
0-
0
1
)
1
)
"mystream"
2
)
1
)
1
)
1526984818136-
0
2
)
1
)
"duration"
2
)
"1532"
3
)
"event-id"
4
)
"5"
5
)
"user-id"
6
)
"7782813"
2
)
1
)
1526999352406-
0
2
)
1
)
"duration"
2
)
"812"
3
)
"event-id"
4
)
"9"
5
)
"user-id"
6
)
"388234"
2
)
1
)
"writers"
2
)
1
)
1
)
1526985676425-
0
2
)
1
)
"name"
2
)
"Virginia"
3
)
"surname"
4
)
"Woolf"
2
)
1
)
1526985685298-
0
2
)
1
)
"name"
2
)
"Jane"
3
)
"surname"
4
)
"Austen"
XGROUP CREATE
使用 XGROUP CREATE 創(chuàng)建消費者組,語法格式:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
- key :隊列名稱,如果不存在就創(chuàng)建
- groupname :組名。
- $ : 表示從尾部開始消費,只接受新消息,當前 Stream 消息會全部忽略。
從頭開始消費:
XGROUP CREATE mystream consumer-group-name 0-0
從尾部開始消費:
XGROUP CREATE mystream consumer-group-name $
XREADGROUP GROUP
使用 XREADGROUP GROUP 讀取消費組中的消息,語法格式:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group :消費組名
- consumer :消費者名。
- count : 讀取數(shù)量。
- milliseconds : 阻塞毫秒數(shù)。
- key : 隊列名。
- ID : 消息 ID。
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
本文名稱:創(chuàng)新互聯(lián)Redis教程:RedisStream
文章位置:http://www.dlmjj.cn/article/cdohdhh.html


咨詢
建站咨詢
