redis stream学习总结
文章目錄
- stream
- Stream基本概念
- 消息id
- 消息內容
- 增刪查改
- 消息生產
- 添加消息 xadd
- 查看消息長度 xlen
- 限制stream最大長度
- 1.xadd 中添加**maxlen**:
- 2.xtrim
- 查詢消息 xrange
- 正向排序:消費id從小到大排
- 反向查詢:消費id從大到小排
- 刪除消息
- 消息消費
- 獨立消費 xread
- 消費組
- stream中出現很多特殊Ids解釋
- 創建消費組
- 消息消費
- 查看stream信息
- 場景問題
stream
Stream基本概念
Redis 5.0 被作者 Antirez 突然發布出來,增加了很多新的特色功能,其最大的新特性就是多出了一個數據結構 Stream,它是一個新的強大的支持多播的可持久化消息隊列,作者坦言 Redis Stream 極大地借鑒了 Kafka 的設計。
Redis Stream 的結構如圖:
它有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的 ID 和對應的內容。消息是持久化的,Redis 重啟后,內容還在。
每個 Stream 都有唯一的名稱,它就是 Redis 的 Key,在首次使用 xadd 執行追加消息時自動創建。
每個 Stream 都可以掛多個消費組(Consumer Group),每個消費組會有個游標 last_delivered_id 在 Stream 數組之上往前移動,表示當前消費組已經消費到哪條消息了。每個消費組都有一個 Stream 內唯一的名稱,消費組不會自動創建,它需要單獨的指令 xgroup create 進行創建,需要指定從 Stream 的某個消息 ID 開始消費,這個 ID 用來初始化 last_delivered_id 變量。
每個消費組的狀態都是獨立的,相互不受影響。也就是說同一份 Stream 內部的消息會被每個消費組都消費到。
同一個消費組可以掛接多個消費者(Consumer),這些消費者之間是競爭關系,任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動。每個消費者有一個組內唯一名稱。
消費者內部會有一個狀態變量 pending_ids,它記錄了當前已經被客戶端讀取,但是還沒有 ack 的消息。如果客戶端沒有 ack,這個變量里面的消息 ID 就會越來越多,一旦某個消息被 ack,它就開始減少。這個 pending_ids 變量在 Redis 官方被稱為 PEL,也就是 Pending Entries List,這是一個核心的數據結構,它用來確保客戶端至少消費了消息一次,而不會在網絡傳輸的中途丟失了而沒被處理。
消息id
消息 ID 的形式是 TimestampInMillis-sequence,例如 1527846880572-5,它表示當前的消息再毫秒時間戳 1527846880572 時產生,并且是該毫秒內產生的第 5 條消息。消息 ID 可以由服務器自動生成,也可以由客戶端自己指定,但是形式必須是 “整數-整數”,而且后面加入的消息的 ID 必須要大于前面的消息 ID。
消息內容
消息內容就是鍵值對,形如 hash 結構的鍵值對,這沒什么特別之處。
增刪查改
增刪改查指令說明如下:
1)xadd:向 Stream 追加消息。
2)xdel:向 Stream 中刪除消息,這里的刪除僅僅是設置標志位,不影響消息總長度。
3)xrange:獲取 Stream 中的消息列表,會自動過濾已經刪除的消息。
4)xlen:獲取 Stream 消息長度。
5)del:刪除整個 Stream 消息列表的所有消息。
消息生產
添加消息 xadd
語法:xadd stream_name Id field value(field value)
記住stream中存儲的消息必須是kv類型,不允許是單個String,如下操作都是鍵值對存儲的
使用:
*:表示由redis自己生成key,key是由當前時間戳(ms)-0格式
查看消息長度 xlen
語法:*xlen stream_name *
使用:xlen news_live
添加成功后會顯示字符串1597979205554-0,改字符串表示時間戳(毫秒)+計數,有點類似于雪花算法
限制stream最大長度
1.xadd 中添加maxlen:
最多存儲消息數,依據FIFO原則,自動刪除超過最長長度的消息
語法:xadd stream_name maxlen n id field value(field,value)
記住:每次生成消息都需要帶這個參數,如果maxlen不帶等于沒有限制,支持動態改變maxlen
使用:
圖片最多存儲3條,每次添加xlen一直是3條
疑問:
1.如果我不知道消息具體的大小,我又如何利用maxlen達到自動刪除?
解決:在MAXLEN選項個實際技術之間的~參數意味著:我并不真的需要這恰好1000個項目,它可以是1000或1010或1030,只需確保至少保存1000個項目。使用此參數,僅在我們可以刪除整個節點時執行修剪。這使它更有效率,通常是你想要的。
2.每次創建消息的時候都需要帶上比較麻煩,有沒有什么更好的辦法?
maxlen,,當然有了,xtrim!!!
2.xtrim
XTRIM命令,它執行與上面的MAXLEN選項非常相似的操作,但是此命令不需要添加任何內容,可以以獨立方式對任何Stream運行。
XTRIM mystream MAXLEN 10
XTRIM mystream MAXLEN ~ 10
查詢消息 xrange
查詢是生產者查詢自己生產的消息,和消費者的消費不是一回事
正向排序:消費id從小到大排
1.查詢所有消息:xrange stream_name - +
使用:xrange news_live - +
2.指定起始id查詢:xrange news_live 1597980701728-0 +
查詢的消息id >= 起始id
3.指定最大id查詢:xrange news_live 1597980701728-0 +
查詢的消息id <= 結束id
反向查詢:消費id從大到小排
1.查詢所有消息:srevrange stream_name + -
2.指定起始id查詢: xrevrange news + 2
消息id >= 2 倒排
3.指定最大id查詢: xrevrange news 2 -
消息id <= 2 倒排
練習命令:
刪除消息
語法:xdel stream_name id
消息消費
獨立消費 xread
類似于List,生產者往list中寫數據,消費者從list中讀數據,只能有一個消費者
1.頭部讀取 0-0
語法:xread count n stream stream_name 0-0
記住:0-0表示從頭開啟讀取數據,這里數據消費記錄不會保存,每次都是從頭開始,如果接著消費必須自己制定其實id
制定起始id讀取:xread count n stream stream_name id
2.尾部讀取最新消息 $
從尾部讀取最新的一條消息
語法:
1.xread count n streams stream_name $
此時默認不返回任何消息 ???用途
2.xread block time count n streams stream_name $
time為ms,如果time=0表示一直阻塞
切記:客戶端如果想要使用 xread 進行順序消費,那么一定要記住當前消費到了那里,也就是返回的消息 ID。下次繼續調用 xread 時,將上次返回的最后一個消息 ID 作為參數傳遞過去,就可以繼續消費后續的消息。
block 0 表示永遠阻塞,直到消息到來;block 1000 表示阻塞 1s,如果 1s 內沒有任何消息到來,就返回 nil。
消費組
stream中出現很多特殊Ids解釋
創建消費組
語法: xgroup create stream_name group_name last_delivered_id
解釋:last_delivered_id為0-0 表示從頭開始消費,last_delivered_id為 $ 表示從尾部開始消費,只接收新消息,當前stream消息全部忽略
使用:xgroup create news goup1 0-0
注意:xread、xreadgroup中都可以加上 block指令,標識阻塞等待,直到接受到新的消息或者等待超時
消息消費
語法:xreadgroup group group_name consumer_name count n streams news ids
注意:ids不理解可以看上面的stream中出現很多特殊Ids解釋,沒有加count標識消費所有的消息
解釋:
1.xreadgroup group group_name consumer_name count n streams stream_name >
消費組開始消費未消費的數據
2.xreadgroup group group_name consumer_name count n streams stream_name 0
表示id從0開始消費,意味著消息stream中保留的所有消息,包括已經消費過的
使用:
1.xreadgroup group goup1 consumer1 streams news >
2.xreadgroup count 0 group gb c1 streams news 0使用注意
第一次xreadgroup 0無法查詢到數據,知道 > 消費完以后,xreadgroup 0才能獲取到數據,說明該指令獲取消費組中消費者為c1已經消息過的數據,如果把消費者c1改成其他值返回無數據。
使用注意:
1.xcreategroup指定ids如果為$,如果xreadgroup ids利用>、0未消費數據、無法讀取之前歷史,只能讀取最新數據
2.xcreategroup指定ids為0-0,則xreadgroup ids利用>、0等都可使用
查看stream信息
1.xinfo stream stream_name
2.xinfo groups stream_name
場景問題
1.Stream消息太多時怎么辦?
2.ack作用,如果忘記ack會怎樣?
3.PEL 如何避免消息丟失?
引用博客:
原理篇:挑戰 Kafka!Redis5.0 重量級特性 Stream 嘗鮮
使用手冊篇:不是特別全的手冊
翻譯官方文檔篇:翻譯的比較硬核,不過也不錯
命令行:redis5.0的Stream實現消息隊列(springboot+jedis簡單例子)
pending、消息轉移、死信隊列
結合應用場景聯系篇:直播場景下使用
總結
以上是生活随笔為你收集整理的redis stream学习总结的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 根据经纬度显示地图轨迹
- 下一篇: java中如何使用反射调用方法以及获得类