日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

redis stream学习总结

發(fā)布時(shí)間:2024/8/23 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 redis stream学习总结 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

  • stream
    • Stream基本概念
    • 消息id
    • 消息內(nèi)容
    • 增刪查改
    • 消息生產(chǎn)
      • 添加消息 xadd
      • 查看消息長(zhǎng)度 xlen
      • 限制stream最大長(zhǎng)度
        • 1.xadd 中添加**maxlen**:
        • 2.xtrim
      • 查詢消息 xrange
      • 正向排序:消費(fèi)id從小到大排
        • 反向查詢:消費(fèi)id從大到小排
      • 刪除消息
    • 消息消費(fèi)
      • 獨(dú)立消費(fèi) xread
      • 消費(fèi)組
        • stream中出現(xiàn)很多特殊Ids解釋
        • 創(chuàng)建消費(fèi)組
        • 消息消費(fèi)
    • 查看stream信息
    • 場(chǎng)景問(wèn)題

stream

Stream基本概念

Redis 5.0 被作者 Antirez 突然發(fā)布出來(lái),增加了很多新的特色功能,其最大的新特性就是多出了一個(gè)數(shù)據(jù)結(jié)構(gòu) Stream,它是一個(gè)新的強(qiáng)大的支持多播的可持久化消息隊(duì)列,作者坦言 Redis Stream 極大地借鑒了 Kafka 的設(shè)計(jì)。

Redis Stream 的結(jié)構(gòu)如圖:


它有一個(gè)消息鏈表,將所有加入的消息都串起來(lái),每個(gè)消息都有一個(gè)唯一的 ID 和對(duì)應(yīng)的內(nèi)容。消息是持久化的,Redis 重啟后,內(nèi)容還在。

每個(gè) Stream 都有唯一的名稱,它就是 Redis 的 Key,在首次使用 xadd 執(zhí)行追加消息時(shí)自動(dòng)創(chuàng)建。

每個(gè) Stream 都可以掛多個(gè)消費(fèi)組(Consumer Group),每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo) last_delivered_id 在 Stream 數(shù)組之上往前移動(dòng)表示當(dāng)前消費(fèi)組已經(jīng)消費(fèi)到哪條消息了。每個(gè)消費(fèi)組都有一個(gè) Stream 內(nèi)唯一的名稱,消費(fèi)組不會(huì)自動(dòng)創(chuàng)建,它需要單獨(dú)的指令 xgroup create 進(jìn)行創(chuàng)建,需要指定從 Stream 的某個(gè)消息 ID 開(kāi)始消費(fèi),這個(gè) ID 用來(lái)初始化 last_delivered_id 變量。

每個(gè)消費(fèi)組的狀態(tài)都是獨(dú)立的,相互不受影響。也就是說(shuō)同一份 Stream 內(nèi)部的消息會(huì)被每個(gè)消費(fèi)組都消費(fèi)到

同一個(gè)消費(fèi)組可以掛接多個(gè)消費(fèi)者(Consumer),這些消費(fèi)者之間是競(jìng)爭(zhēng)關(guān)系,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng)。每個(gè)消費(fèi)者有一個(gè)組內(nèi)唯一名稱。

消費(fèi)者內(nèi)部會(huì)有一個(gè)狀態(tài)變量 pending_ids,它記錄了當(dāng)前已經(jīng)被客戶端讀取,但是還沒(méi)有 ack 的消息。如果客戶端沒(méi)有 ack,這個(gè)變量里面的消息 ID 就會(huì)越來(lái)越多,一旦某個(gè)消息被 ack,它就開(kāi)始減少。這個(gè) pending_ids 變量在 Redis 官方被稱為 PEL,也就是 Pending Entries List,這是一個(gè)核心的數(shù)據(jù)結(jié)構(gòu),它用來(lái)確保客戶端至少消費(fèi)了消息一次,而不會(huì)在網(wǎng)絡(luò)傳輸?shù)闹型緛G失了而沒(méi)被處理。

消息id

消息 ID 的形式是 TimestampInMillis-sequence,例如 1527846880572-5,它表示當(dāng)前的消息再毫秒時(shí)間戳 1527846880572 時(shí)產(chǎn)生,并且是該毫秒內(nèi)產(chǎn)生的第 5 條消息。消息 ID 可以由服務(wù)器自動(dòng)生成,也可以由客戶端自己指定,但是形式必須是 “整數(shù)-整數(shù)”,而且后面加入的消息的 ID 必須要大于前面的消息 ID。

消息內(nèi)容

消息內(nèi)容就是鍵值對(duì),形如 hash 結(jié)構(gòu)的鍵值對(duì),這沒(méi)什么特別之處。

增刪查改

增刪改查指令說(shuō)明如下:

1)xadd:向 Stream 追加消息。

2)xdel:向 Stream 中刪除消息,這里的刪除僅僅是設(shè)置標(biāo)志位,不影響消息總長(zhǎng)度。

3)xrange:獲取 Stream 中的消息列表,會(huì)自動(dòng)過(guò)濾已經(jīng)刪除的消息。

4)xlen:獲取 Stream 消息長(zhǎng)度。

5)del:刪除整個(gè) Stream 消息列表的所有消息。

消息生產(chǎn)

添加消息 xadd

語(yǔ)法:xadd stream_name Id field value(field value)
記住stream中存儲(chǔ)的消息必須是kv類型,不允許是單個(gè)String,如下操作都是鍵值對(duì)存儲(chǔ)的
使用:

xadd news_live * "2020-08-21 10:20" "國(guó)航CA1704航班遇顛簸下降千米,業(yè)內(nèi)人士解釋:正常操作 "xadd news_live * "2020-08-21 04:12" "5G成電信業(yè)務(wù)增長(zhǎng)“動(dòng)力擔(dān)當(dāng)” "

*:表示由redis自己生成key,key是由當(dāng)前時(shí)間戳(ms)-0格式

查看消息長(zhǎng)度 xlen

語(yǔ)法:*xlen stream_name *
使用:xlen news_live

添加成功后會(huì)顯示字符串1597979205554-0,改字符串表示時(shí)間戳(毫秒)+計(jì)數(shù),有點(diǎn)類似于雪花算法

限制stream最大長(zhǎng)度

1.xadd 中添加maxlen:

最多存儲(chǔ)消息數(shù),依據(jù)FIFO原則,自動(dòng)刪除超過(guò)最長(zhǎng)長(zhǎng)度的消息

語(yǔ)法:xadd stream_name maxlen n id field value(field,value)
記住:每次生成消息都需要帶這個(gè)參數(shù),如果maxlen不帶等于沒(méi)有限制,支持動(dòng)態(tài)改變maxlen
使用:

圖片最多存儲(chǔ)3條,每次添加xlen一直是3條

疑問(wèn):
1.如果我不知道消息具體的大小,我又如何利用maxlen達(dá)到自動(dòng)刪除?

解決:在MAXLEN選項(xiàng)個(gè)實(shí)際技術(shù)之間的~參數(shù)意味著:我并不真的需要這恰好1000個(gè)項(xiàng)目,它可以是1000或1010或1030,只需確保至少保存1000個(gè)項(xiàng)目。使用此參數(shù),僅在我們可以刪除整個(gè)節(jié)點(diǎn)時(shí)執(zhí)行修剪。這使它更有效率,通常是你想要的。

2.每次創(chuàng)建消息的時(shí)候都需要帶上比較麻煩,有沒(méi)有什么更好的辦法?

maxlen,,當(dāng)然有了,xtrim!!!

2.xtrim

XTRIM命令,它執(zhí)行與上面的MAXLEN選項(xiàng)非常相似的操作,但是此命令不需要添加任何內(nèi)容,可以以獨(dú)立方式對(duì)任何Stream運(yùn)行。
XTRIM mystream MAXLEN 10
XTRIM mystream MAXLEN ~ 10

查詢消息 xrange

查詢是生產(chǎn)者查詢自己生產(chǎn)的消息,和消費(fèi)者的消費(fèi)不是一回事

正向排序:消費(fèi)id從小到大排

1.查詢所有消息:xrange stream_name - +
使用:xrange news_live - +

2.指定起始id查詢:xrange news_live 1597980701728-0 +
查詢的消息id >= 起始id

3.指定最大id查詢:xrange news_live 1597980701728-0 +
查詢的消息id <= 結(jié)束id

反向查詢:消費(fèi)id從大到小排

1.查詢所有消息:srevrange stream_name + -
2.指定起始id查詢: xrevrange news + 2
消息id >= 2 倒排
3.指定最大id查詢: xrevrange news 2 -
消息id <= 2 倒排

練習(xí)命令:

刪除消息

語(yǔ)法:xdel stream_name id

消息消費(fèi)

獨(dú)立消費(fèi) xread

類似于List,生產(chǎn)者往list中寫(xiě)數(shù)據(jù),消費(fèi)者從list中讀數(shù)據(jù),只能有一個(gè)消費(fèi)者

1.頭部讀取 0-0
語(yǔ)法:xread count n stream stream_name 0-0

記住:0-0表示從頭開(kāi)啟讀取數(shù)據(jù),這里數(shù)據(jù)消費(fèi)記錄不會(huì)保存,每次都是從頭開(kāi)始,如果接著消費(fèi)必須自己制定其實(shí)id

制定起始id讀取:xread count n stream stream_name id

2.尾部讀取最新消息 $

從尾部讀取最新的一條消息

語(yǔ)法:
1.xread count n streams stream_name $
此時(shí)默認(rèn)不返回任何消息 ???用途

2.xread block time count n streams stream_name $
time為ms,如果time=0表示一直阻塞

切記:客戶端如果想要使用 xread 進(jìn)行順序消費(fèi),那么一定要記住當(dāng)前消費(fèi)到了那里,也就是返回的消息 ID。下次繼續(xù)調(diào)用 xread 時(shí),將上次返回的最后一個(gè)消息 ID 作為參數(shù)傳遞過(guò)去,就可以繼續(xù)消費(fèi)后續(xù)的消息。

block 0 表示永遠(yuǎn)阻塞,直到消息到來(lái);block 1000 表示阻塞 1s,如果 1s 內(nèi)沒(méi)有任何消息到來(lái),就返回 nil。

消費(fèi)組

stream中出現(xiàn)很多特殊Ids解釋

  • “-”:xrange中使用,等同于 小于某個(gè)id作用 標(biāo)識(shí)最小id
  • “+”:xrange中使用,等同于 大于某個(gè)id作用 標(biāo)識(shí)最大id
  • “$”:在給定Stream中已經(jīng)包含的最大ID,在xread、xcreategroup中標(biāo)識(shí)消費(fèi)著只能消費(fèi)最新消息
  • “>”:在消費(fèi)者組的上下文中使用,在xread、xreadgroup總標(biāo)識(shí)消費(fèi)未消費(fèi)過(guò)的消息
  • “*”:只能與XADD命令一起使用,意味新創(chuàng)建消息自動(dòng)由redis生成ID。
  • “0-0”:表示id從0開(kāi)始讀取,xcreategroup標(biāo)識(shí)消費(fèi)組從id為0開(kāi)始讀取
  • “0”:表示id從0開(kāi)始讀取,xreadgroup標(biāo)識(shí)消費(fèi)組消費(fèi)所有stream中的數(shù)據(jù)
  • 創(chuàng)建消費(fèi)組

    語(yǔ)法: xgroup create stream_name group_name last_delivered_id

    解釋:last_delivered_id為0-0 表示從頭開(kāi)始消費(fèi),last_delivered_id為 $ 表示從尾部開(kāi)始消費(fèi),只接收新消息,當(dāng)前stream消息全部忽略

    使用:xgroup create news goup1 0-0

    注意:xread、xreadgroup中都可以加上 block指令,標(biāo)識(shí)阻塞等待,直到接受到新的消息或者等待超時(shí)

    消息消費(fèi)

    語(yǔ)法:xreadgroup group group_name consumer_name count n streams news ids
    注意:ids不理解可以看上面的stream中出現(xiàn)很多特殊Ids解釋,沒(méi)有加count標(biāo)識(shí)消費(fèi)所有的消息

    解釋:
    1.xreadgroup group group_name consumer_name count n streams stream_name >
    消費(fèi)組開(kāi)始消費(fèi)未消費(fèi)的數(shù)據(jù)

    2.xreadgroup group group_name consumer_name count n streams stream_name 0
    表示id從0開(kāi)始消費(fèi),意味著消息stream中保留的所有消息,包括已經(jīng)消費(fèi)過(guò)的

    使用:
    1.xreadgroup group goup1 consumer1 streams news >

    2.xreadgroup count 0 group gb c1 streams news 0使用注意

    xgroup create news gb 0-0 xreadgroup count 0 group gb c1 streams news 0 xreadgroup count 0 group gb c1 streams news > xreadgroup count 0 group gb c1 streams news 0

    第一次xreadgroup 0無(wú)法查詢到數(shù)據(jù),知道 > 消費(fèi)完以后,xreadgroup 0才能獲取到數(shù)據(jù),說(shuō)明該指令獲取消費(fèi)組中消費(fèi)者為c1已經(jīng)消息過(guò)的數(shù)據(jù),如果把消費(fèi)者c1改成其他值返回?zé)o數(shù)據(jù)。

    使用注意:
    1.xcreategroup指定ids如果為$,如果xreadgroup ids利用>、0未消費(fèi)數(shù)據(jù)、無(wú)法讀取之前歷史,只能讀取最新數(shù)據(jù)
    2.xcreategroup指定ids為0-0,則xreadgroup ids利用>、0等都可使用

    查看stream信息

    1.xinfo stream stream_name

    2.xinfo groups stream_name

    場(chǎng)景問(wèn)題

    1.Stream消息太多時(shí)怎么辦?

    2.ack作用,如果忘記ack會(huì)怎樣?

    3.PEL 如何避免消息丟失?

    引用博客:

    原理篇:挑戰(zhàn) Kafka!Redis5.0 重量級(jí)特性 Stream 嘗鮮

    使用手冊(cè)篇:不是特別全的手冊(cè)

    翻譯官方文檔篇:翻譯的比較硬核,不過(guò)也不錯(cuò)

    命令行:redis5.0的Stream實(shí)現(xiàn)消息隊(duì)列(springboot+jedis簡(jiǎn)單例子)

    pending、消息轉(zhuǎn)移、死信隊(duì)列

    結(jié)合應(yīng)用場(chǎng)景聯(lián)系篇:直播場(chǎng)景下使用

    總結(jié)

    以上是生活随笔為你收集整理的redis stream学习总结的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。