日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

redis之如何实现消息队列

發(fā)布時(shí)間:2023/12/29 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 redis之如何实现消息队列 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

寫在前面

本文一起來看下使用redis如何實(shí)現(xiàn)消息隊(duì)列的功能。目前在redis想要實(shí)現(xiàn)消息隊(duì)列的功能有如下的兩種方案:

1:基于List的lpush和rpop 2:Streams

這里不將pub/sub考慮在內(nèi),因?yàn)槠洳痪邆涑志没哪芰?#xff0c;消息會(huì)丟失。

其中1是利用其有的先進(jìn)先出特性實(shí)現(xiàn),2是redis為了實(shí)現(xiàn)消息隊(duì)列專門在redis5版本中定義的一種新的數(shù)據(jù)結(jié)構(gòu),這里注意,其也是一種數(shù)據(jù)結(jié)構(gòu),和String,Set等處于同等位置的數(shù)據(jù)結(jié)構(gòu),只不過內(nèi)部增加了一些針對(duì)消息隊(duì)列的一些特有操作來實(shí)現(xiàn)消息隊(duì)列的功能,后續(xù)我們會(huì)詳細(xì)分析其用法。

1:消息隊(duì)列需要滿足哪些要求

  • 消息保序
    即要保證消息發(fā)送的順序和消費(fèi)的順序是一致的,不一致的話可能會(huì)導(dǎo)致業(yè)務(wù)上的錯(cuò)誤。
  • 重復(fù)消息處理
    消息的重復(fù)處理,準(zhǔn)確來說是應(yīng)該通過消費(fèi)者自身來實(shí)現(xiàn)的,在應(yīng)用程序內(nèi)部實(shí)現(xiàn)這些邏輯,但是對(duì)于消息中間件來說也要有此類機(jī)制,即對(duì)于一個(gè)已經(jīng)被消費(fèi)的消息(已經(jīng)收到ACK)不能再次被消費(fèi)。
  • 消息可靠性
    換句話說,要具有持久化的能力,避免消息丟失,這樣當(dāng)消費(fèi)者異常宕機(jī)導(dǎo)致再次重啟后需要重新消費(fèi)消息時(shí)可以再次獲取。

接下來我們先來看下使用List如何實(shí)現(xiàn)消息隊(duì)列。

2:List實(shí)現(xiàn)消息隊(duì)列

生產(chǎn)消息我們可以使用lpush,消費(fèi)消息可以使用rpop,如下圖生產(chǎn)和消費(fèi)消息的過程:

但是注意,這里我們使用rpop消費(fèi)消息時(shí),如果沒有消息則會(huì)直接返回, 且有新消息時(shí)也不會(huì)通知,此時(shí)就需要通過諸如while(true),for(;;)之類的死循環(huán)來不斷查詢,這樣就會(huì)不斷的消耗CPU資源,造成不必要的CPU資源浪費(fèi),為了解決這個(gè)問題,redis提供了rpop阻塞版本命令brpop,其中bblock就代表了阻塞的特性,當(dāng)有消息可以消費(fèi)時(shí),這樣就解決了浪費(fèi)CPU資源的問題。

到這里,對(duì)于消息中間件的3個(gè)需求,List都滿足哪些呢?首先第一個(gè)消息保序,是天然支持的,對(duì)于重復(fù)消息處理List本身是不支持的,但是我們?cè)谇懊嬉卜治隽?#xff0c;這準(zhǔn)確來說更應(yīng)該是消費(fèi)者本身來實(shí)現(xiàn)的,因此,生產(chǎn)者在生產(chǎn)消息時(shí)只要給每一個(gè)消息一個(gè)唯一的標(biāo)識(shí),然后消費(fèi)者通過此來避免消息重復(fù)處理就可以了,比如LPUSH mq "101030001:stock:5",其中的101030001就是其唯一標(biāo)識(shí)。最后是消息可靠性,為了滿足消息可靠性,redis進(jìn)一步對(duì)rpop進(jìn)行了增強(qiáng),提供了BRPOPLPUSH命令,消息彈出后,會(huì)重新壓到一個(gè)新的隊(duì)列中,這樣當(dāng)消費(fèi)者異常重啟后就可以從這里重新消費(fèi)消息了,如下圖:

到這里,List在一定程序上都滿足了消息中間件需要滿足的3個(gè)條件,這里還需要考慮另外一種情況,即當(dāng)消費(fèi)者的消費(fèi)能力嚴(yán)重不足,或者是生產(chǎn)者生產(chǎn)的消息量非常大,即生產(chǎn)能力遠(yuǎn)大于消費(fèi)能力的時(shí)候List就沒有什么辦法了,因?yàn)槠渲荒芤粋€(gè)一個(gè)的彈出消息,對(duì)于這個(gè)問題就需要依靠redis專門針對(duì)消息隊(duì)列的場景實(shí)現(xiàn)的新數(shù)據(jù)結(jié)構(gòu)Streams,注意這也是一種新的數(shù)據(jù)結(jié)構(gòu)。

3:Streams

我們前面說了,Streams是一種redis專門為消息隊(duì)列定義的一種數(shù)據(jù)結(jié)構(gòu),所以自然的我們是先要看如何定義這種數(shù)據(jù)結(jié)構(gòu)了,和其它的數(shù)據(jù)結(jié)構(gòu)一樣,我們不需要顯式的創(chuàng)建,在執(zhí)行第一次數(shù)據(jù)添加的時(shí)候自動(dòng)創(chuàng)建,添加數(shù)據(jù)的命令是XADD,語法格式是XADD key ID field value [field value ...],參數(shù)說明如下:

key:redis的key ID:消息的唯一標(biāo)識(shí),可以指定,也可以設(shè)置為*,設(shè)置為*時(shí)id會(huì)自動(dòng)生成,id是遞增 field value:消息的字段和值

如下生產(chǎn)(創(chuàng)建)若干條消息:

redis> XADD mystream * name Sara surname OConnor "1601372323627-0" redis> XADD mystream * field1 value1 field2 value2 field3 value3 "1601372323627-1" redis> XLEN mystream (integer) 2 redis> XRANGE mystream - + 1) 1) "1601372323627-0"2) 1) "name"2) "Sara"3) "surname"4) "OConnor" 2) 1) "1601372323627-1"2) 1) "field1"2) "value1"3) "field2"4) "value2"5) "field3"6) "value3" redis>

其中XLEN用來查看消息的個(gè)數(shù),XRANGE用來通過范圍查詢基于遞增ID獲取消息,-相當(dāng)于是負(fù)無窮,+相當(dāng)于是正無窮,即獲取所有消息。我們接著再來看下其它一些命令。

3.1:XDEL

根據(jù)ID刪除消息,如下測試:

> XADD mystream * a 1 1538561698944-0 > XADD mystream * b 2 1538561700640-0 > XADD mystream * c 3 1538561701744-0 > XDEL mystream 1538561700640-0 (integer) 1 127.0.0.1:6379> XRANGE mystream - + 1) 1) 1538561698944-02) 1) "a"2) "1" 2) 1) 1538561701744-02) 1) "c"2) "3"

3.2:XLEN

獲取消息的數(shù)量,語法格式xlen key,如下:

redis> XADD mystream * item 1 "1601372563177-0" redis> XADD mystream * item 2 "1601372563178-0" redis> XADD mystream * item 3 "1601372563178-1" redis> XLEN mystream (integer) 3 redis>

3.3:XRANGE

查詢指定范圍的消息,語法格式XRANGE key start end [COUNT count],解釋如下:

key :隊(duì)列名 start :開始值, - 表示最小值 end :結(jié)束值, + 表示最大值 count :數(shù)量

測試如下:

redis> XADD writers * name Virginia surname Woolf "1601372577811-0" redis> XADD writers * name Jane surname Austen "1601372577811-1" redis> XADD writers * name Toni surname Morrison "1601372577811-2" redis> XADD writers * name Agatha surname Christie "1601372577812-0" redis> XADD writers * name Ngozi surname Adichie "1601372577812-1" redis> XLEN writers (integer) 5 redis> XRANGE writers - + COUNT 2 1) 1) "1601372577811-0"2) 1) "name"2) "Virginia"3) "surname"4) "Woolf" 2) 1) "1601372577811-1"2) 1) "name"2) "Jane"3) "surname"4) "Austen" redis>

3.4:XREVRANGE

從后往前獲取消息,語法格式XREVRANGE key end start [COUNT count],解釋如下:

key :隊(duì)列名 end :結(jié)束值, + 表示最大值 start :開始值, - 表示最小值 count :數(shù)量

實(shí)例如下:

redis> XADD writers * name Virginia surname Woolf "1601372731458-0" redis> XADD writers * name Jane surname Austen "1601372731459-0" redis> XADD writers * name Toni surname Morrison "1601372731459-1" redis> XADD writers * name Agatha surname Christie "1601372731459-2" redis> XADD writers * name Ngozi surname Adichie "1601372731459-3" redis> XLEN writers (integer) 5 redis> XREVRANGE writers + - COUNT 1 1) 1) "1601372731459-3"2) 1) "name"2) "Ngozi"3) "surname"4) "Adichie"

3.5:XREAD

以阻塞或者是非阻塞的方式獲取消息,即消費(fèi)消息的命令,語法格式XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...],解釋如下:

count :數(shù)量 milliseconds :可選,阻塞毫秒數(shù),沒有設(shè)置就是非阻塞模式 key :隊(duì)列名 id :消息 ID

測試如下:

# 從 Stream 頭部讀取兩條消息 > XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 1) 1) "mystream"2) 1) 1) 1526984818136-02) 1) "duration"2) "1532"3) "event-id"4) "5"5) "user-id"6) "7782813"2) 1) 1526999352406-02) 1) "duration"2) "812"3) "event-id"4) "9"5) "user-id"6) "388234" 2) 1) "writers"2) 1) 1) 1526985676425-02) 1) "name"2) "Virginia"3) "surname"4) "Woolf"2) 1) 1526985685298-02) 1) "name"2) "Jane"3) "surname"4) "Austen"

3.6:XGROUP CREATE

創(chuàng)建消費(fèi)者組,使用消費(fèi)者可以對(duì)消息進(jìn)行并發(fā)的消費(fèi),解決消費(fèi)者消費(fèi)能力不足的問題,語法格式為XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername],解釋如下:

key :隊(duì)列名稱,如果不存在就創(chuàng)建 groupname :組名。 $ : 表示從尾部開始消費(fèi),只接受新消息,當(dāng)前 Stream 消息會(huì)全部忽略。

如下從頭開始消費(fèi):

XGROUP CREATE mystream consumer-group-name 0-0

如下從尾部開始消費(fèi):

XGROUP CREATE mystream consumer-group-name $

在實(shí)際的場景中我們可以通過設(shè)置多個(gè)消費(fèi)者組的不同開始消費(fèi)的位置來實(shí)現(xiàn)并發(fā)消費(fèi)的效果,此時(shí)可能如下圖:

圖中主要元素解釋如下:

每個(gè) Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時(shí)自動(dòng)創(chuàng)建。 Consumer Group :消費(fèi)組,使用 XGROUP CREATE 命令創(chuàng)建,一個(gè)消費(fèi)組有多個(gè)消費(fèi)者(Consumer)。 last_delivered_id :游標(biāo),每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo) last_delivered_id,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng)。 pending_ids :消費(fèi)者(Consumer)的狀態(tài)變量,作用是維護(hù)消費(fèi)者的未確認(rèn)的 id。 pending_ids 記錄了當(dāng)前已經(jīng)被客戶端讀取的消息,但是還沒有 ack (Acknowledge character:確認(rèn)字符)。

3.7:XREADGROUP GROUP

讀取消費(fèi)者組中的消息,語法格式如下:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

解釋如下:

group :消費(fèi)組名 consumer :消費(fèi)者名。 count : 讀取數(shù)量。 milliseconds : 阻塞毫秒數(shù)。 key : 隊(duì)列名。 ID : 消息 ID。

如下測試:

XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >

符號(hào)>標(biāo)識(shí)從第一條尚未被消費(fèi)的消息開始消費(fèi)。

寫在后面

參考文章列表:

Redis Stream 。

總結(jié)

以上是生活随笔為你收集整理的redis之如何实现消息队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 久久依人 | 欧美综合视频在线 | av中文网| 蜜臀久久99精品久久久画质超高清 | 色婷婷亚洲一区二区三区 | 成人另类小说 | 精品乱子一区二区三区 | 长篇高h肉爽文丝袜 | 国产精品一二三区在线观看 | 影音先锋久久 | 欧美精品一级二级三级 | 成人在线视频免费播放 | 天堂а√在线中文在线鲁大师 | 久久久久久天堂 | 日日草日日干 | 亚洲国产综合av | 爱爱视频免费看 | 永久免费快色 | 亚洲午夜精品久久 | 男人资源网站 | 岛国一区二区 | 99国产热| 欧美亚洲在线观看 | 精品福利电影 | av片久久 | 中文字幕理论片 | 韩国av三级 | 奇米影视在线观看 | 久久久久久网址 | 中文字幕在线观看第二页 | 天堂影视av | 亚洲av无码久久精品狠狠爱浪潮 | 成人里番精品一区二区 | 日韩电影三级 | 国产aⅴ激情无码久久久无码 | 天天综合干 | 亚洲综合视频网 | 欧美自拍视频在线观看 | 国产91丝袜在线播放 | 日本熟妇浓毛 | 在线一二三区 | 一区二区三区波多野结衣 | 国产麻豆xxxvideo实拍 | 爱爱免费视频 | 91丨九色丨蝌蚪丨老版 | 亚洲一区二区三区四区五区午夜 | 欧美性色网站 | 老太太av| 国产成人久久精品 | www.久久色 | 欧美性做爰猛烈叫床潮 | 欧美一级特黄aa大片 | 视频在线a | 诱人的乳峰奶水hd | 日本公妇乱偷中文字幕 | 日韩激情 | 国产tv在线观看 | 水牛影视av一区二区免费 | 日少妇的逼 | 久久涩涩 | 伊人伊人伊人伊人 | 99久国产 | 日本在线一区二区 | 欧美人与野 | 国产aⅴ精品 | 香蕉成视频人app下载安装 | 天天拍夜夜拍 | 免费观看一区二区 | 97精品人人妻人人 | 亚洲人成色777777精品音频 | 精品国产一区二区在线观看 | 香蕉在线视频播放 | 老熟女高潮一区二区三区 | 你懂的网址在线 | 欧美大片一级 | 一区二区手机在线 | 免费观看a毛片 | 91激情 | 福利小视频在线播放 | 国产色区 | 美女色网站 | 国产一区视频在线播放 | 亚洲精品国产成人无码 | 亚洲理论片在线观看 | 香蕉一区二区三区四区 | 91视频在线观看免费 | 欧美人体一区二区三区 | 手机在线免费视频 | 日韩视频免费观看高清 | 国产精品成人va在线观看 | 日韩欧美在线中文字幕 | 欧美黄一级 | 日韩美女免费线视频 | 欧美精品免费一区二区三区 | 国产老头户外野战xxxxx | 国产精品国产精品国产专区不卡 | 日本电影大尺度免费观看 | 色图av| 人与嘼交av免费 |