日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Redis进阶-Stream多播的可持久化的消息队列

發(fā)布時間:2025/3/21 数据库 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Redis进阶-Stream多播的可持久化的消息队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • Pre
  • Stream簡介
  • Stream特性
  • 消息 ID
  • 消息內容
  • 命令預覽
  • 獨立消費
  • 創(chuàng)建消費組
  • 消費
  • Stream 消息積壓怎么處理
  • 消息如果忘記 ACK 會怎樣?
  • PEL 如何避免消息丟失?
  • Stream 的高可用
  • 分區(qū) Partition
  • 小結
  • 數據結構 RadixTree


Pre

Redis-13Redis發(fā)布訂閱 中提到了PubSub的不足之處 。

PubSub 的生產者傳遞過來一個消息,Redis 會直接找到相應的消費者傳遞過去。如果一個消費者都沒有,那么消息直接丟棄。

如果開始有三個消費者,一個消費者突然掛掉了,生產者會繼續(xù)發(fā)送消息,另外兩個消費者可以持續(xù)收到消息。但是掛掉的消費者重新連上的時候,這斷連期間生產者發(fā)送的消息,對于這個消費者來說就是徹底丟失了。

如果 Redis 停機重啟,PubSub 的消息是不會持久化的,畢竟 Redis 宕機就相當于一個消費者都沒有,所有的消息直接被丟棄。

正是因為 PubSub 有這些缺點,它幾乎找不到合適的應用場景。Redis5.0 新增了 Stream 數據結構,這個功能給 Redis 帶來了持久化消息隊列,從此 PubSub 可以消失了。


Stream簡介

Redis5.0 最大的新特性就是多出了一個數據結構 Stream,它是一個新的強大的支持多播的可持久化的消息隊列。 Redis Stream 借鑒了 Kafka 的設計。


Stream特性

  • Redis Stream 的結構如上圖所示,它有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的 ID 和對應的內容

  • 消息是持久化的,Redis 重啟后,內容還在

  • 每個 Stream 都有唯一的名稱,它就是 Redis 的 key,首次使用 xadd 指令追加消息時自動創(chuàng)建

  • 每個 Stream 都可以掛多個消費組,每個消費組會有個游標last_delivered_id 在 Stream數組之上往前移動,表示當前消費組已經消費到哪條消息了

  • 每個消費組都有一個 Stream內唯一的名稱,消費組不會自動創(chuàng)建,它需要單獨的指令 xgroup create 進行創(chuàng)建,需要指定從 Stream 的某個消息 ID 開始消費,這個 ID 用來初始化 last_delivered_id 變量。

  • 每個消費組 (Consumer Group) 的狀態(tài)都是獨立的,相互不受影響,即同一份Stream 內部的消息會被每個消費組都消費到

  • 同一個消費組 (Consumer Group) 可以掛接多個消費者 (Consumer),這些消費者之間是競爭關系,任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動。每個消費者有一個組內唯一名稱。

  • 消費者 (Consumer) 內部會有個狀態(tài)變量 pending_ids,它記錄了當前已經被客戶端讀取的消息,但是還沒有 ack。如果客戶端沒有 ack,這個變量里面的消息 ID 會越來越多,一旦某個消息被 ack,它就開始減少。這個 pending_ids 變量在 Redis 官方被稱之為 PEL ( Pending Entries List),這是一個很核心的數據結構,它用來確保客戶端至少消費了消息一次,而不會在網絡傳輸的中途丟失了沒處理。


消息 ID

消息 ID 的形式是 timestampInMillis-sequence,例如 1587877430819-3,它表示當前的消息在毫米時間戳 1587877430819時產生,并且是該毫秒內產生的第 3條消息。

消息 ID 可以由服務器自動生成,也可以由客戶端自己指定,但是形式必須是整數-整數,而且必須是后面加入的消息的 ID 要大于前面的消息 ID。


消息內容

消息內容就是鍵值對,形如 hash 結構的鍵值對,這沒什么特別之處。


命令預覽

Redis Version _ 5.0.3

  • xadd 追加消息
  • xdel 刪除消息,這里的刪除僅僅是設置了標志位,不影響消息總長度
  • xrange 獲取消息列表,會自動過濾已經刪除的消息
  • xlen 消息長度
  • del 刪除 Stream
  • 。。。。
# * 號表示服務器自動生成 ID,后面順序跟著一堆 key/value 127.0.0.1:6379> XADD artisankey * name fonia sex female "1587877430819-0" ---> 生成的消息 ID 127.0.0.1:6379> XADD artisankey * name jeff sex male "1587877454849-0"# XLEN 消息長度 127.0.0.1:6379> XLEN artisankey (integer) 2# -表示最小值 , + 表示最大值 127.0.0.1:6379> xrange artisankey - + 1) 1) "1587877430819-0"2) 1) "name"2) "fonia"3) "sex"4) "female" 2) 1) "1587877454849-0"2) 1) "name"2) "jeff"3) "sex"4) "male"# 指定最大消息 ID 的列表 127.0.0.1:6379> XRANGE artisankey - 1587877430819-0 1) 1) "1587877430819-0"2) 1) "name"2) "fonia"3) "sex"4) "female"# 再加入一條數據,格式任意 多了個age字段 127.0.0.1:6379> XADD artisankey * name jeff sex male age 20 "1587877808930-0" 127.0.0.1:6379> xrange artisankey - + 1) 1) "1587877430819-0"2) 1) "name"2) "fonia"3) "sex"4) "female" 2) 1) "1587877454849-0"2) 1) "name"2) "jeff"3) "sex"4) "male" 3) 1) "1587877808930-0"2) 1) "name"2) "jeff"3) "sex"4) "male"5) "age"6) "20"# 刪除掉剛才新加的這條數據 127.0.0.1:6379> XDEL artisankey 1587877808930-0 (integer) 1# 長度變?yōu)?span id="ozvdkddzhkzd" class="token number">2 127.0.0.1:6379> XLEN artisankey (integer) 2# 被刪除的消息已經沒了 127.0.0.1:6379> XRANGE artisankey - + 1) 1) "1587877430819-0"2) 1) "name"2) "fonia"3) "sex"4) "female" 2) 1) "1587877454849-0"2) 1) "name"2) "jeff"3) "sex"4) "male" 127.0.0.1:6379> # 刪除整個 Stream 127.0.0.1:6379> DEL artisankey (integer) 1 127.0.0.1:6379> XRANGE artisankey - + (empty list or set) 127.0.0.1:6379>

獨立消費

我們可以在不定義消費組的情況下進行 Stream 消息的獨立消費,當 Stream 沒有新消息時,甚至可以阻塞等待。

Redis 設計了一個單獨的消費指令 xread,可以將 Stream 當成普通的消息隊列 (list) 來使用。

使用 xread 時,我們可以完全忽略消費組 (Consumer Group)的存在,就好比 Stream 就是一個普通的列表 (list)。

演示一下

#先通過xadd向artisan這個隊列寫入5條數據 127.0.0.1:6379> XADD artisan * name artisan1 age 25 "1587886601587-0" 127.0.0.1:6379> XADD artisan * name artisan2 age 26 "1587886610449-0" 127.0.0.1:6379> XADD artisan * name artisan3 age 27 "1587886617014-0" 127.0.0.1:6379> XADD artisan * name artisan4 age 28 "1587886622590-0" 127.0.0.1:6379> XADD artisan * name artisan6 age 25 "1587886631932-0" 127.0.0.1:6379> XLEN artisan (integer) 5 127.0.0.1:6379> # 從 Stream 頭部讀取兩條消息 127.0.0.1:6379> XREAD COUNT 2 STREAMS artisan 0-0 1) 1) "artisan"2) 1) 1) "1587886601587-0"2) 1) "name"2) "artisan1"3) "age"4) "25"2) 1) "1587886610449-0"2) 1) "name"2) "artisan2"3) "age"4) "26" 127.0.0.1:6379> # 從 Stream 尾部讀取一條消息,毫無疑問,這里不會返回任何消息 127.0.0.1:6379> XREAD COUNT 1 STREAMS artisan $ (nil) 127.0.0.1:6379> # 從尾部阻塞等待新消息到來,下面的指令會堵住,直到新消息到來 127.0.0.1:6379> XREAD block 0 count 1 streams artisan $

# 重新打開一個窗口,在這個窗口往 Stream 里塞消息 127.0.0.1:6379> XADD artisan * name artisan7 age 27 "1587886999018-0" 127.0.0.1:6379>

# 再切換到前面的窗口,我們可以看到阻塞解除了,返回了新的消息內容 # 而且還顯示了一個等待時間,這里我們等待了 136.42s 127.0.0.1:6379> XREAD block 0 count 1 streams artisan $ 1) 1) "artisan"2) 1) 1) "1587886999018-0"2) 1) "name"2) "artisan7"3) "age"4) "27" (136.42s) 127.0.0.1:6379>

  • 客戶端如果想要使用 xread 進行順序消費,一定要記住當前消費到哪里了,也就是返回的消息 ID。下次繼續(xù)調用 xread 時,將上次返回的最后一個消息 ID 作為參數傳遞進去,就可以繼續(xù)消費后續(xù)的消息。

  • block 0 表示永遠阻塞,直到消息到來,block 1000 表示阻塞 1s,如果 1s 內沒有任何消息到來,就返回 nil

127.0.0.1:6379> XREAD count 1 block 1000 streams artisan $ (nil) (1.03s) 127.0.0.1:6379>

創(chuàng)建消費組

Stream 通過 xgroup create 指令創(chuàng)建消費組 (Consumer Group),需要傳遞起始消息 ID 參數用來初始化 last_delivered_id 變量

# 表示從頭開始消費 127.0.0.1:6379> XGROUP create artisan artisanGroup 0-0 OK 127.0.0.1:6379> # $ 表示從尾部開始消費,只接受新消息,當前 Stream 消息會全部忽略 127.0.0.1:6379> XGROUP create artisan artisanGroup2 $ OK 127.0.0.1:6379> 127.0.0.1:6379> XINFO stream artisan1) "length"2) (integer) 6 # 共 6 個消息3) "radix-tree-keys"4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "groups"8) (integer) 2 # 兩個消費組9) "last-generated-id" 10) "1587886999018-0" 11) "first-entry" # 第一個消息 12) 1) "1587886601587-0"2) 1) "name"2) "artisan1"3) "age"4) "25" 13) "last-entry" # 最后一個消息 14) 1) "1587886999018-0"2) 1) "name"2) "artisan7"3) "age"4) "27" 127.0.0.1:6379> # 獲取 Stream 的消費組信息 127.0.0.1:6379> XINFO groups artisan 1) 1) "name"2) "artisanGroup"3) "consumers"4) (integer) 0 # 該消費組還沒有消費者5) "pending"6) (integer) 0 # 該消費組沒有正在處理的消息7) "last-delivered-id"8) "0-0" 2) 1) "name"2) "artisanGroup2"3) "consumers"4) (integer) 0 # 該消費組還沒有消費者5) "pending"6) (integer) 0 # 該消費組沒有正在處理的消息7) "last-delivered-id"8) "1587886999018-0"

消費

Stream 提供了 xreadgroup 指令可以進行消費組的組內消費,需要提供消費組名稱、消費者名稱和起始消息 ID。

它同 xread 一樣,也可以阻塞等待新消息。讀到新消息后,對應的消息 ID 就會進入消費者的 PEL(正在處理的消息) 結構里,客戶端處理完畢后使用 xack指令通知服務器,本條消息已經處理完畢,該消息 ID 就會從 PEL 中移除。

先看下目前隊列中的數據

127.0.0.1:6379> XRANGE artisan - + 1) 1) "1587886601587-0"2) 1) "name"2) "artisan1"3) "age"4) "25" 2) 1) "1587886610449-0"2) 1) "name"2) "artisan2"3) "age"4) "26" 3) 1) "1587886617014-0"2) 1) "name"2) "artisan3"3) "age"4) "27" 4) 1) "1587886622590-0"2) 1) "name"2) "artisan4"3) "age"4) "28" 5) 1) "1587886631932-0"2) 1) "name"2) "artisan6"3) "age"4) "25" 6) 1) "1587886999018-0"2) 1) "name"2) "artisan7"3) "age"4) "27" 127.0.0.1:6379> # > 號表示從當前消費組的 last_delivered_id 后面開始讀 # 每當消費者讀取一條消息,last_delivered_id 變量就會前進 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 count 1 streams artisan > 1) 1) "artisan"2) 1) 1) "1587886601587-0"2) 1) "name"2) "artisan1"3) "age"4) "25" 127.0.0.1:6379> 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 count 1 streams artisan > 1) 1) "artisan"2) 1) 1) "1587886610449-0"2) 1) "name"2) "artisan2"3) "age"4) "26" 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 count 1 streams artisan > 1) 1) "artisan"2) 1) 1) "1587886617014-0"2) 1) "name"2) "artisan3"3) "age"4) "27" 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 count 1 streams artisan > 1) 1) "artisan"2) 1) 1) "1587886622590-0"2) 1) "name"2) "artisan4"3) "age"4) "28" 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 count 1 streams artisan > 1) 1) "artisan"2) 1) 1) "1587886631932-0"2) 1) "name"2) "artisan6"3) "age"4) "25" 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 count 1 streams artisan > 1) 1) "artisan"2) 1) 1) "1587886999018-0"2) 1) "name"2) "artisan7"3) "age"4) "27"# 再繼續(xù)讀取,就沒有新消息了 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 count 1 streams artisan > (nil) 127.0.0.1:6379> # 那就阻塞等待吧 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 block 0 count 1 streams artisan > # 開啟另一個窗口,往里塞消息 127.0.0.1:6379> XADD artisan * name artisan8 age 28 "1587889553099-0" 127.0.0.1:6379>

# 回到前一個窗口,發(fā)現阻塞解除,收到新消息了 127.0.0.1:6379> XREADGROUP group artisanGroup artisanGroup2 block 0 count 1 streams artisan > 1) 1) "artisan"2) 1) 1) "1587889553099-0"2) 1) "name"2) "artisan8"3) "age"4) "28" (37.63s) 127.0.0.1:6379>

# 觀察消費組信息 127.0.0.1:6379> XINFO groups artisan 1) 1) "name"2) "artisanGroup"3) "consumers"4) (integer) 1 # 一個消費者5) "pending"6) (integer) 7 # 共 7 條正在處理的信息還有沒有 ack7) "last-delivered-id"8) "1587889553099-0" 2) 1) "name"2) "artisanGroup2"3) "consumers"4) (integer) 0 # 消費組 artisanGroup2沒有任何變化,因為前面我們一直在操縱 artisanGroup5) "pending"6) (integer) 07) "last-delivered-id"8) "1587886999018-0" 127.0.0.1:6379> # 如果同一個消費組有多個消費者,我們可以通過 xinfo consumers 指令觀察每個消費者的狀態(tài) 127.0.0.1:6379> XINFO consumers artisan artisanGroup 1) 1) "name"2) "artisanGroup2"3) "pending"4) (integer) 7 # 共 7 條待處理消息5) "idle"6) (integer) 179922 # 空閑了多長時間 ms 沒有讀取消息了 127.0.0.1:6379> XINFO consumers artisan artisanGroup2 (empty list or set) 127.0.0.1:6379> # 接下來我們 ack 一條消息 127.0.0.1:6379> XACK artisan artisanGroup 1587886601587-0 (integer) 1 127.0.0.1:6379> XINFO consumers artisan artisanGroup 1) 1) "name"2) "artisanGroup2"3) "pending"4) (integer) 6 # 變成了 6 條待處理的消息5) "idle"6) (integer) 359659 127.0.0.1:6379> # 下面 ack 所有消息127.0.0.1:6379> XACK artisan artisanGroup 1587886610449-0 1587886617014-0 1587886622590-0 1587886631932-0 1587886999018-0 1587889553099-0 (integer) 6 127.0.0.1:6379> XINFO consumers artisan artisanGroup 1) 1) "name"2) "artisanGroup2"3) "pending"4) (integer) 0 # pel 空了5) "idle"6) (integer) 528547 127.0.0.1:6379>

Stream 消息積壓怎么處理

消息積累太多,Stream 的鏈表豈不是很長,內容會不會爆掉?xdel指令又不會刪除消息,它只是給消息做了個標志位。

Redis考慮到了這一點,所以它提供了一個定長 Stream 功能。在 xadd 的指令提供一個定長長度 maxlen,就可以將老的消息干掉,確保最多不超過指定長度

127.0.0.1:6379> xlen artisan (integer) 7 127.0.0.1:6379> xadd artisan maxlen 3 * name artisan89 age 90 "1587890235506-0" 127.0.0.1:6379> XLEN artisan (integer) 3 127.0.0.1:6379>

我們看到 Stream 的長度被砍掉了,通過指定 maxlen,僅保留了 maxlen的長度數據。


消息如果忘記 ACK 會怎樣?

Stream 在每個消費者結構中保存了正在處理中的消息 ID 列表 PEL,如果消費者收到了消息處理完了但是沒有回復 ack,就會導致 PEL 列表不斷增長,如果有很多消費組的話,那么這個 PEL 占用的內存就會放大


PEL 如何避免消息丟失?

在客戶端消費者讀取 Stream 消息時,Redis 服務器將消息回復給客戶端的過程中,客戶端突然斷開了連接,消息就丟失了.

但是 PEL 里已經保存了發(fā)出去的消息 ID。待客戶端重新連上之后,可以再次收到 PEL 中的消息 ID 列表。不過此時 xreadgroup 的起始消息ID 不能為參數>,而必須是任意有效的消息 ID,一般將參數設為 0-0,表示讀取所有的PEL 消息以及自 last_delivered_id 之后的新消息。


Stream 的高可用

Stream 的高可用是建立主從復制基礎上的,它和其它數據結構的復制機制沒有區(qū)別,也就是說在 Sentinel 和 Cluster 集群環(huán)境下 Stream 是可以支持高可用的。不過鑒于 Redis 的指令復制是異步的,在 failover 發(fā)生時,Redis 可能會丟失極小部分數據,這點 Redis 的其它數據結構也是一樣的。


分區(qū) Partition

Redis 的服務器沒有原生支持分區(qū)能力,如果想要使用分區(qū),那就需要分配多個Stream,然后在客戶端使用一定的策略來生產消息到不同的 Stream。

Kafka 是原生支持 Partition 的,但也是客戶端做的。Kafka 的客戶端存在 HashStrategy ,因為它也是通過客戶端的 hash 算法來將不同的消息塞入不同分區(qū)
的。

另外,Kafka 還支持動態(tài)增加分區(qū)數量的能力,但是這種調整能力也是很蹩腳的,它不會把之前已經存在的內容進行 rehash,不會重新分區(qū)歷史數據。這種簡單的動態(tài)調整的能力Redis Stream 通過增加新的 Stream 就可以做到。


小結

Stream 的消費模型借鑒了 Kafka 的消費分組的概念,它彌補了 Redis Pub/Sub 不能持久化消息的缺陷。但是它又不同于 kafka,Kafka 的消息可以分 partition,而 Stream 不行。如果非要分 parition 的話,得在客戶端做,提供不同的 Stream 名稱,對消息進行 hash 取模來選擇往哪個 Stream 里塞。

數據結構 RadixTree

參考:radix tree,基數樹


總結

以上是生活随笔為你收集整理的Redis进阶-Stream多播的可持久化的消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 欧美第一页草草影院 | 东北少妇不带套对白 | 伊人春色视频 | 无码人妻精品一区二区三区夜夜嗨 | 快色视频| 成人黄色短视频在线观看 | 好吊操免费视频 | 高清无码一区二区在线观看吞精 | 日本午夜小视频 | 中文字幕国产视频 | 国产欧美综合一区二区三区 | 超碰在线观看91 | 韩国主播青草200vip视频 | 91精品国产高清一区二区三区蜜臀 | 国产精品不卡一区 | 色视频导航 | 成人日批 | 天堂中文在线最新 | 一级黄色性片 | 欧美 日韩 中文 | 91在线视频免费 | 麻豆精品视频在线观看 | 国内精品久久久久久久久久 | 91激情视频在线 | 日本老小玩hd老少配 | 亚洲欧美日韩国产 | 疯狂做爰的爽文多肉小说王爷 | 2019天天干| 亚洲综合在线视频 | 麻豆69xxnxxporn| 亚洲24p | 一级黄色录像大片 | 麻豆成人在线观看 | 91美女在线观看 | 久久夜色精品国产欧美乱极品 | 日韩中文在线视频 | 少妇3p视频 | 国产老女人精品毛片久久 | 91大神精品在线 | 成人在线观看免费爱爱 | 乱色视频| 国产suv精品一区二区68 | 色诱av | 亚洲va国产va天堂va久久 | 熟妇五十路六十路息与子 | 黑人中文字幕一区二区三区 | 国产成人免费av一区二区午夜 | 亚洲精品国产无码 | 高潮毛片又色又爽免费 | 国产成人在线视频免费观看 | 亚洲黄色免费观看 | www青青草| 午夜激情一区 | 国产成人久久婷婷精品流白浆 | 一级做a爱片性色毛片 | 美女污软件 | 国产日韩欧美精品一区 | 在线观看日韩欧美 | 人妻无码一区二区三区久久 | 日韩欧美国产一区二区 | 亚洲精品国产成人久久av盗摄 | av一区二区三区在线 | 色七七桃花综合影院 | 欧美久草 | 青草青草视频 | 国产又爽又黄的视频 | 久久久久久久久福利 | 中文字幕1区2区3区 www.com黄色片 | 日日夜夜狠狠干 | 日韩人妻一区 | 99精品视频在线看 | 黄色喷水视频 | 日日碰狠狠添天天爽无码av | 国产99对白在线播放 | 亚洲色图av在线 | 91免费看网站 | 好吊妞在线观看 | 免费一级欧美片在线播放 | 色噜噜一区二区 | www.桃色 | 久久影院午夜理论片无码 | 99久久久无码国产精品不卡 | 玖草在线观看 | 免费黄在线看 | 91狠狠爱| 国产精品丝袜一区 | 国产一级视频在线播放 | 俺来也在线视频 | 动漫精品一区一码二码三码四码 | 男生吃小头头的视频 | 永久免费在线视频 | 国产一区二区视频免费观看 | wwwjavhd| 实拍澡堂美女洗澡av | 丝袜美腿亚洲综合 | 性欧美free| 亚洲成熟少妇视频在线观看 | 国产精品免费一区二区 | 免费在线欧美 |