Redis进阶-Stream多播的可持久化的消息队列
文章目錄
- Pre
- Stream簡(jiǎn)介
- Stream特性
- 消息 ID
- 消息內(nèi)容
- 命令預(yù)覽
- 獨(dú)立消費(fèi)
- 創(chuàng)建消費(fèi)組
- 消費(fèi)
- Stream 消息積壓怎么處理
- 消息如果忘記 ACK 會(huì)怎樣?
- PEL 如何避免消息丟失?
- Stream 的高可用
- 分區(qū) Partition
- 小結(jié)
- 數(shù)據(jù)結(jié)構(gòu) RadixTree
Pre
Redis-13Redis發(fā)布訂閱 中提到了PubSub的不足之處 。
PubSub 的生產(chǎn)者傳遞過(guò)來(lái)一個(gè)消息,Redis 會(huì)直接找到相應(yīng)的消費(fèi)者傳遞過(guò)去。如果一個(gè)消費(fèi)者都沒(méi)有,那么消息直接丟棄。
如果開(kāi)始有三個(gè)消費(fèi)者,一個(gè)消費(fèi)者突然掛掉了,生產(chǎn)者會(huì)繼續(xù)發(fā)送消息,另外兩個(gè)消費(fèi)者可以持續(xù)收到消息。但是掛掉的消費(fèi)者重新連上的時(shí)候,這斷連期間生產(chǎn)者發(fā)送的消息,對(duì)于這個(gè)消費(fèi)者來(lái)說(shuō)就是徹底丟失了。
如果 Redis 停機(jī)重啟,PubSub 的消息是不會(huì)持久化的,畢竟 Redis 宕機(jī)就相當(dāng)于一個(gè)消費(fèi)者都沒(méi)有,所有的消息直接被丟棄。
正是因?yàn)?PubSub 有這些缺點(diǎn),它幾乎找不到合適的應(yīng)用場(chǎng)景。Redis5.0 新增了 Stream 數(shù)據(jù)結(jié)構(gòu),這個(gè)功能給 Redis 帶來(lái)了持久化消息隊(duì)列,從此 PubSub 可以消失了。
Stream簡(jiǎn)介
Redis5.0 最大的新特性就是多出了一個(gè)數(shù)據(jù)結(jié)構(gòu) Stream,它是一個(gè)新的強(qiáng)大的支持多播的可持久化的消息隊(duì)列。 Redis Stream 借鑒了 Kafka 的設(shè)計(jì)。
Stream特性
-
Redis Stream 的結(jié)構(gòu)如上圖所示,它有一個(gè)消息鏈表,將所有加入的消息都串起來(lái),每個(gè)消息都有一個(gè)唯一的 ID 和對(duì)應(yīng)的內(nèi)容
-
消息是持久化的,Redis 重啟后,內(nèi)容還在
-
每個(gè) Stream 都有唯一的名稱,它就是 Redis 的 key,首次使用 xadd 指令追加消息時(shí)自動(dòng)創(chuàng)建
-
每個(gè) Stream 都可以掛多個(gè)消費(fèi)組,每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo)last_delivered_id 在 Stream數(shù)組之上往前移動(dòng),表示當(dāng)前消費(fèi)組已經(jīng)消費(fèi)到哪條消息了
-
每個(gè)消費(fèi)組都有一個(gè) Stream內(nèi)唯一的名稱,消費(fèi)組不會(huì)自動(dòng)創(chuàng)建,它需要單獨(dú)的指令 xgroup create 進(jìn)行創(chuàng)建,需要指定從 Stream 的某個(gè)消息 ID 開(kāi)始消費(fèi),這個(gè) ID 用來(lái)初始化 last_delivered_id 變量。
-
每個(gè)消費(fèi)組 (Consumer Group) 的狀態(tài)都是獨(dú)立的,相互不受影響,即同一份Stream 內(nèi)部的消息會(huì)被每個(gè)消費(fèi)組都消費(fèi)到
-
同一個(gè)消費(fèi)組 (Consumer Group) 可以掛接多個(gè)消費(fèi)者 (Consumer),這些消費(fèi)者之間是競(jìng)爭(zhēng)關(guān)系,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng)。每個(gè)消費(fèi)者有一個(gè)組內(nèi)唯一名稱。
-
消費(fèi)者 (Consumer) 內(nèi)部會(huì)有個(gè)狀態(tài)變量 pending_ids,它記錄了當(dāng)前已經(jīng)被客戶端讀取的消息,但是還沒(méi)有 ack。如果客戶端沒(méi)有 ack,這個(gè)變量里面的消息 ID 會(huì)越來(lái)越多,一旦某個(gè)消息被 ack,它就開(kāi)始減少。這個(gè) pending_ids 變量在 Redis 官方被稱之為 PEL ( Pending Entries List),這是一個(gè)很核心的數(shù)據(jù)結(jié)構(gòu),它用來(lái)確保客戶端至少消費(fèi)了消息一次,而不會(huì)在網(wǎng)絡(luò)傳輸?shù)闹型緛G失了沒(méi)處理。
消息 ID
消息 ID 的形式是 timestampInMillis-sequence,例如 1587877430819-3,它表示當(dāng)前的消息在毫米時(shí)間戳 1587877430819時(shí)產(chǎn)生,并且是該毫秒內(nèi)產(chǎn)生的第 3條消息。
消息 ID 可以由服務(wù)器自動(dòng)生成,也可以由客戶端自己指定,但是形式必須是整數(shù)-整數(shù),而且必須是后面加入的消息的 ID 要大于前面的消息 ID。
消息內(nèi)容
消息內(nèi)容就是鍵值對(duì),形如 hash 結(jié)構(gòu)的鍵值對(duì),這沒(méi)什么特別之處。
命令預(yù)覽
Redis Version _ 5.0.3
- xadd 追加消息
- xdel 刪除消息,這里的刪除僅僅是設(shè)置了標(biāo)志位,不影響消息總長(zhǎng)度
- xrange 獲取消息列表,會(huì)自動(dòng)過(guò)濾已經(jīng)刪除的消息
- xlen 消息長(zhǎng)度
- del 刪除 Stream
- 。。。。
獨(dú)立消費(fèi)
我們可以在不定義消費(fèi)組的情況下進(jìn)行 Stream 消息的獨(dú)立消費(fèi),當(dāng) Stream 沒(méi)有新消息時(shí),甚至可以阻塞等待。
Redis 設(shè)計(jì)了一個(gè)單獨(dú)的消費(fèi)指令 xread,可以將 Stream 當(dāng)成普通的消息隊(duì)列 (list) 來(lái)使用。
使用 xread 時(shí),我們可以完全忽略消費(fèi)組 (Consumer Group)的存在,就好比 Stream 就是一個(gè)普通的列表 (list)。
演示一下
#先通過(guò)xadd向artisan這個(gè)隊(duì)列寫(xiě)入5條數(shù)據(jù) 127.0.0.1:6379> XADD artisan * name artisan1 age 25 "1587886601587-0" 127.0.0.1:6379> XADD artisan * name artisan2 age 26 "1587886610449-0" 127.0.0.1:6379> XADD artisan * name artisan3 age 27 "1587886617014-0" 127.0.0.1:6379> XADD artisan * name artisan4 age 28 "1587886622590-0" 127.0.0.1:6379> XADD artisan * name artisan6 age 25 "1587886631932-0" 127.0.0.1:6379> XLEN artisan (integer) 5 127.0.0.1:6379> # 從 Stream 頭部讀取兩條消息 127.0.0.1:6379> XREAD COUNT 2 STREAMS artisan 0-0 1) 1) "artisan"2) 1) 1) "1587886601587-0"2) 1) "name"2) "artisan1"3) "age"4) "25"2) 1) "1587886610449-0"2) 1) "name"2) "artisan2"3) "age"4) "26" 127.0.0.1:6379> # 從 Stream 尾部讀取一條消息,毫無(wú)疑問(wèn),這里不會(huì)返回任何消息 127.0.0.1:6379> XREAD COUNT 1 STREAMS artisan $ (nil) 127.0.0.1:6379> # 從尾部阻塞等待新消息到來(lái),下面的指令會(huì)堵住,直到新消息到來(lái) 127.0.0.1:6379> XREAD block 0 count 1 streams artisan $ # 重新打開(kāi)一個(gè)窗口,在這個(gè)窗口往 Stream 里塞消息 127.0.0.1:6379> XADD artisan * name artisan7 age 27 "1587886999018-0" 127.0.0.1:6379> # 再切換到前面的窗口,我們可以看到阻塞解除了,返回了新的消息內(nèi)容 # 而且還顯示了一個(gè)等待時(shí)間,這里我們等待了 136.42s 127.0.0.1:6379> XREAD block 0 count 1 streams artisan $ 1) 1) "artisan"2) 1) 1) "1587886999018-0"2) 1) "name"2) "artisan7"3) "age"4) "27" (136.42s) 127.0.0.1:6379>-
客戶端如果想要使用 xread 進(jìn)行順序消費(fèi),一定要記住當(dāng)前消費(fèi)到哪里了,也就是返回的消息 ID。下次繼續(xù)調(diào)用 xread 時(shí),將上次返回的最后一個(gè)消息 ID 作為參數(shù)傳遞進(jìn)去,就可以繼續(xù)消費(fèi)后續(xù)的消息。
-
block 0 表示永遠(yuǎn)阻塞,直到消息到來(lái),block 1000 表示阻塞 1s,如果 1s 內(nèi)沒(méi)有任何消息到來(lái),就返回 nil
創(chuàng)建消費(fèi)組
Stream 通過(guò) xgroup create 指令創(chuàng)建消費(fèi)組 (Consumer Group),需要傳遞起始消息 ID 參數(shù)用來(lái)初始化 last_delivered_id 變量。
# 表示從頭開(kāi)始消費(fèi) 127.0.0.1:6379> XGROUP create artisan artisanGroup 0-0 OK 127.0.0.1:6379> # $ 表示從尾部開(kāi)始消費(fèi),只接受新消息,當(dāng)前 Stream 消息會(huì)全部忽略 127.0.0.1:6379> XGROUP create artisan artisanGroup2 $ OK 127.0.0.1:6379> 127.0.0.1:6379> XINFO stream artisan1) "length"2) (integer) 6 # 共 6 個(gè)消息3) "radix-tree-keys"4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "groups"8) (integer) 2 # 兩個(gè)消費(fèi)組9) "last-generated-id" 10) "1587886999018-0" 11) "first-entry" # 第一個(gè)消息 12) 1) "1587886601587-0"2) 1) "name"2) "artisan1"3) "age"4) "25" 13) "last-entry" # 最后一個(gè)消息 14) 1) "1587886999018-0"2) 1) "name"2) "artisan7"3) "age"4) "27" 127.0.0.1:6379> # 獲取 Stream 的消費(fèi)組信息 127.0.0.1:6379> XINFO groups artisan 1) 1) "name"2) "artisanGroup"3) "consumers"4) (integer) 0 # 該消費(fèi)組還沒(méi)有消費(fèi)者5) "pending"6) (integer) 0 # 該消費(fèi)組沒(méi)有正在處理的消息7) "last-delivered-id"8) "0-0" 2) 1) "name"2) "artisanGroup2"3) "consumers"4) (integer) 0 # 該消費(fèi)組還沒(méi)有消費(fèi)者5) "pending"6) (integer) 0 # 該消費(fèi)組沒(méi)有正在處理的消息7) "last-delivered-id"8) "1587886999018-0"消費(fèi)
Stream 提供了 xreadgroup 指令可以進(jìn)行消費(fèi)組的組內(nèi)消費(fèi),需要提供消費(fèi)組名稱、消費(fèi)者名稱和起始消息 ID。
它同 xread 一樣,也可以阻塞等待新消息。讀到新消息后,對(duì)應(yīng)的消息 ID 就會(huì)進(jìn)入消費(fèi)者的 PEL(正在處理的消息) 結(jié)構(gòu)里,客戶端處理完畢后使用 xack指令通知服務(wù)器,本條消息已經(jīng)處理完畢,該消息 ID 就會(huì)從 PEL 中移除。
先看下目前隊(duì)列中的數(shù)據(jù)
127.0.0.1:6379> XRANGE artisan - + 1) 1) "1587886601587-0"2) 1) "name"2) "artisan1"3) "age"4) "25" 2) 1) "1587886610449-0"2) 1) "name"2) "artisan2"3) "age"4) "26" 3) 1) "1587886617014-0"2) 1) "name"2) "artisan3"3) "age"4) "27" 4) 1) "1587886622590-0"2) 1) "name"2) "artisan4"3) "age"4) "28" 5) 1) "1587886631932-0"2) 1) "name"2) "artisan6"3) "age"4) "25" 6) 1) "1587886999018-0"2) 1) "name"2) "artisan7"3) "age"4) "27" 127.0.0.1:6379> # > 號(hào)表示從當(dāng)前消費(fèi)組的 last_delivered_id 后面開(kāi)始讀 # 每當(dāng)消費(fèi)者讀取一條消息,last_delivered_id 變量就會(huì)前進(jìn) 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 count 1 streams artisan > 1) 1) "artisan"2) 1) 1) "1587886601587-0"2) 1) "name"2) "artisan1"3) "age"4) "25" 127.0.0.1:6379> 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 count 1 streams artisan > 1) 1) "artisan"2) 1) 1) "1587886610449-0"2) 1) "name"2) "artisan2"3) "age"4) "26" 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 count 1 streams artisan > 1) 1) "artisan"2) 1) 1) "1587886617014-0"2) 1) "name"2) "artisan3"3) "age"4) "27" 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 count 1 streams artisan > 1) 1) "artisan"2) 1) 1) "1587886622590-0"2) 1) "name"2) "artisan4"3) "age"4) "28" 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 count 1 streams artisan > 1) 1) "artisan"2) 1) 1) "1587886631932-0"2) 1) "name"2) "artisan6"3) "age"4) "25" 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 count 1 streams artisan > 1) 1) "artisan"2) 1) 1) "1587886999018-0"2) 1) "name"2) "artisan7"3) "age"4) "27"# 再繼續(xù)讀取,就沒(méi)有新消息了 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 count 1 streams artisan > (nil) 127.0.0.1:6379> # 那就阻塞等待吧 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 block 0 count 1 streams artisan > # 開(kāi)啟另一個(gè)窗口,往里塞消息 127.0.0.1:6379> XADD artisan * name artisan8 age 28 "1587889553099-0" 127.0.0.1:6379> # 回到前一個(gè)窗口,發(fā)現(xiàn)阻塞解除,收到新消息了 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 block 0 count 1 streams artisan > 1) 1) "artisan"2) 1) 1) "1587889553099-0"2) 1) "name"2) "artisan8"3) "age"4) "28" (37.63s) 127.0.0.1:6379> # 觀察消費(fèi)組信息 127.0.0.1:6379> XINFO groups artisan 1) 1) "name"2) "artisanGroup"3) "consumers"4) (integer) 1 # 一個(gè)消費(fèi)者5) "pending"6) (integer) 7 # 共 7 條正在處理的信息還有沒(méi)有 ack7) "last-delivered-id"8) "1587889553099-0" 2) 1) "name"2) "artisanGroup2"3) "consumers"4) (integer) 0 # 消費(fèi)組 artisanGroup2沒(méi)有任何變化,因?yàn)榍懊嫖覀円恢痹诓倏v artisanGroup5) "pending"6) (integer) 07) "last-delivered-id"8) "1587886999018-0" 127.0.0.1:6379> # 如果同一個(gè)消費(fèi)組有多個(gè)消費(fèi)者,我們可以通過(guò) xinfo consumers 指令觀察每個(gè)消費(fèi)者的狀態(tài) 127.0.0.1:6379> XINFO consumers artisan artisanGroup 1) 1) "name"2) "artisanGroup2"3) "pending"4) (integer) 7 # 共 7 條待處理消息5) "idle"6) (integer) 179922 # 空閑了多長(zhǎng)時(shí)間 ms 沒(méi)有讀取消息了 127.0.0.1:6379> XINFO consumers artisan artisanGroup2 (empty list or set) 127.0.0.1:6379> # 接下來(lái)我們 ack 一條消息 127.0.0.1:6379> XACK artisan artisanGroup 1587886601587-0 (integer) 1 127.0.0.1:6379> XINFO consumers artisan artisanGroup 1) 1) "name"2) "artisanGroup2"3) "pending"4) (integer) 6 # 變成了 6 條待處理的消息5) "idle"6) (integer) 359659 127.0.0.1:6379> # 下面 ack 所有消息127.0.0.1:6379> XACK artisan artisanGroup 1587886610449-0 1587886617014-0 1587886622590-0 1587886631932-0 1587886999018-0 1587889553099-0 (integer) 6 127.0.0.1:6379> XINFO consumers artisan artisanGroup 1) 1) "name"2) "artisanGroup2"3) "pending"4) (integer) 0 # pel 空了5) "idle"6) (integer) 528547 127.0.0.1:6379>Stream 消息積壓怎么處理
消息積累太多,Stream 的鏈表豈不是很長(zhǎng),內(nèi)容會(huì)不會(huì)爆掉?xdel指令又不會(huì)刪除消息,它只是給消息做了個(gè)標(biāo)志位。
Redis考慮到了這一點(diǎn),所以它提供了一個(gè)定長(zhǎng) Stream 功能。在 xadd 的指令提供一個(gè)定長(zhǎng)長(zhǎng)度 maxlen,就可以將老的消息干掉,確保最多不超過(guò)指定長(zhǎng)度
127.0.0.1:6379> xlen artisan (integer) 7 127.0.0.1:6379> xadd artisan maxlen 3 * name artisan89 age 90 "1587890235506-0" 127.0.0.1:6379> XLEN artisan (integer) 3 127.0.0.1:6379>我們看到 Stream 的長(zhǎng)度被砍掉了,通過(guò)指定 maxlen,僅保留了 maxlen的長(zhǎng)度數(shù)據(jù)。
消息如果忘記 ACK 會(huì)怎樣?
Stream 在每個(gè)消費(fèi)者結(jié)構(gòu)中保存了正在處理中的消息 ID 列表 PEL,如果消費(fèi)者收到了消息處理完了但是沒(méi)有回復(fù) ack,就會(huì)導(dǎo)致 PEL 列表不斷增長(zhǎng),如果有很多消費(fèi)組的話,那么這個(gè) PEL 占用的內(nèi)存就會(huì)放大。
PEL 如何避免消息丟失?
在客戶端消費(fèi)者讀取 Stream 消息時(shí),Redis 服務(wù)器將消息回復(fù)給客戶端的過(guò)程中,客戶端突然斷開(kāi)了連接,消息就丟失了.
但是 PEL 里已經(jīng)保存了發(fā)出去的消息 ID。待客戶端重新連上之后,可以再次收到 PEL 中的消息 ID 列表。不過(guò)此時(shí) xreadgroup 的起始消息ID 不能為參數(shù)>,而必須是任意有效的消息 ID,一般將參數(shù)設(shè)為 0-0,表示讀取所有的PEL 消息以及自 last_delivered_id 之后的新消息。
Stream 的高可用
Stream 的高可用是建立主從復(fù)制基礎(chǔ)上的,它和其它數(shù)據(jù)結(jié)構(gòu)的復(fù)制機(jī)制沒(méi)有區(qū)別,也就是說(shuō)在 Sentinel 和 Cluster 集群環(huán)境下 Stream 是可以支持高可用的。不過(guò)鑒于 Redis 的指令復(fù)制是異步的,在 failover 發(fā)生時(shí),Redis 可能會(huì)丟失極小部分?jǐn)?shù)據(jù),這點(diǎn) Redis 的其它數(shù)據(jù)結(jié)構(gòu)也是一樣的。
分區(qū) Partition
Redis 的服務(wù)器沒(méi)有原生支持分區(qū)能力,如果想要使用分區(qū),那就需要分配多個(gè)Stream,然后在客戶端使用一定的策略來(lái)生產(chǎn)消息到不同的 Stream。
Kafka 是原生支持 Partition 的,但也是客戶端做的。Kafka 的客戶端存在 HashStrategy ,因?yàn)樗彩峭ㄟ^(guò)客戶端的 hash 算法來(lái)將不同的消息塞入不同分區(qū)
的。
另外,Kafka 還支持動(dòng)態(tài)增加分區(qū)數(shù)量的能力,但是這種調(diào)整能力也是很蹩腳的,它不會(huì)把之前已經(jīng)存在的內(nèi)容進(jìn)行 rehash,不會(huì)重新分區(qū)歷史數(shù)據(jù)。這種簡(jiǎn)單的動(dòng)態(tài)調(diào)整的能力Redis Stream 通過(guò)增加新的 Stream 就可以做到。
小結(jié)
Stream 的消費(fèi)模型借鑒了 Kafka 的消費(fèi)分組的概念,它彌補(bǔ)了 Redis Pub/Sub 不能持久化消息的缺陷。但是它又不同于 kafka,Kafka 的消息可以分 partition,而 Stream 不行。如果非要分 parition 的話,得在客戶端做,提供不同的 Stream 名稱,對(duì)消息進(jìn)行 hash 取模來(lái)選擇往哪個(gè) Stream 里塞。
數(shù)據(jù)結(jié)構(gòu) RadixTree
參考:radix tree,基數(shù)樹(shù)
總結(jié)
以上是生活随笔為你收集整理的Redis进阶-Stream多播的可持久化的消息队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Redis进阶-如何从海量的 key 中
- 下一篇: Redis进阶-Redis 4种MQ 方