日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

Redis(8)——发布/订阅与Stream

發(fā)布時(shí)間:2024/5/8 数据库 77 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Redis(8)——发布/订阅与Stream 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、Redis 中的發(fā)布/訂閱功能

發(fā)布/ 訂閱系統(tǒng)?是 Web 系統(tǒng)中比較常用的一個(gè)功能。簡(jiǎn)單點(diǎn)說(shuō)就是?發(fā)布者發(fā)布消息,訂閱者接受消息,這有點(diǎn)類似于我們的報(bào)紙/ 雜志社之類的:?(借用前邊的一張圖)

  • 圖片引用自:「消息隊(duì)列」看過(guò)來(lái)! -?https://www.wmyskxz.com/2019/07/16/xiao-xi-dui-lie-kan-guo-lai/

從我們?前面(下方相關(guān)閱讀)?學(xué)習(xí)的知識(shí)來(lái)看,我們雖然可以使用一個(gè)?list?列表結(jié)構(gòu)結(jié)合?lpush?和?rpop?來(lái)實(shí)現(xiàn)消息隊(duì)列的功能,但是似乎很難實(shí)現(xiàn)實(shí)現(xiàn)?消息多播?的功能:

為了支持消息多播,Redis?不能再依賴于那 5 種基礎(chǔ)的數(shù)據(jù)結(jié)構(gòu)了,它單獨(dú)使用了一個(gè)模塊來(lái)支持消息多播,這個(gè)模塊就是?PubSub,也就是?PublisherSubscriber?(發(fā)布者/ 訂閱者模式)。

PubSub 簡(jiǎn)介

我們從?上面的圖?中可以看到,基于?list?結(jié)構(gòu)的消息隊(duì)列,是一種?Publisher?與?Consumer?點(diǎn)對(duì)點(diǎn)的強(qiáng)關(guān)聯(lián)關(guān)系,Redis?為了消除這樣的強(qiáng)關(guān)聯(lián),引入了另一種概念:頻道?(channel):

當(dāng)?Publisher?往?channel?中發(fā)布消息時(shí),關(guān)注了指定?channel?的?Consumer?就能夠同時(shí)受到消息。但這里的?問(wèn)題?是,消費(fèi)者訂閱一個(gè)頻道是必須?明確指定頻道名稱?的,這意味著,如果我們想要?訂閱多個(gè)?頻道,那么就必須?顯式地關(guān)注多個(gè)?名稱。

為了簡(jiǎn)化訂閱的繁瑣操作,Redis?提供了?模式訂閱?的功能?Pattern Subscribe,這樣就可以?一次性關(guān)注多個(gè)頻道?了,即使生產(chǎn)者新增了同模式的頻道,消費(fèi)者也可以立即受到消息:

例如上圖中,所有?位于圖片下方的?Consumer?都能夠受到消息

Publisher?往?wmyskxz.chat?這個(gè)?channel?中發(fā)送了一條消息,不僅僅關(guān)注了這個(gè)頻道的?Consumer 1?和?Consumer 2?能夠受到消息,圖片中的兩個(gè)?channel?都和模式?wmyskxz.*?匹配,所以?Redis?此時(shí)會(huì)同樣發(fā)送消息給訂閱了?wmyskxz.*?這個(gè)模式的?Consumer 3?和關(guān)注了在這個(gè)模式下的另一個(gè)頻道?wmyskxz.log?下的?Consumer 4?和?Consumer 5。

另一方面,如果接收消息的頻道是?wmyskxz.chat,那么?Consumer 3?也會(huì)受到消息。

快速體驗(yàn)

在?Redis?中,PubSub?模塊的使用非常簡(jiǎn)單,常用的命令也就下面這么幾條:

# 訂閱頻道: SUBSCRIBE channel [channel ....] # 訂閱給定的一個(gè)或多個(gè)頻道的信息 PSUBSCRIBE pattern [pattern ....] # 訂閱一個(gè)或多個(gè)符合給定模式的頻道 # 發(fā)布頻道: PUBLISH channel message # 將消息發(fā)送到指定的頻道 # 退訂頻道: UNSUBSCRIBE [channel [channel ....]] # 退訂指定的頻道 PUNSUBSCRIBE [pattern [pattern ....]] #退訂所有給定模式的頻道

我們可以在本地快速地來(lái)體驗(yàn)一下?PubSub

具體步驟如下:

  • 開(kāi)啟本地 Redis 服務(wù),新建兩個(gè)控制臺(tái)窗口;

  • 在其中一個(gè)窗口輸入?SUBSCRIBE wmyskxz.chat?關(guān)注?wmyskxz.chat?頻道,讓這個(gè)窗口成為?消費(fèi)者

  • 在另一個(gè)窗口輸入?PUBLISH wmyskxz.chat 'message'?往這個(gè)頻道發(fā)送消息,這個(gè)時(shí)候就會(huì)看到?另一個(gè)窗口實(shí)時(shí)地出現(xiàn)?了發(fā)送的測(cè)試消息。

  • 實(shí)現(xiàn)原理

    可以看到,我們通過(guò)很簡(jiǎn)單的兩條命令,幾乎就可以簡(jiǎn)單使用這樣的一個(gè)?發(fā)布/ 訂閱系統(tǒng)?了,但是具體是怎么樣實(shí)現(xiàn)的呢?

    每個(gè) Redis 服務(wù)器進(jìn)程維持著一個(gè)標(biāo)識(shí)服務(wù)器狀態(tài)?的?redis.h/redisServer?結(jié)構(gòu),其中就?保存著有訂閱的頻道?以及?訂閱模式?的信息:

    struct redisServer {// ...dict *pubsub_channels; // 訂閱頻道list *pubsub_patterns; // 訂閱模式// ... };

    訂閱頻道原理

    當(dāng)客戶端訂閱某一個(gè)頻道之后,Redis 就會(huì)往?pubsub_channels?這個(gè)字典中新添加一條數(shù)據(jù),實(shí)際上這個(gè)?dict?字典維護(hù)的是一張鏈表,比如,下圖展示的?pubsub_channels?示例中,client 1、client 2?就訂閱了?channel 1,而其他頻道也分別被其他客戶端訂閱:

    SUBSCRIBE 命令

    SUBSCRIBE?命令的行為可以用下列的偽代碼表示:

    def SUBSCRIBE(client, channels):# 遍歷所有輸入頻道for channel in channels:# 將客戶端添加到鏈表的末尾redisServer.pubsub_channels[channel].append(client)

    通過(guò)?pubsub_channels?字典,程序只要檢查某個(gè)頻道是否為字典的鍵,就可以知道該頻道是否正在被客戶端訂閱;只要取出某個(gè)鍵的值,就可以得到所有訂閱該頻道的客戶端的信息。

    PUBLISH 命令

    了解?SUBSCRIBE,那么?PUBLISH?命令的實(shí)現(xiàn)也變得十分簡(jiǎn)單了,只需要通過(guò)上述字典定位到具體的客戶端,再把消息發(fā)送給它們就好了:(偽代碼實(shí)現(xiàn)如下)

    def PUBLISH(channel, message):# 遍歷所有訂閱頻道 channel 的客戶端for client in server.pubsub_channels[channel]:# 將信息發(fā)送給它們send_message(client, message)

    UNSUBSCRIBE 命令

    使用?UNSUBSCRIBE?命令可以退訂指定的頻道,這個(gè)命令執(zhí)行的是訂閱的反操作:它從?pubsub_channels?字典的給定頻道(鍵)中,刪除關(guān)于當(dāng)前客戶端的信息,這樣被退訂頻道的信息就不會(huì)再發(fā)送給這個(gè)客戶端。

    訂閱模式原理

    正如我們上面說(shuō)到了,當(dāng)發(fā)送一條消息到?wmyskxz.chat?這個(gè)頻道時(shí),Redis 不僅僅會(huì)發(fā)送到當(dāng)前的頻道,還會(huì)發(fā)送到匹配于當(dāng)前模式的所有頻道,實(shí)際上,pubsub_patterns?背后還維護(hù)了一個(gè)?redis.h/pubsubPattern?結(jié)構(gòu):

    typedefstruct pubsubPattern {redisClient *client; // 訂閱模式的客戶端robj *pattern; // 訂閱的模式 } pubsubPattern;

    每當(dāng)調(diào)用?PSUBSCRIBE?命令訂閱一個(gè)模式時(shí),程序就創(chuàng)建一個(gè)包含客戶端信息和被訂閱模式的?pubsubPattern?結(jié)構(gòu),并將該結(jié)構(gòu)添加到?redisServer.pubsub_patterns?鏈表中。

    我們來(lái)看一個(gè)?pusub_patterns?鏈表的示例:

    這個(gè)時(shí)候客戶端?client 3?執(zhí)行?PSUBSCRIBE wmyskxz.java.*,那么?pubsub_patterns?鏈表就會(huì)被更新成這樣:

    通過(guò)遍歷整個(gè)?pubsub_patterns?鏈表,程序可以檢查所有正在被訂閱的模式,以及訂閱這些模式的客戶端。

    PUBLISH 命令

    上面給出的偽代碼并沒(méi)有?完整描述?PUBLISH?命令的行為,因?yàn)?PUBLISH?除了將?message?發(fā)送到?所有訂閱?channel?的客戶端?之外,它還會(huì)將?channel?和?pubsub_patterns?中的?模式?進(jìn)行對(duì)比,如果?channel?和某個(gè)模式匹配的話,那么也將?message?發(fā)送到?訂閱那個(gè)模式的客戶端

    完整描述?PUBLISH?功能的偽代碼定于如下:

    def PUBLISH(channel, message):# 遍歷所有訂閱頻道 channel 的客戶端for client in server.pubsub_channels[channel]:# 將信息發(fā)送給它們send_message(client, message)# 取出所有模式,以及訂閱模式的客戶端for pattern, client in server.pubsub_patterns:# 如果 channel 和模式匹配if match(channel, pattern):# 那么也將信息發(fā)給訂閱這個(gè)模式的客戶端send_message(client, message)

    PUNSUBSCRIBE 命令

    使用?PUNSUBSCRIBE?命令可以退訂指定的模式,這個(gè)命令執(zhí)行的是訂閱模式的反操作:序會(huì)刪除?redisServer.pubsub_patterns?鏈表中,所有和被退訂模式相關(guān)聯(lián)的?pubsubPattern?結(jié)構(gòu),這樣客戶端就不會(huì)再收到和模式相匹配的頻道發(fā)來(lái)的信息。

    PubSub 的缺點(diǎn)

    盡管?Redis?實(shí)現(xiàn)了?PubSub?模式來(lái)達(dá)到了?多播消息隊(duì)列?的目的,但在實(shí)際的消息隊(duì)列的領(lǐng)域,幾乎?找不到特別合適的場(chǎng)景,因?yàn)樗娜秉c(diǎn)十分明顯:

    • 沒(méi)有 Ack 機(jī)制,也不保證數(shù)據(jù)的連續(xù):?PubSub 的生產(chǎn)者傳遞過(guò)來(lái)一個(gè)消息,Redis 會(huì)直接找到相應(yīng)的消費(fèi)者傳遞過(guò)去。如果沒(méi)有一個(gè)消費(fèi)者,那么消息會(huì)被直接丟棄。如果開(kāi)始有三個(gè)消費(fèi)者,其中一個(gè)突然掛掉了,過(guò)了一會(huì)兒等它再重連時(shí),那么重連期間的消息對(duì)于這個(gè)消費(fèi)者來(lái)說(shuō)就徹底丟失了。

    • 不持久化消息:?如果 Redis 停機(jī)重啟,PubSub 的消息是不會(huì)持久化的,畢竟 Redis 宕機(jī)就相當(dāng)于一個(gè)消費(fèi)者都沒(méi)有,所有的消息都會(huì)被直接丟棄。

    基于上述缺點(diǎn),Redis 的作者甚至單獨(dú)開(kāi)啟了一個(gè) Disque 的項(xiàng)目來(lái)專門(mén)用來(lái)做多播消息隊(duì)列,不過(guò)該項(xiàng)目目前好像都沒(méi)有成熟。不過(guò)后來(lái)在 2018 年 6 月,Redis 5.0?新增了?Stream?數(shù)據(jù)結(jié)構(gòu),這個(gè)功能給 Redis 帶來(lái)了?持久化消息隊(duì)列,從此 PubSub 作為消息隊(duì)列的功能可以說(shuō)是就消失了..

    二、更為強(qiáng)大的 Stream | 持久化的發(fā)布/訂閱系統(tǒng)

    Redis Stream?從概念上來(lái)說(shuō),就像是一個(gè)?僅追加內(nèi)容?的?消息鏈表,把所有加入的消息都一個(gè)一個(gè)串起來(lái),每個(gè)消息都有一個(gè)唯一的 ID 和內(nèi)容,這很簡(jiǎn)單,讓它復(fù)雜的是從 Kafka 借鑒的另一種概念:消費(fèi)者組(Consumer Group)?(思路一致,實(shí)現(xiàn)不同):

    上圖就展示了一個(gè)典型的?Stream?結(jié)構(gòu)。每個(gè) Stream 都有唯一的名稱,它就是 Redis 的?key,在我們首次使用?xadd?指令追加消息時(shí)自動(dòng)創(chuàng)建。我們對(duì)圖中的一些概念做一下解釋:

    • Consumer Group:消費(fèi)者組,可以簡(jiǎn)單看成記錄流狀態(tài)的一種數(shù)據(jù)結(jié)構(gòu)。消費(fèi)者既可以選擇使用?XREAD?命令進(jìn)行?獨(dú)立消費(fèi),也可以多個(gè)消費(fèi)者同時(shí)加入一個(gè)消費(fèi)者組進(jìn)行?組內(nèi)消費(fèi)。同一個(gè)消費(fèi)者組內(nèi)的消費(fèi)者共享所有的 Stream 信息,同一條消息只會(huì)有一個(gè)消費(fèi)者消費(fèi)到,這樣就可以應(yīng)用在分布式的應(yīng)用場(chǎng)景中來(lái)保證消息的唯一性。

    • last_delivered_id:用來(lái)表示消費(fèi)者組消費(fèi)在 Stream 上?消費(fèi)位置?的游標(biāo)信息。每個(gè)消費(fèi)者組都有一個(gè) Stream 內(nèi)?唯一的名稱,消費(fèi)者組不會(huì)自動(dòng)創(chuàng)建,需要使用?XGROUP CREATE?指令來(lái)顯式創(chuàng)建,并且需要指定從哪一個(gè)消息 ID 開(kāi)始消費(fèi),用來(lái)初始化?last_delivered_id?這個(gè)變量。

    • pending_ids:每個(gè)消費(fèi)者內(nèi)部都有的一個(gè)狀態(tài)變量,用來(lái)表示?已經(jīng)?被客戶端?獲取,但是?還沒(méi)有 ack?的消息。記錄的目的是為了?保證客戶端至少消費(fèi)了消息一次,而不會(huì)在網(wǎng)絡(luò)傳輸?shù)闹型緛G失而沒(méi)有對(duì)消息進(jìn)行處理。如果客戶端沒(méi)有 ack,那么這個(gè)變量里面的消息 ID 就會(huì)越來(lái)越多,一旦某個(gè)消息被 ack,它就會(huì)對(duì)應(yīng)開(kāi)始減少。這個(gè)變量也被 Redis 官方稱為?PEL?(Pending Entries List)。

    消息 ID 和消息內(nèi)容

    消息 ID

    消息 ID 如果是由?XADD?命令返回自動(dòng)創(chuàng)建的話,那么它的格式會(huì)像這樣:timestampInMillis-sequence?(毫秒時(shí)間戳-序列號(hào)),例如?1527846880585-5,它表示當(dāng)前的消息是在毫秒時(shí)間戳?1527846880585?時(shí)產(chǎn)生的,并且是該毫秒內(nèi)產(chǎn)生的第 5 條消息。

    這些 ID 的格式看起來(lái)有一些奇怪,為什么要使用時(shí)間來(lái)當(dāng)做 ID 的一部分呢??一方面,我們要?滿足 ID 自增?的屬性,另一方面,也是為了?支持范圍查找?的功能。由于 ID 和生成消息的時(shí)間有關(guān),這樣就使得在根據(jù)時(shí)間范圍內(nèi)查找時(shí)基本上是沒(méi)有額外損耗的。

    當(dāng)然消息 ID 也可以由客戶端自定義,但是形式必須是?"整數(shù)-整數(shù)",而且后面加入的消息的 ID 必須要大于前面的消息 ID。

    消息內(nèi)容

    消息內(nèi)容就是普通的鍵值對(duì),形如 hash 結(jié)構(gòu)的鍵值對(duì)。

    增刪改查示例

    增刪改查命令很簡(jiǎn)單,詳情如下:

  • xadd:追加消息

  • xdel:刪除消息,這里的刪除僅僅是設(shè)置了標(biāo)志位,不影響消息總長(zhǎng)度

  • xrange:獲取消息列表,會(huì)自動(dòng)過(guò)濾已經(jīng)刪除的消息

  • xlen:消息長(zhǎng)度

  • del:刪除Stream

  • 使用示例:

    # *號(hào)表示服務(wù)器自動(dòng)生成ID,后面順序跟著一堆key/value 127.0.0.1:6379> xadd codehole * name laoqian age 30 # 名字叫l(wèi)aoqian,年齡30歲 1527849609889-0 # 生成的消息ID 127.0.0.1:6379> xadd codehole * name xiaoyu age 29 1527849629172-0 127.0.0.1:6379> xadd codehole * name xiaoqian age 1 1527849637634-0 127.0.0.1:6379> xlen codehole (integer) 3 127.0.0.1:6379> xrange codehole - + # -表示最小值, +表示最大值 1) 1) 1527849609889-02) 1) "name"2) "laoqian"3) "age"4) "30" 2) 1) 1527849629172-02) 1) "name"2) "xiaoyu"3) "age"4) "29" 3) 1) 1527849637634-02) 1) "name"2) "xiaoqian"3) "age"4) "1" 127.0.0.1:6379> xrange codehole 1527849629172-0 + # 指定最小消息ID的列表 1) 1) 1527849629172-02) 1) "name"2) "xiaoyu"3) "age"4) "29" 2) 1) 1527849637634-02) 1) "name"2) "xiaoqian"3) "age"4) "1" 127.0.0.1:6379> xrange codehole - 1527849629172-0 # 指定最大消息ID的列表 1) 1) 1527849609889-02) 1) "name"2) "laoqian"3) "age"4) "30" 2) 1) 1527849629172-02) 1) "name"2) "xiaoyu"3) "age"4) "29" 127.0.0.1:6379> xdel codehole 1527849609889-0 (integer) 1 127.0.0.1:6379> xlen codehole # 長(zhǎng)度不受影響 (integer) 3 127.0.0.1:6379> xrange codehole - + # 被刪除的消息沒(méi)了 1) 1) 1527849629172-02) 1) "name"2) "xiaoyu"3) "age"4) "29" 2) 1) 1527849637634-02) 1) "name"2) "xiaoqian"3) "age"4) "1" 127.0.0.1:6379> del codehole # 刪除整個(gè)Stream (integer) 1

    獨(dú)立消費(fèi)示例

    我們可以在不定義消費(fèi)組的情況下進(jìn)行 Stream 消息的?獨(dú)立消費(fèi),當(dāng) Stream 沒(méi)有新消息時(shí),甚至可以阻塞等待。Redis 設(shè)計(jì)了一個(gè)單獨(dú)的消費(fèi)指令?xread,可以將 Stream 當(dāng)成普通的消息隊(duì)列(list)來(lái)使用。使用?xread?時(shí),我們可以完全忽略?消費(fèi)組(Consumer Group)?的存在,就好比 Stream 就是一個(gè)普通的列表(list):

    # 從Stream頭部讀取兩條消息 127.0.0.1:6379> xread count 2 streams codehole 0-0 1) 1) "codehole"2) 1) 1) 1527851486781-02) 1) "name"2) "laoqian"3) "age"4) "30"2) 1) 1527851493405-02) 1) "name"2) "yurui"3) "age"4) "29" # 從Stream尾部讀取一條消息,毫無(wú)疑問(wèn),這里不會(huì)返回任何消息 127.0.0.1:6379> xread count 1 streams codehole $ (nil) # 從尾部阻塞等待新消息到來(lái),下面的指令會(huì)堵住,直到新消息到來(lái) 127.0.0.1:6379> xread block 0 count 1 streams codehole $ # 我們從新打開(kāi)一個(gè)窗口,在這個(gè)窗口往Stream里塞消息 127.0.0.1:6379> xadd codehole * name youming age 60 1527852774092-0 # 再切換到前面的窗口,我們可以看到阻塞解除了,返回了新的消息內(nèi)容 # 而且還顯示了一個(gè)等待時(shí)間,這里我們等待了93s 127.0.0.1:6379> xread block 0 count 1 streams codehole $ 1) 1) "codehole"2) 1) 1) 1527852774092-02) 1) "name"2) "youming"3) "age"4) "60" (93.11s)

    客戶端如果想要使用?xread?進(jìn)行?順序消費(fèi),一定要?記住當(dāng)前消費(fèi)?到哪里了,也就是返回的消息 ID。下次繼續(xù)調(diào)用?xread?時(shí),將上次返回的最后一個(gè)消息 ID 作為參數(shù)傳遞進(jìn)去,就可以繼續(xù)消費(fèi)后續(xù)的消息。

    block 0?表示永遠(yuǎn)阻塞,直到消息到來(lái),block 1000?表示阻塞?1s,如果?1s?內(nèi)沒(méi)有任何消息到來(lái),就返回?nil:

    127.0.0.1:6379> xread block 1000 count 1 streams codehole $ (nil) (1.07s)

    創(chuàng)建消費(fèi)者示例

    Stream 通過(guò)?xgroup create?指令創(chuàng)建消費(fèi)組(Consumer Group),需要傳遞起始消息 ID 參數(shù)用來(lái)初始化?last_delivered_id?變量:

    127.0.0.1:6379> xgroup create codehole cg1 0-0 # 表示從頭開(kāi)始消費(fèi) OK # $表示從尾部開(kāi)始消費(fèi),只接受新消息,當(dāng)前Stream消息會(huì)全部忽略 127.0.0.1:6379> xgroup create codehole cg2 $ OK 127.0.0.1:6379> xinfo codehole # 獲取Stream信息1) length2) (integer) 3 # 共3個(gè)消息3) radix-tree-keys4) (integer) 15) radix-tree-nodes6) (integer) 27) groups8) (integer) 2 # 兩個(gè)消費(fèi)組9) first-entry # 第一個(gè)消息 10) 1) 1527851486781-02) 1) "name"2) "laoqian"3) "age"4) "30" 11) last-entry # 最后一個(gè)消息 12) 1) 1527851498956-02) 1) "name"2) "xiaoqian"3) "age"4) "1" 127.0.0.1:6379> xinfo groups codehole # 獲取Stream的消費(fèi)組信息 1) 1) name2) "cg1"3) consumers4) (integer) 0 # 該消費(fèi)組還沒(méi)有消費(fèi)者5) pending6) (integer) 0 # 該消費(fèi)組沒(méi)有正在處理的消息 2) 1) name2) "cg2"3) consumers # 該消費(fèi)組還沒(méi)有消費(fèi)者4) (integer) 05) pending6) (integer) 0 # 該消費(fèi)組沒(méi)有正在處理的消息

    組內(nèi)消費(fèi)示例

    Stream 提供了?xreadgroup?指令可以進(jìn)行消費(fèi)組的組內(nèi)消費(fèi),需要提供?消費(fèi)組名稱、消費(fèi)者名稱和起始消息 ID。它同?xread?一樣,也可以阻塞等待新消息。讀到新消息后,對(duì)應(yīng)的消息 ID 就會(huì)進(jìn)入消費(fèi)者的?PEL?(正在處理的消息)?結(jié)構(gòu)里,客戶端處理完畢后使用?xack?指令?通知服務(wù)器,本條消息已經(jīng)處理完畢,該消息 ID 就會(huì)從?PEL?中移除,下面是示例:

    # >號(hào)表示從當(dāng)前消費(fèi)組的last_delivered_id后面開(kāi)始讀 # 每當(dāng)消費(fèi)者讀取一條消息,last_delivered_id變量就會(huì)前進(jìn) 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole > 1) 1) "codehole"2) 1) 1) 1527851486781-02) 1) "name"2) "laoqian"3) "age"4) "30" 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole > 1) 1) "codehole"2) 1) 1) 1527851493405-02) 1) "name"2) "yurui"3) "age"4) "29" 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 2 streams codehole > 1) 1) "codehole"2) 1) 1) 1527851498956-02) 1) "name"2) "xiaoqian"3) "age"4) "1"2) 1) 1527852774092-02) 1) "name"2) "youming"3) "age"4) "60" # 再繼續(xù)讀取,就沒(méi)有新消息了 127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole > (nil) # 那就阻塞等待吧 127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole > # 開(kāi)啟另一個(gè)窗口,往里塞消息 127.0.0.1:6379> xadd codehole * name lanying age 61 1527854062442-0 # 回到前一個(gè)窗口,發(fā)現(xiàn)阻塞解除,收到新消息了 127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole > 1) 1) "codehole"2) 1) 1) 1527854062442-02) 1) "name"2) "lanying"3) "age"4) "61" (36.54s) 127.0.0.1:6379> xinfo groups codehole # 觀察消費(fèi)組信息 1) 1) name2) "cg1"3) consumers4) (integer) 1 # 一個(gè)消費(fèi)者5) pending6) (integer) 5 # 共5條正在處理的信息還有沒(méi)有ack 2) 1) name2) "cg2"3) consumers4) (integer) 0 # 消費(fèi)組cg2沒(méi)有任何變化,因?yàn)榍懊嫖覀円恢痹诓倏vcg15) pending6) (integer) 0 # 如果同一個(gè)消費(fèi)組有多個(gè)消費(fèi)者,我們可以通過(guò)xinfo consumers指令觀察每個(gè)消費(fèi)者的狀態(tài) 127.0.0.1:6379> xinfo consumers codehole cg1 # 目前還有1個(gè)消費(fèi)者 1) 1) name2) "c1"3) pending4) (integer) 5 # 共5條待處理消息5) idle6) (integer) 418715 # 空閑了多長(zhǎng)時(shí)間ms沒(méi)有讀取消息了 # 接下來(lái)我們ack一條消息 127.0.0.1:6379> xack codehole cg1 1527851486781-0 (integer) 1 127.0.0.1:6379> xinfo consumers codehole cg1 1) 1) name2) "c1"3) pending4) (integer) 4 # 變成了5條5) idle6) (integer) 668504 # 下面ack所有消息 127.0.0.1:6379> xack codehole cg1 1527851493405-0 1527851498956-0 1527852774092-0 1527854062442-0 (integer) 4 127.0.0.1:6379> xinfo consumers codehole cg1 1) 1) name2) "c1"3) pending4) (integer) 0 # pel空了5) idle6) (integer) 745505

    QA 1:Stream 消息太多怎么辦??| Stream 的上限

    很容易想到,要是消息積累太多,Stream 的鏈表豈不是很長(zhǎng),內(nèi)容會(huì)不會(huì)爆掉就是個(gè)問(wèn)題了。xdel?指令又不會(huì)刪除消息,它只是給消息做了個(gè)標(biāo)志位。

    Redis 自然考慮到了這一點(diǎn),所以它提供了一個(gè)定長(zhǎng) Stream 功能。在?xadd?的指令提供一個(gè)定長(zhǎng)長(zhǎng)度?maxlen,就可以將老的消息干掉,確保最多不超過(guò)指定長(zhǎng)度,使用起來(lái)也很簡(jiǎn)單:

    > XADD mystream MAXLEN 2 * value 1 1526654998691-0 > XADD mystream MAXLEN 2 * value 2 1526654999635-0 > XADD mystream MAXLEN 2 * value 3 1526655000369-0 > XLEN mystream (integer) 2 > XRANGE mystream - + 1) 1) 1526654999635-02) 1) "value"2) "2" 2) 1) 1526655000369-02) 1) "value"2) "3"

    如果使用?MAXLEN?選項(xiàng),當(dāng) Stream 的達(dá)到指定長(zhǎng)度后,老的消息會(huì)自動(dòng)被淘汰掉,因此 Stream 的大小是恒定的。目前還沒(méi)有選項(xiàng)讓 Stream 只保留給定數(shù)量的條目,因?yàn)闉榱艘恢碌剡\(yùn)行,這樣的命令必須在很長(zhǎng)一段時(shí)間內(nèi)阻塞以淘汰消息。(例如在添加數(shù)據(jù)的高峰期間,你不得不長(zhǎng)暫停來(lái)淘汰舊消息和添加新的消息)

    另外使用?MAXLEN?選項(xiàng)的花銷是很大的,Stream 為了節(jié)省內(nèi)存空間,采用了一種特殊的結(jié)構(gòu)表示,而這種結(jié)構(gòu)的調(diào)整是需要額外的花銷的。所以我們可以使用一種帶有?~?的特殊命令:

    XADD mystream MAXLEN ~ 1000 * ... entry fields here ...

    它會(huì)基于當(dāng)前的結(jié)構(gòu)合理地對(duì)節(jié)點(diǎn)執(zhí)行裁剪,來(lái)保證至少會(huì)有?1000?條數(shù)據(jù),可能是?1010?也可能是?1030。

    QA 2:PEL 是如何避免消息丟失的?

    在客戶端消費(fèi)者讀取 Stream 消息時(shí),Redis 服務(wù)器將消息回復(fù)給客戶端的過(guò)程中,客戶端突然斷開(kāi)了連接,消息就丟失了。但是 PEL 里已經(jīng)保存了發(fā)出去的消息 ID,待客戶端重新連上之后,可以再次收到 PEL 中的消息 ID 列表。不過(guò)此時(shí)?xreadgroup?的起始消息 ID 不能為參數(shù)?>?,而必須是任意有效的消息 ID,一般將參數(shù)設(shè)為?0-0,表示讀取所有的 PEL 消息以及自?last_delivered_id?之后的新消息。

    Redis Stream Vs Kafka

    Redis 基于內(nèi)存存儲(chǔ),這意味著它會(huì)比基于磁盤(pán)的 Kafka 快上一些,也意味著使用 Redis 我們?不能長(zhǎng)時(shí)間存儲(chǔ)大量數(shù)據(jù)。不過(guò)如果您想以?最小延遲?實(shí)時(shí)處理消息的話,您可以考慮 Redis,但是如果?消息很大并且應(yīng)該重用數(shù)據(jù)?的話,則應(yīng)該首先考慮使用 Kafka。

    另外從某些角度來(lái)說(shuō),Redis Stream?也更適用于小型、廉價(jià)的應(yīng)用程序,因?yàn)?Kafka?相對(duì)來(lái)說(shuō)更難配置一些。

    相關(guān)閱讀

  • Redis(1)——5種基本數(shù)據(jù)結(jié)構(gòu)

  • Redis(2)——跳躍表?

  • Redis(3)——分布式鎖深入探究

  • Reids(4)——神奇的HyperLoglog解決統(tǒng)計(jì)問(wèn)題

  • Redis(5)——億級(jí)數(shù)據(jù)過(guò)濾和布隆過(guò)濾器

  • Redis(6)——GeoHash查找附近的人

  • Redis(7)——持久化【一文了解】

  • 參考資料

  • 訂閱與發(fā)布——Redis 設(shè)計(jì)與實(shí)現(xiàn) -?https://redisbook.readthedocs.io/en/latest/feature/pubsub.html

  • 《Redis 深度歷險(xiǎn)》 - 錢(qián)文品/ 著 -?https://book.douban.com/subject/30386804/

  • Introduction to Redis Streams【官方文檔】 -?https://redis.io/topics/streams-intro

  • Kafka vs. Redis: Log Aggregation Capabilities and Performance -?https://logz.io/blog/kafka-vs-redis/

  • 總結(jié)

    以上是生活随笔為你收集整理的Redis(8)——发布/订阅与Stream的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。