日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Redis消息队列 | 黑马点评

發布時間:2023/12/29 数据库 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Redis消息队列 | 黑马点评 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

一、認識消息隊列

二、List模擬消息隊列

三、PubSub的消息隊列

四、Stream的消息隊列(重點)

????????1、單消費模式

????????2、消費者組

五、redis三種消息隊列對比?

?六、優化秒殺實戰

1、創建消息隊列

2、修改下單腳本

?3、接收消息處理


一、認識消息隊列

消息隊列,字面意思就存放消息的隊列。最簡單的消息隊列模型包括3個角色:

消息隊列:存儲和管理消息,也被稱為消息代理

生產者:發送消息到消息隊列

消費者:從消息隊列獲取消息并處理消息

解決了jvm堵塞隊列內存不足的問題,而且消息隊列是可以持久化的,宕機了依然能夠保存。

redis提供三種不同方式實現消息隊列:

  • list結構:基于list結構模擬消息隊列
  • PubSub:基于的點對點消息隊列
  • Stream:比較完善的消息隊列模型(推薦)

二、List模擬消息隊列

redis的list結構是一個雙向鏈表,很容易模擬出隊列效果

隊列是入口和出口不在一邊,因此可以用LPUSH結合RPOP、或者RPUSH結合LPOP實現

但是,當隊列沒有消息時pop就會返回null,并不會jvm堵塞隊列那樣堵塞并等待消息,因此這里應該使用BRPOP或者BLPOP來實現堵塞隊列。

缺點:

無法避免消息丟失。從消息隊列取到消息,還沒來得及處理就掛掉了,這個消息就消失了。

只支持單消費者。一個人拿走就從隊列里面彈出了。

三、PubSub的消息隊列

PubSub(發布訂閱)是redis2.0版本引入的消息傳遞模型,消費者可以訂閱一個或多個channel(頻道),生產者向對應channel發送消息后,所有訂閱者都能收到相關消息。

支持多生產、多消費

缺點:

不支持數據持久化(剛剛的list本質是做存儲的我們拿來當隊列所以可以持久化)

無法避免消息丟失。

消息堆積有上限,超出時數據丟失。(緩存空間是有上限的)

四、Stream的消息隊列(重點)

Stream是redis5.0引入的一種新數據類型,可以實現一個功能非常完善的消息隊列。

1、單消費模式

特點:

  • 消息可回溯。不消失永久保存在隊列里。
  • 一個消息可以被多個消費者讀取。讀完不消失的,可以多個讀
  • 可以堵塞讀取
  • 有消息漏讀的風險

2、消費者組

消費者組(Consumer Group):將多個消費者劃分到一個組,監聽同一個隊列。

?消費者監聽消息的基本思路

stream類型消息隊列的消費者組特點:

  • 消息可回溯
  • 可以多消費者爭搶消息,加快消費速度
  • 可以阻塞讀取
  • 沒有消息漏鍍的風險
  • 有消息確認機制,保證消息至少被消費一次

五、redis三種消息隊列對比?

?六、優化秒殺實戰

1、創建消息隊列

創建一個stream類型的消息隊列,名為stream.orders

2、修改下單腳本

修改之前秒殺下單lua腳本,認定有搶購資格后,直接向steam.orders中添加消息,內容包含voucher、userId、orderId

-- 優惠券id local voucherId = ARGV[1] -- 用戶id local userId = ARGV[2] -- 訂單id local orderId = ARGV[3]-- 庫存key local stockKey = "seckill:stock:"..voucherId -- 訂單key local orderKey = "seckill:order:"..voucherId-- 判斷庫存是否充足 if(tonumber(redis.call('get', stockKey)) <= 0) thenreturn 1 end-- 判斷用戶是否已經下過單 if(redis.call('sismember', orderKey, userId) == 1) thenreturn 2 end-- 扣減庫存 redis.call('incrby', stockKey, -1)-- 將 userId 存入當前優惠券的 set 集合 redis.call('sadd', orderKey, userId)-- 將訂單信息存入到消息隊列中 xadd stream.orders * k1 v1 k2 v2 redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0

?3、接收消息處理

項目啟動時,開啟一個線程任務,嘗試獲取stream.orders中的消息,完成下單

/**** 創建線程池*/private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();/**** 容器啟動時,便開始創建獨立線程,從隊列中讀取數據,創建訂單*/@PostConstructprivate void init(){SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while(true){try {// 獲取消息隊列中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2000)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 判斷訂單信息是否為空if(list == null || list.isEmpty()){// 如果為 null,說明沒有消息,繼續下一次循環continue;}// 解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 創建訂單createVoucherOrder(voucherOrder);// 確認消息 xack s1 g1 idstringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());} catch (Exception e) {log.error("處理訂單異常!", e);handlePendingList();}}}private void handlePendingList() {while(true){try {// 獲取 pending-list 中的訂單信息 xreadgroup group g1 c1 count 1 block 2000 streams s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 判斷訂單信息是否為空if(list == null || list.isEmpty()){break;}// 解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 創建訂單createVoucherOrder(voucherOrder);// 確認消息 xack s1 g1 idstringRedisTemplate.opsForStream().acknowledge("stream.orders", "g1", record.getId());} catch (Exception e) {log.error("處理訂單異常!", e);try {Thread.sleep(100);} catch (InterruptedException interruptedException) {interruptedException.printStackTrace();}}}}}

總結

以上是生活随笔為你收集整理的Redis消息队列 | 黑马点评的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。