使用 Redis Stream 实现消息队列
使用 Redis Stream 實(shí)現(xiàn)消息隊(duì)列
Intro
Redis 5.0 中增加了 Stream 的支持,利用 Stream 我們可以實(shí)現(xiàn)可靠的消息隊(duì)列,并且支持一個(gè)消息被多個(gè)消費(fèi)者所消費(fèi),可以很好的實(shí)現(xiàn)消息隊(duì)列
Simple Usage
首先我們來看一個(gè)簡單版本的 Stream 使用,我們在代碼里使用一個(gè)發(fā)布者,一個(gè)消費(fèi)者來模擬一個(gè)簡單的消息隊(duì)列的場景
來看下面的測試代碼:
private?const?string?StreamKey?=?"test-simple-stream";public?static?async?Task?MainTest() {await?RedisHelper.GetDatabase().KeyDeleteAsync(StreamKey);//?register?background?consumer_?=?Task.Factory.StartNew(Consume).ConfigureAwait(false);//await?Publish(); }private?static?async?Task?Publish() {Console.WriteLine("Press?Enter?to?publish?messages,?Press?Q?to?exit");var?input?=?Console.ReadLine();while?(input?is?not?"q"?and?not?"Q"){var?redis?=?RedisHelper.GetDatabase();for?(var?i?=?0;?i?<?10;?i++){await?redis.StreamAddAsync(StreamKey,?"message",?$"test_message_{i}");}input?=?Console.ReadLine();} }private?static?async?Task?Consume() {var?lastMsgId?=?"0-0";while?(true){await?InvokeHelper.TryInvokeAsync(async?()?=>{var?redis?=?RedisHelper.GetDatabase();var?entries?=?await?redis.StreamReadAsync(StreamKey,?lastMsgId,?2);if?(entries.Length?==?0){return;}foreach?(var?entry?in?entries){Console.WriteLine(entry.Id);entry.Values.Dump();//?delete?message?if?you?want//?redis.StreamDelete(StreamKey,?new[]?{?entry.Id?});}lastMsgId?=?entries[^1].Id;});await?Task.Delay(200);} }上面的代碼會(huì)使用一個(gè)后臺(tái)線程來運(yùn)行一個(gè) Consumer 來從 Stream 中讀取消息,有兩種消費(fèi)消息的模式,一種是自己維護(hù)一個(gè)處理的消息 offset,每次從這個(gè) offset 之后讀取新消息,另外一種模式不需要維護(hù)本地的 offset,可以在處理完消息之后直接刪掉消息,默認(rèn)消息是不會(huì)刪消息的,所以如果不刪消息的話需要維護(hù)
Publisher 每次會(huì)發(fā)布 10 條消息,Consumer 每次會(huì)讀取兩條消息,處理之后會(huì)等待 200 ms,之后再查詢消息
來看一下運(yùn)行效果吧:
Consumer Group
上面的示例會(huì)相對來說比較簡單,只有一個(gè) Consumer,但是在比較常用的場景下往往會(huì)有多個(gè)消費(fèi)者處理,
比如說用戶注冊成功之后,發(fā)布一條消息可能會(huì)有多個(gè) Consumer 同時(shí)給用戶發(fā)郵件或短信以及給用戶加積分等操作,這種場景下使用上面的模式就不合適了,Redis Stream 中增加了 Consumer Group 的概念(有的人甚至稱 Redis 內(nèi)置了一個(gè) Kafka),在創(chuàng)建了 Consumer Group 之后,向 Stream 發(fā)布消息的時(shí)候會(huì)廣播到各個(gè) Consumer Group 中,每個(gè) Consumer Group 的消息消費(fèi)是獨(dú)立的,不同的 Consumer Group 的消費(fèi)速度可以不一致,一個(gè) Consumer Group 也可以有多個(gè) Consumer 同時(shí)運(yùn)行,同一個(gè) Group 內(nèi)的多個(gè) Consumer 是會(huì)共享一個(gè) Consumer Group 的消息消費(fèi),而且我們可以手動(dòng)進(jìn)行消息的 ACK
來看下面的示例代碼吧:
private?const?string?StreamKey?=?"test-stream-group"; private?static?int?_consumerCount;public?static?async?Task?MainTest() {await?RedisHelper.GetDatabase().KeyDeleteAsync(StreamKey);//?register?background?consumer_?=?await?Task.Factory.StartNew(Consume).ConfigureAwait(false);_?=?await?Task.Factory.StartNew(Consume).ConfigureAwait(false);//await?Publish(); }private?static?async?Task?Publish() {Console.WriteLine("Press?Enter?to?publish?messages,?Press?Q?to?exit");var?input?=?Console.ReadLine();while?(input?is?not?"q"?and?not?"Q"){var?redis?=?RedisHelper.GetDatabase();for?(var?i?=?0;?i?<?10;?i++){await?redis.StreamAddAsync(StreamKey,?"message",?$"test_message_{i}");}input?=?Console.ReadLine();} }private?static?async?Task?Consume() {Interlocked.Increment(ref?_consumerCount);var?groupName?=?$"group-{_consumerCount}";var?consumerName?=?$"consumer-{_consumerCount}";var?redis?=?RedisHelper.GetDatabase();redis.StreamCreateConsumerGroup(StreamKey,?groupName);while?(true){await?InvokeHelper.TryInvokeAsync(async?()?=>{var?messages?=?await?redis.StreamReadGroupAsync(StreamKey,?groupName,?consumerName,?count:?SecurityHelper.Random.Next(1,?4));if?(messages.Length?==?0){return;}foreach?(var?message?in?messages){Console.WriteLine($"{groupName}-{message.Id}-{message.Values.ToJson()}");await?redis.StreamAcknowledgeAsync(StreamKey,?groupName,?message.Id);}});await?Task.Delay(200);} }上面的示例代碼會(huì)先注冊兩個(gè) Consumer Group,兩個(gè) Consumer Group 內(nèi)各有一個(gè) consumer,你也可以使用多個(gè) consumer,為了體現(xiàn)各個(gè) Consumer Group 是獨(dú)立的,每次獲取消息的 Count 是會(huì)隨機(jī)指定的,在讀取的消息之后會(huì)輸出消息內(nèi)容來代替處理消息的邏輯,處理完成之后進(jìn)行消息的 ACK,消息的發(fā)布邏輯和上面的示例是類似的
上述代碼執(zhí)行輸出示例:
可以看到我們發(fā)布的消息,每一個(gè) consumer group 都會(huì)處理消息,而且處理消息的速度是獨(dú)立的,互不影響
通過 XINFO 命令我們可以對 Stream 做一些監(jiān)控
More
利用 Redis 的 Stream 我們可以實(shí)現(xiàn)可靠的一個(gè)消息機(jī)制,stream 的每一條消息都會(huì)有一個(gè)消息 Id,默認(rèn)是兩個(gè)部分,一個(gè)部分是時(shí)間戳,另一個(gè)部分是一個(gè)序列號,消息 Id 可以自定義,但是通常情況下推薦用默認(rèn)的 id
Redis 中的 List、HashSet、Set、ZSet 這些數(shù)據(jù)類型中沒有元素的時(shí)候會(huì)把對應(yīng)的 Key 也會(huì)刪掉,但是 Stream 是不會(huì)的,Stream 允許沒有消息的時(shí)候依然存在
Redis Stream 使用的時(shí)候需要注意我們是可以指定 Stream 的消息長度的,如果我們指定了最大消息長度 10000,超出 10000 的時(shí)候舊消息就會(huì)被擠出隊(duì)列,可能會(huì)出現(xiàn)消息的丟失,需要對 Stream 做必要的監(jiān)控和報(bào)警
References
https://redis.io/topics/streams-intro
https://redis.io/commands
https://github.com/WeihanLi/SamplesInPractice/tree/master/RedisSample
總結(jié)
以上是生活随笔為你收集整理的使用 Redis Stream 实现消息队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: github star破13k,Dapr
- 下一篇: MySql 数据库基本设计规范