新聞中心

創(chuàng)新互聯(lián)是一家專注于成都網(wǎng)站設計、網(wǎng)站建設與策劃設計,古冶網(wǎng)站建設哪家好?創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設10余年,網(wǎng)設計領域的專業(yè)建站公司;建站業(yè)務涵蓋:古冶等地區(qū)。古冶做網(wǎng)站價格咨詢:13518219792
什么是Stream?
Stream 實際上是一個具有消息發(fā)布/訂閱功能的組件,也就常說的消息隊列。其實這種類似于 broker/consumer(生產(chǎn)者/消費者)的數(shù)據(jù)結構很常見,比如 RabbitMQ 消息中間件、Celery 消息中間件,以及 Kafka 分布式消息系統(tǒng)等,而 Redis Stream 正是借鑒了 Kafaka 系統(tǒng)。
1) 優(yōu)點
Strean 除了擁有很高的性能和內(nèi)存利用率外, 它最大的特點就是提供了消息的持久化存儲,以及主從復制功能,從而解決了網(wǎng)絡斷開、Redis 宕機情況下,消息丟失的問題,即便是重啟 Redis,存儲的內(nèi)容也會存在。
2) 流程
Stream 消息隊列主要由四部分組成,分別是:消息本身、生產(chǎn)者、消費者和消費組,對于前述三者很好理解,下面了解什么是消費組。
一個 Stream 隊列可以擁有多個消費組,每個消費組中又包含了多個消費者,組內(nèi)消費者之間存在競爭關系。當某個消費者消費了一條消息時,同組消費者,都不會再次消費這條消息。被消費的消息 ID 會被放入等待處理的 Pending_ids 中。每消費完一條信息,消費組的游標就會向前移動一位,組內(nèi)消費者就繼續(xù)去爭搶下消息。
Redis Stream 消息隊列結構程如下圖所示:
圖1:Redis Stream流程處理圖
下面對上圖涉及的專有名詞做簡單解釋:
- Stream direction:表示數(shù)據(jù)流,它是一個消息鏈,將所有的消息都串起來,每個消息都有一個唯一標識 ID 和對應的消息內(nèi)容(Message content)。
- Consumer Group :表示消費組,擁有唯一的組名,使用 XGROUP CREATE 命令創(chuàng)建。一個 Stream 消息鏈上可以有多個消費組,一個消費組內(nèi)擁有多個消費者,每一個消費者也有一個唯一的 ID 標識。
- last_delivered_id :表示消費組游標,每個消費組都會有一個游標 last_delivered_id,任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動。
- pending_ids :Redis 官方稱為 PEL,表示消費者的狀態(tài)變量,它記錄了當前已經(jīng)被客戶端讀取的消息 ID,但是這些消息沒有被 ACK(確認字符)。如果客戶端沒有 ACK,那么這個變量中的消息 ID 會越來越多,一旦被某個消息被 ACK,它就開始減少。
3) ACK
ACK(Acknowledge character)即確認字符,在數(shù)據(jù)通信中,接收方傳遞給發(fā)送方的一種傳輸類控制字符。表示發(fā)來的數(shù)據(jù)已確認接收無誤。在 TCP/IP 協(xié)議中,如果接收方成功的接收到數(shù)據(jù),那么會回復一個 ACK 數(shù)據(jù)。通常 ACK 信號有自己固定的格式,長度大小,由接收方回復給發(fā)送方。
常用命令匯總
| 命令 | 說明 |
|---|---|
| XADD | 添加消息到末尾。 |
| XTRIM | 對 Stream 流進行修剪,限制長度。 |
| XDEL | 刪除指定的消息。 |
| XLEN | 獲取流包含的元素數(shù)量,即消息長度。 |
| XRANGE | 獲取消息列表,會自動過濾已經(jīng)刪除的消息。 |
| XREVRANGE | 反向獲取消息列表,ID 從大到小。 |
| XREAD | 以阻塞或非阻塞方式獲取消息列表。 |
| XGROUP CREATE | 創(chuàng)建消費者組。 |
| XREADGROUP GROUP | 讀取消費者組中的消息。 |
| XACK | 將消息標記為"已處理"。 |
| XGROUP SETID | 為消費者組設置新的最后遞送消息ID。 |
| XGROUP DELCONSUMER | 刪除消費者。 |
| XGROUP DESTROY | 刪除消費者組。 |
| XPENDING | 顯示待處理消息的相關信息。 |
| XCLAIM | 轉移消息的歸屬權。 |
| XINFO | 查看 Stream 流、消費者和消費者組的相關信息。 |
| XINFO GROUPS | 查看消費者組的信息。 |
| XINFO STREAM | 查看 Stream 流信息。 |
| XINFO CONSUMERS key group | 查看組內(nèi)消費者流信息。 |
基本命令應用
下面通過一組示例演示有關 Stream 命令的使用:
#添加一個消息, * 表示以時間戳自動創(chuàng)建id
127.0.0.1:6379> XADD mystream * username www.biancheng.net age 10 c.biancheng.net age 9
"1610619132674-1"
#自定義id等于001,注意id只增不減
127.0.0.1:6379> XADD mystream1 001 name zhangsan addr hebei
"1-0"
127.0.0.1:6379> XADD mystream1 002 name lisi addr hunan
"2-0"
#如果插入重復的id號會報錯
127.0.0.1:6379> XADD mystream1 001 name wangwu addr fujian
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
127.0.0.1:6379> XADD mystream1 003 name wangwu addr fujian
"3-0"
#刪除id=001的數(shù)據(jù)
127.0.0.1:6379> XDEL mystream1 001
(integer) 1
#查看stream隊列包含的消息數(shù)量,也就消息長度
127.0.0.1:6379> XLEN mystream1
(integer) 2
#獲取消息列表,-表示最小,+表示最大
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1610619132674-0"
2) 1) "username"
2) "www.biancheng.net"
3) "age"
4) "10"
2) 1) "1610619178028-0"
2) 1) "username"
2) "c.biancheng.net"
3) "age"
4) "9"
#獲取消息列表
127.0.0.1:6379> XRANGE mystream1 - 003
1) 1) "2-0"
2) 1) "name"
2) "lisi"
3) "addr"
4) "hunan"
2) 1) "3-0"
2) 1) "name"
2) "wangwu"
3) "addr"
4) "fujian"
#使用count指定返回數(shù)據(jù)的數(shù)量
127.0.0.1:6379> XRANGE mystream1 - 003 count 1
1) 1) "2-0"
2) 1) "name"
2) "lisi"
3) "addr"
4) "hunan"
#刪除整個Stream
127.0.0.1:6379> DEL mystream
#使用xread讀取消息
127.0.0.1:6379> XREAD count 2 STREAMS mystream1 2-0
1) 1) "mystream1"
2) 1) 1) "3-0"
2) 1) "name"
2) "wangwu"
3) "addr"
4) "fujian"
創(chuàng)建消息ID
上述示例中,當我們創(chuàng)建一個 Srteam 時, 需要創(chuàng)建消息 ID,該 ID 是唯一、不可重復的,并且只增不減。消息 ID 有兩種創(chuàng)建方式,一是系統(tǒng)自動生成,二是自定義創(chuàng)建。
1) 系統(tǒng)自動創(chuàng)建
語法格式如下:
XADD key ID field value [field value ...]
參數(shù)說明如下:
- key :指定隊列名稱,如果不存就創(chuàng)建;
- ID :消息 id,我們使用
*表示由 redis 生成,可以自定義,但是要自己保證遞增性; - field value :消息記錄。
返回值是毫秒時間戳格式的字符串。比如 1610619132674-2,它表示在該毫秒內(nèi)產(chǎn)生的第 2 條消息。使用示例:
XADD mystream * username www.biancheng.net age 10
2) 自定義ID
自定義 ID 比較簡單,但是需要注意的是 ID 的形式必須是 “整數(shù)”,并且后面加入消息的 ID 必須大于前面消息的 ID,也就是自定義 ID 也必須遵守遞增的規(guī)則。示例如下:
XADD mystream1 001 name zhangsan addr hebei
創(chuàng)建消費組
Redis Stream通過
XGROUP CREATE指令創(chuàng)建消費組(Consumer Group),在創(chuàng)建時,需要傳遞起始消息的 ID 用來初始化 last_delivered_id 變量。語法格式如下:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
參數(shù)說明如下:
- key :指定 Stream 隊列名稱,若不存在則自動創(chuàng)建。
- groupname :自定義消費組的名稱,不可重復。
- $ :表示從尾部開始消費,只接受新消息,而當前 Stream 的消息則被忽略。
示例如下:
#創(chuàng)建消費組,并傳遞消息起始id 0-0
127.0.0.1:6379> XGROUP CREATE mystream1 ms1 0-0
OK
#從尾部開始消費信息,只接受新消息
127.0.0.1:6379> XGROUP CREATE mystream1 ms3 $
OK
#xinfo查看隊列信息
127.0.0.1:6379> XINFO stream mystream1
#隊列中消息的長度
1) "length"
2) (integer) 2
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
#有幾個消費組
7) "groups"
8) (integer) 2
9) "last-generated-id"
10) "3-0"
11) "first-entry" #第一個消息
12) 1) "2-0"
2) 1) "name"
2) "lisi"
3) "addr"
4) "hunan"
13) "last-entry" #最后一個消息
14) 1) "3-0"
2) 1) "name"
2) "wangwu"
3) "addr"
4) "fujian"
#查看消費組信息
127.0.0.1:6379> XINFO GROUPS mystream1
1) 1) "name"
2) "ms1"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "0-0"
2) 1) "name"
2) "ms3"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "3-0"
消費消息
Redis Stream 通過
XREADGROUP命令使消費組消費信息,它和
XREAD命令一樣,都可以阻塞等待新消息。讀到新消息后,對應的消息 ID 就會進入消費者的 PLE(正在處理的消息)結構里,客戶端處理完畢后使用 XACK 命令通知 Redis 服務器,本條消息已經(jīng)處理完畢,該消息的 ID 就會從 PEL 中移除。示意圖如下:
圖2:Redis Stream 流程示意圖
XREADGROUP命令的語法格式如下所示:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
參數(shù)說明如下:
- group :消費組名稱。
- consumer :消費者名稱。
- count : 要讀取的數(shù)量。
- milliseconds : 阻塞時間,以毫秒為單位。
- key : 鍵指定的隊列名稱。
- ID : 表示消息 ID。
使用示例如下:
#消費組中消費者讀取消息,> 表示每當消費一個信息,消費組游標就前移一位
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 >
1) 1) "mystream1"
2) 1) 1) "2-0"
2) 1) "name"
2) "lisi"
3) "addr"
4) "hunan"
#再使用ms1-c1讀取一條消息
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 >
1) 1) "mystream1"
2) 1) 1) "3-0"
2) 1) "name"
2) "wangwu"
3) "addr"
4) "fujian"
#BLOCK 1000表示等待1秒,如果沒有任何消息到來,則返回nill,此時移動到了末尾
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 BLOCK 1000 STREAMS mystream1 >
(nil)
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 1
1) 1) "mystream1"
2) 1) 1) "2-0"
2) 1) "name"
2) "lisi"
3) "addr"
4) "hunan"
#超出了消息id的范圍
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 3
1) 1) "mystream1"
2) (empty list or set)
#添加新的消息 ID為004
127.0.0.1:6379> XADD mystream1 004 name zhangwu age 24
#使用另外一個消費組讀取消息
127.0.0.1:6379> XREADGROUP GROUP ms3 c2 COUNT 2 STREAMS mystream1 >
1) 1) "mystream1"
2) 1) 1) "4-0"
2) 1) "name"
2) "zhangwu"
3) "age"
4) "21"
#xack將id=002消息標記為已經(jīng)處理
127.0.0.1:6379> XACK mystream1 ms1 002
(integer) 1
注意:
>表示每當消費者讀取一條消息時,last_delivered_id 變量就會前移一位。
標題名稱:RedisStream消息隊列
當前路徑:http://www.dlmjj.cn/article/djjeooe.html


咨詢
建站咨詢
