日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
RedisStream消息隊列
Redis Stream 是 Redis 5.0 版本引入的一種新數(shù)據(jù)類型,同時它也是 Redis 中最為復雜的數(shù)據(jù)結構,本節(jié)主要對 Stream 做相關介紹。

創(chuàng)新互聯(lián)是一家專注于成都網(wǎng)站設計、網(wǎng)站建設與策劃設計,古冶網(wǎng)站建設哪家好?創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設10余年,網(wǎng)設計領域的專業(yè)建站公司;建站業(yè)務涵蓋:古冶等地區(qū)。古冶做網(wǎng)站價格咨詢:13518219792

什么是Stream?

Stream 實際上是一個具有消息發(fā)布/訂閱功能的組件,也就常說的消息隊列。其實這種類似于 broker/consumer(生產(chǎn)者/消費者)的數(shù)據(jù)結構很常見,比如 RabbitMQ 消息中間件、Celery 消息中間件,以及 Kafka 分布式消息系統(tǒng)等,而 Redis Stream 正是借鑒了 Kafaka 系統(tǒng)。

1) 優(yōu)點

Strean 除了擁有很高的性能和內(nèi)存利用率外, 它最大的特點就是提供了消息的持久化存儲,以及主從復制功能,從而解決了網(wǎng)絡斷開、Redis 宕機情況下,消息丟失的問題,即便是重啟 Redis,存儲的內(nèi)容也會存在。

2) 流程

Stream 消息隊列主要由四部分組成,分別是:消息本身、生產(chǎn)者、消費者和消費組,對于前述三者很好理解,下面了解什么是消費組。

一個 Stream 隊列可以擁有多個消費組,每個消費組中又包含了多個消費者,組內(nèi)消費者之間存在競爭關系。當某個消費者消費了一條消息時,同組消費者,都不會再次消費這條消息。被消費的消息 ID 會被放入等待處理的 Pending_ids 中。每消費完一條信息,消費組的游標就會向前移動一位,組內(nèi)消費者就繼續(xù)去爭搶下消息。

Redis Stream 消息隊列結構程如下圖所示:



圖1:Redis Stream流程處理圖

下面對上圖涉及的專有名詞做簡單解釋:

  • Stream direction:表示數(shù)據(jù)流,它是一個消息鏈,將所有的消息都串起來,每個消息都有一個唯一標識 ID 和對應的消息內(nèi)容(Message content)。
  • Consumer Group :表示消費組,擁有唯一的組名,使用 XGROUP CREATE 命令創(chuàng)建。一個 Stream 消息鏈上可以有多個消費組,一個消費組內(nèi)擁有多個消費者,每一個消費者也有一個唯一的 ID 標識。
  • last_delivered_id :表示消費組游標,每個消費組都會有一個游標 last_delivered_id,任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動。
  • pending_ids :Redis 官方稱為 PEL,表示消費者的狀態(tài)變量,它記錄了當前已經(jīng)被客戶端讀取的消息 ID,但是這些消息沒有被 ACK(確認字符)。如果客戶端沒有 ACK,那么這個變量中的消息 ID 會越來越多,一旦被某個消息被 ACK,它就開始減少。

3) ACK 

ACK(Acknowledge character)即確認字符,在數(shù)據(jù)通信中,接收方傳遞給發(fā)送方的一種傳輸類控制字符。表示發(fā)來的數(shù)據(jù)已確認接收無誤。在 TCP/IP 協(xié)議中,如果接收方成功的接收到數(shù)據(jù),那么會回復一個 ACK 數(shù)據(jù)。通常 ACK 信號有自己固定的格式,長度大小,由接收方回復給發(fā)送方。

常用命令匯總

Redis Stream命令
命令 說明
XADD  添加消息到末尾。
XTRIM 對 Stream 流進行修剪,限制長度。
XDEL 刪除指定的消息。
XLEN 獲取流包含的元素數(shù)量,即消息長度。
XRANGE 獲取消息列表,會自動過濾已經(jīng)刪除的消息。
XREVRANGE  反向獲取消息列表,ID 從大到小。
XREAD 以阻塞或非阻塞方式獲取消息列表。
XGROUP CREATE 創(chuàng)建消費者組。
XREADGROUP GROUP 讀取消費者組中的消息。
XACK 將消息標記為"已處理"。
XGROUP SETID 為消費者組設置新的最后遞送消息ID。
XGROUP DELCONSUMER 刪除消費者。
XGROUP DESTROY 刪除消費者組。
XPENDING 顯示待處理消息的相關信息。
XCLAIM  轉移消息的歸屬權。
XINFO 查看 Stream 流、消費者和消費者組的相關信息。
XINFO GROUPS 查看消費者組的信息。
XINFO STREAM  查看 Stream 流信息。
XINFO CONSUMERS key group 查看組內(nèi)消費者流信息。

基本命令應用

下面通過一組示例演示有關 Stream 命令的使用:

#添加一個消息, * 表示以時間戳自動創(chuàng)建id
127.0.0.1:6379> XADD mystream * username www.biancheng.net age 10 c.biancheng.net age 9
"1610619132674-1"
#自定義id等于001,注意id只增不減
127.0.0.1:6379> XADD mystream1 001 name zhangsan addr hebei
"1-0"
127.0.0.1:6379> XADD mystream1 002 name lisi addr hunan
"2-0"
#如果插入重復的id號會報錯
127.0.0.1:6379> XADD mystream1 001 name wangwu addr fujian
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
127.0.0.1:6379> XADD mystream1 003 name wangwu addr fujian
"3-0"
#刪除id=001的數(shù)據(jù)
127.0.0.1:6379> XDEL mystream1 001
(integer) 1
#查看stream隊列包含的消息數(shù)量,也就消息長度
127.0.0.1:6379> XLEN mystream1
(integer) 2
#獲取消息列表,-表示最小,+表示最大
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1610619132674-0"
   2) 1) "username"
      2) "www.biancheng.net"
      3) "age"
      4) "10"
2) 1) "1610619178028-0"
   2) 1) "username"
      2) "c.biancheng.net"
      3) "age"
      4) "9"
#獲取消息列表
127.0.0.1:6379> XRANGE mystream1 - 003
1) 1) "2-0"
   2) 1) "name"
      2) "lisi"
      3) "addr"
      4) "hunan"
2) 1) "3-0"
   2) 1) "name"
      2) "wangwu"
      3) "addr"
      4) "fujian"
#使用count指定返回數(shù)據(jù)的數(shù)量
127.0.0.1:6379> XRANGE mystream1 - 003 count 1
1) 1) "2-0"
   2) 1) "name"
      2) "lisi"
      3) "addr"
      4) "hunan"
#刪除整個Stream
127.0.0.1:6379> DEL mystream
#使用xread讀取消息
127.0.0.1:6379> XREAD count 2 STREAMS mystream1 2-0
1) 1) "mystream1"
   2) 1) 1) "3-0"
         2) 1) "name"
            2) "wangwu"
            3) "addr"
            4) "fujian"

創(chuàng)建消息ID

上述示例中,當我們創(chuàng)建一個 Srteam 時, 需要創(chuàng)建消息 ID,該 ID 是唯一、不可重復的,并且只增不減。消息 ID 有兩種創(chuàng)建方式,一是系統(tǒng)自動生成,二是自定義創(chuàng)建。

1) 系統(tǒng)自動創(chuàng)建

語法格式如下:

XADD key ID field value [field value ...]

參數(shù)說明如下:

  • key :指定隊列名稱,如果不存就創(chuàng)建;
  • ID :消息 id,我們使用*表示由 redis 生成,可以自定義,但是要自己保證遞增性;
  • field value :消息記錄。

返回值是毫秒時間戳格式的字符串。比如 1610619132674-2,它表示在該毫秒內(nèi)產(chǎn)生的第 2 條消息。使用示例:

XADD mystream * username www.biancheng.net age 10

2) 自定義ID

自定義 ID 比較簡單,但是需要注意的是 ID 的形式必須是 “整數(shù)”,并且后面加入消息的 ID 必須大于前面消息的 ID,也就是自定義 ID 也必須遵守遞增的規(guī)則。示例如下:

XADD mystream1 001 name zhangsan addr hebei

創(chuàng)建消費組

Redis Stream通過
XGROUP CREATE指令創(chuàng)建消費組(Consumer Group),在創(chuàng)建時,需要傳遞起始消息的 ID 用來初始化 last_delivered_id 變量。語法格式如下:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

參數(shù)說明如下:

  • key :指定 Stream 隊列名稱,若不存在則自動創(chuàng)建。
  • groupname :自定義消費組的名稱,不可重復。
  • $ :表示從尾部開始消費,只接受新消息,而當前 Stream 的消息則被忽略。

示例如下:

#創(chuàng)建消費組,并傳遞消息起始id 0-0
127.0.0.1:6379> XGROUP CREATE mystream1 ms1 0-0
OK
#從尾部開始消費信息,只接受新消息
127.0.0.1:6379> XGROUP CREATE mystream1 ms3 $
OK
#xinfo查看隊列信息
127.0.0.1:6379> XINFO stream mystream1
#隊列中消息的長度
1) "length"
2) (integer) 2
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
#有幾個消費組
7) "groups"
8) (integer) 2
9) "last-generated-id"
10) "3-0"
11) "first-entry" #第一個消息
12) 1) "2-0"
    2) 1) "name"
       2) "lisi"
       3) "addr"
       4) "hunan"
13) "last-entry" #最后一個消息
14) 1) "3-0"
    2) 1) "name"
       2) "wangwu"
       3) "addr"
       4) "fujian"
#查看消費組信息
127.0.0.1:6379> XINFO GROUPS mystream1
1) 1) "name"
   2) "ms1"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "0-0"
2) 1) "name"
   2) "ms3"
   3) "consumers"
   4) (integer) 0
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "3-0"

消費消息

Redis Stream 通過
XREADGROUP命令使消費組消費信息,它和
XREAD命令一樣,都可以阻塞等待新消息。讀到新消息后,對應的消息 ID 就會進入消費者的 PLE(正在處理的消息)結構里,客戶端處理完畢后使用 XACK 命令通知 Redis 服務器,本條消息已經(jīng)處理完畢,該消息的 ID 就會從 PEL 中移除。示意圖如下:



圖2:Redis Stream 流程示意圖


XREADGROUP命令的語法格式如下所示:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]  

參數(shù)說明如下:

  • group :消費組名稱。
  • consumer :消費者名稱。
  • count : 要讀取的數(shù)量。
  • milliseconds : 阻塞時間,以毫秒為單位。
  • key :  鍵指定的隊列名稱。
  • ID : 表示消息 ID。

使用示例如下:

#消費組中消費者讀取消息,> 表示每當消費一個信息,消費組游標就前移一位
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 >
1) 1) "mystream1"
   2) 1) 1) "2-0"
         2) 1) "name"
            2) "lisi"
            3) "addr"
            4) "hunan"
#再使用ms1-c1讀取一條消息
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 >
1) 1) "mystream1"
   2) 1) 1) "3-0"
         2) 1) "name"
            2) "wangwu"
            3) "addr"
            4) "fujian"
#BLOCK 1000表示等待1秒,如果沒有任何消息到來,則返回nill,此時移動到了末尾
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 BLOCK 1000 STREAMS mystream1 >
(nil)
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 1
1) 1) "mystream1"
   2) 1) 1) "2-0"
         2) 1) "name"
            2) "lisi"
            3) "addr"
            4) "hunan"
#超出了消息id的范圍
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 3
1) 1) "mystream1"
   2) (empty list or set)
#添加新的消息 ID為004
127.0.0.1:6379> XADD mystream1 004 name zhangwu age 24
#使用另外一個消費組讀取消息
127.0.0.1:6379> XREADGROUP GROUP ms3 c2 COUNT 2 STREAMS mystream1 >
1) 1) "mystream1"
   2) 1) 1) "4-0"
         2) 1) "name"
            2) "zhangwu"
            3) "age"
            4) "21"
#xack將id=002消息標記為已經(jīng)處理
127.0.0.1:6379> XACK mystream1 ms1 002
(integer) 1

注意:
>表示每當消費者讀取一條消息時,last_delivered_id 變量就會前移一位。


標題名稱:RedisStream消息隊列
當前路徑:http://www.dlmjj.cn/article/djjeooe.html