redis stream学习总结
文章目錄
- stream
- Stream基本概念
- 消息id
- 消息內(nèi)容
- 增刪查改
- 消息生產(chǎn)
- 添加消息 xadd
- 查看消息長(zhǎng)度 xlen
- 限制stream最大長(zhǎng)度
- 1.xadd 中添加**maxlen**:
- 2.xtrim
- 查詢消息 xrange
- 正向排序:消費(fèi)id從小到大排
- 反向查詢:消費(fèi)id從大到小排
- 刪除消息
- 消息消費(fèi)
- 獨(dú)立消費(fèi) xread
- 消費(fèi)組
- stream中出現(xiàn)很多特殊Ids解釋
- 創(chuàng)建消費(fèi)組
- 消息消費(fèi)
- 查看stream信息
- 場(chǎng)景問(wèn)題
stream
Stream基本概念
Redis 5.0 被作者 Antirez 突然發(fā)布出來(lái),增加了很多新的特色功能,其最大的新特性就是多出了一個(gè)數(shù)據(jù)結(jié)構(gòu) Stream,它是一個(gè)新的強(qiáng)大的支持多播的可持久化消息隊(duì)列,作者坦言 Redis Stream 極大地借鑒了 Kafka 的設(shè)計(jì)。
Redis Stream 的結(jié)構(gòu)如圖:
它有一個(gè)消息鏈表,將所有加入的消息都串起來(lái),每個(gè)消息都有一個(gè)唯一的 ID 和對(duì)應(yīng)的內(nèi)容。消息是持久化的,Redis 重啟后,內(nèi)容還在。
每個(gè) Stream 都有唯一的名稱,它就是 Redis 的 Key,在首次使用 xadd 執(zhí)行追加消息時(shí)自動(dòng)創(chuàng)建。
每個(gè) Stream 都可以掛多個(gè)消費(fèi)組(Consumer Group),每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo) last_delivered_id 在 Stream 數(shù)組之上往前移動(dòng),表示當(dāng)前消費(fèi)組已經(jīng)消費(fèi)到哪條消息了。每個(gè)消費(fèi)組都有一個(gè) Stream 內(nèi)唯一的名稱,消費(fèi)組不會(huì)自動(dòng)創(chuàng)建,它需要單獨(dú)的指令 xgroup create 進(jìn)行創(chuàng)建,需要指定從 Stream 的某個(gè)消息 ID 開(kāi)始消費(fèi),這個(gè) ID 用來(lái)初始化 last_delivered_id 變量。
每個(gè)消費(fèi)組的狀態(tài)都是獨(dú)立的,相互不受影響。也就是說(shuō)同一份 Stream 內(nèi)部的消息會(huì)被每個(gè)消費(fèi)組都消費(fèi)到。
同一個(gè)消費(fèi)組可以掛接多個(gè)消費(fèi)者(Consumer),這些消費(fèi)者之間是競(jìng)爭(zhēng)關(guān)系,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng)。每個(gè)消費(fèi)者有一個(gè)組內(nèi)唯一名稱。
消費(fèi)者內(nèi)部會(huì)有一個(gè)狀態(tài)變量 pending_ids,它記錄了當(dāng)前已經(jīng)被客戶端讀取,但是還沒(méi)有 ack 的消息。如果客戶端沒(méi)有 ack,這個(gè)變量里面的消息 ID 就會(huì)越來(lái)越多,一旦某個(gè)消息被 ack,它就開(kāi)始減少。這個(gè) pending_ids 變量在 Redis 官方被稱為 PEL,也就是 Pending Entries List,這是一個(gè)核心的數(shù)據(jù)結(jié)構(gòu),它用來(lái)確保客戶端至少消費(fèi)了消息一次,而不會(huì)在網(wǎng)絡(luò)傳輸?shù)闹型緛G失了而沒(méi)被處理。
消息id
消息 ID 的形式是 TimestampInMillis-sequence,例如 1527846880572-5,它表示當(dāng)前的消息再毫秒時(shí)間戳 1527846880572 時(shí)產(chǎn)生,并且是該毫秒內(nèi)產(chǎn)生的第 5 條消息。消息 ID 可以由服務(wù)器自動(dòng)生成,也可以由客戶端自己指定,但是形式必須是 “整數(shù)-整數(shù)”,而且后面加入的消息的 ID 必須要大于前面的消息 ID。
消息內(nèi)容
消息內(nèi)容就是鍵值對(duì),形如 hash 結(jié)構(gòu)的鍵值對(duì),這沒(méi)什么特別之處。
增刪查改
增刪改查指令說(shuō)明如下:
1)xadd:向 Stream 追加消息。
2)xdel:向 Stream 中刪除消息,這里的刪除僅僅是設(shè)置標(biāo)志位,不影響消息總長(zhǎng)度。
3)xrange:獲取 Stream 中的消息列表,會(huì)自動(dòng)過(guò)濾已經(jīng)刪除的消息。
4)xlen:獲取 Stream 消息長(zhǎng)度。
5)del:刪除整個(gè) Stream 消息列表的所有消息。
消息生產(chǎn)
添加消息 xadd
語(yǔ)法:xadd stream_name Id field value(field value)
記住stream中存儲(chǔ)的消息必須是kv類型,不允許是單個(gè)String,如下操作都是鍵值對(duì)存儲(chǔ)的
使用:
*:表示由redis自己生成key,key是由當(dāng)前時(shí)間戳(ms)-0格式
查看消息長(zhǎng)度 xlen
語(yǔ)法:*xlen stream_name *
使用:xlen news_live
添加成功后會(huì)顯示字符串1597979205554-0,改字符串表示時(shí)間戳(毫秒)+計(jì)數(shù),有點(diǎn)類似于雪花算法
限制stream最大長(zhǎng)度
1.xadd 中添加maxlen:
最多存儲(chǔ)消息數(shù),依據(jù)FIFO原則,自動(dòng)刪除超過(guò)最長(zhǎng)長(zhǎng)度的消息
語(yǔ)法:xadd stream_name maxlen n id field value(field,value)
記住:每次生成消息都需要帶這個(gè)參數(shù),如果maxlen不帶等于沒(méi)有限制,支持動(dòng)態(tài)改變maxlen
使用:
圖片最多存儲(chǔ)3條,每次添加xlen一直是3條
疑問(wèn):
1.如果我不知道消息具體的大小,我又如何利用maxlen達(dá)到自動(dòng)刪除?
解決:在MAXLEN選項(xiàng)個(gè)實(shí)際技術(shù)之間的~參數(shù)意味著:我并不真的需要這恰好1000個(gè)項(xiàng)目,它可以是1000或1010或1030,只需確保至少保存1000個(gè)項(xiàng)目。使用此參數(shù),僅在我們可以刪除整個(gè)節(jié)點(diǎn)時(shí)執(zhí)行修剪。這使它更有效率,通常是你想要的。
2.每次創(chuàng)建消息的時(shí)候都需要帶上比較麻煩,有沒(méi)有什么更好的辦法?
maxlen,,當(dāng)然有了,xtrim!!!
2.xtrim
XTRIM命令,它執(zhí)行與上面的MAXLEN選項(xiàng)非常相似的操作,但是此命令不需要添加任何內(nèi)容,可以以獨(dú)立方式對(duì)任何Stream運(yùn)行。
XTRIM mystream MAXLEN 10
XTRIM mystream MAXLEN ~ 10
查詢消息 xrange
查詢是生產(chǎn)者查詢自己生產(chǎn)的消息,和消費(fèi)者的消費(fèi)不是一回事
正向排序:消費(fèi)id從小到大排
1.查詢所有消息:xrange stream_name - +
使用:xrange news_live - +
2.指定起始id查詢:xrange news_live 1597980701728-0 +
查詢的消息id >= 起始id
3.指定最大id查詢:xrange news_live 1597980701728-0 +
查詢的消息id <= 結(jié)束id
反向查詢:消費(fèi)id從大到小排
1.查詢所有消息:srevrange stream_name + -
2.指定起始id查詢: xrevrange news + 2
消息id >= 2 倒排
3.指定最大id查詢: xrevrange news 2 -
消息id <= 2 倒排
練習(xí)命令:
刪除消息
語(yǔ)法:xdel stream_name id
消息消費(fèi)
獨(dú)立消費(fèi) xread
類似于List,生產(chǎn)者往list中寫(xiě)數(shù)據(jù),消費(fèi)者從list中讀數(shù)據(jù),只能有一個(gè)消費(fèi)者
1.頭部讀取 0-0
語(yǔ)法:xread count n stream stream_name 0-0
記住:0-0表示從頭開(kāi)啟讀取數(shù)據(jù),這里數(shù)據(jù)消費(fèi)記錄不會(huì)保存,每次都是從頭開(kāi)始,如果接著消費(fèi)必須自己制定其實(shí)id
制定起始id讀取:xread count n stream stream_name id
2.尾部讀取最新消息 $
從尾部讀取最新的一條消息
語(yǔ)法:
1.xread count n streams stream_name $
此時(shí)默認(rèn)不返回任何消息 ???用途
2.xread block time count n streams stream_name $
time為ms,如果time=0表示一直阻塞
切記:客戶端如果想要使用 xread 進(jìn)行順序消費(fèi),那么一定要記住當(dāng)前消費(fèi)到了那里,也就是返回的消息 ID。下次繼續(xù)調(diào)用 xread 時(shí),將上次返回的最后一個(gè)消息 ID 作為參數(shù)傳遞過(guò)去,就可以繼續(xù)消費(fèi)后續(xù)的消息。
block 0 表示永遠(yuǎn)阻塞,直到消息到來(lái);block 1000 表示阻塞 1s,如果 1s 內(nèi)沒(méi)有任何消息到來(lái),就返回 nil。
消費(fèi)組
stream中出現(xiàn)很多特殊Ids解釋
創(chuàng)建消費(fèi)組
語(yǔ)法: xgroup create stream_name group_name last_delivered_id
解釋:last_delivered_id為0-0 表示從頭開(kāi)始消費(fèi),last_delivered_id為 $ 表示從尾部開(kāi)始消費(fèi),只接收新消息,當(dāng)前stream消息全部忽略
使用:xgroup create news goup1 0-0
注意:xread、xreadgroup中都可以加上 block指令,標(biāo)識(shí)阻塞等待,直到接受到新的消息或者等待超時(shí)
消息消費(fèi)
語(yǔ)法:xreadgroup group group_name consumer_name count n streams news ids
注意:ids不理解可以看上面的stream中出現(xiàn)很多特殊Ids解釋,沒(méi)有加count標(biāo)識(shí)消費(fèi)所有的消息
解釋:
1.xreadgroup group group_name consumer_name count n streams stream_name >
消費(fèi)組開(kāi)始消費(fèi)未消費(fèi)的數(shù)據(jù)
2.xreadgroup group group_name consumer_name count n streams stream_name 0
表示id從0開(kāi)始消費(fèi),意味著消息stream中保留的所有消息,包括已經(jīng)消費(fèi)過(guò)的
使用:
1.xreadgroup group goup1 consumer1 streams news >
2.xreadgroup count 0 group gb c1 streams news 0使用注意
第一次xreadgroup 0無(wú)法查詢到數(shù)據(jù),知道 > 消費(fèi)完以后,xreadgroup 0才能獲取到數(shù)據(jù),說(shuō)明該指令獲取消費(fèi)組中消費(fèi)者為c1已經(jīng)消息過(guò)的數(shù)據(jù),如果把消費(fèi)者c1改成其他值返回?zé)o數(shù)據(jù)。
使用注意:
1.xcreategroup指定ids如果為$,如果xreadgroup ids利用>、0未消費(fèi)數(shù)據(jù)、無(wú)法讀取之前歷史,只能讀取最新數(shù)據(jù)
2.xcreategroup指定ids為0-0,則xreadgroup ids利用>、0等都可使用
查看stream信息
1.xinfo stream stream_name
2.xinfo groups stream_name
場(chǎng)景問(wèn)題
1.Stream消息太多時(shí)怎么辦?
2.ack作用,如果忘記ack會(huì)怎樣?
3.PEL 如何避免消息丟失?
引用博客:
原理篇:挑戰(zhàn) Kafka!Redis5.0 重量級(jí)特性 Stream 嘗鮮
使用手冊(cè)篇:不是特別全的手冊(cè)
翻譯官方文檔篇:翻譯的比較硬核,不過(guò)也不錯(cuò)
命令行:redis5.0的Stream實(shí)現(xiàn)消息隊(duì)列(springboot+jedis簡(jiǎn)單例子)
pending、消息轉(zhuǎn)移、死信隊(duì)列
結(jié)合應(yīng)用場(chǎng)景聯(lián)系篇:直播場(chǎng)景下使用
總結(jié)
以上是生活随笔為你收集整理的redis stream学习总结的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 根据经纬度显示地图轨迹
- 下一篇: java中如何使用反射调用方法以及获得类