日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

聊聊Redis消息队列-实现异步秒杀

發(fā)布時(shí)間:2023/12/29 数据库 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 聊聊Redis消息队列-实现异步秒杀 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、前言

消息隊(duì)列(Message Queue), 字面意思就是存放消息的隊(duì)列,最簡(jiǎn)單的消息隊(duì)列模型包括3個(gè)角色:

  • 消息隊(duì)列:存儲(chǔ)和管理消息,也被稱為消息代理(Message Broker);
  • 生產(chǎn)者:發(fā)送消息到消息隊(duì)列;
  • 消費(fèi)者:從消息隊(duì)列獲取消息并處理消息。

    Redis提供了三種不同的方式來(lái)實(shí)現(xiàn)消息隊(duì)列:
  • list結(jié)構(gòu):基于List結(jié)構(gòu)模擬消息隊(duì)列;
  • PubSub: 基本的點(diǎn)對(duì)點(diǎn)消息模型;
  • Stream: 比較完善的消息隊(duì)列模型

二、基于List結(jié)構(gòu)模擬消息隊(duì)列

消息隊(duì)列(Message Queue),字面意思就是存放消息的隊(duì)列。而Redis的list數(shù)據(jù)結(jié)構(gòu)是一個(gè)雙向鏈表,很容易模擬出隊(duì)列效果。
隊(duì)列是入口和出口不在一邊,因此我們可以利用:LPUSH結(jié)合RPOP、或者RPUSH結(jié)合LPOP來(lái)實(shí)現(xiàn);
不過(guò)要注意的是,當(dāng)隊(duì)列中沒(méi)有消息時(shí)RPOP或LPOP操作會(huì)返回null,并不像JVM的阻塞隊(duì)列那樣會(huì)阻塞并等待消息。因此這里應(yīng)該適用BRPOP或者BLPOP來(lái)實(shí)現(xiàn)阻塞效果。

2.1 基于List的消息隊(duì)列有哪些優(yōu)缺點(diǎn)?

  • 優(yōu)點(diǎn):
    • 利用Redis存儲(chǔ),不受限于JVM內(nèi)存上限;
    • 基于Redis的持久化機(jī)制,數(shù)據(jù)安全性有保證;
    • 可以滿足消息有序性;
  • 缺點(diǎn):
    • 無(wú)法避免消息丟失;
    • 只支持消費(fèi)者

三、基于PubSub的消息隊(duì)列

PubSub(發(fā)布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費(fèi)者可以訂閱一個(gè)或多個(gè)channel,生產(chǎn)者向?qū)?yīng)channel發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。

  • SUBSCCRIBE channel [channel]: 訂閱一個(gè)或多個(gè)頻道;
  • PUBLISH channel msg: 向一個(gè)頻道發(fā)送消息;
  • PSUBSCRIBE pattern[pattern]: 訂閱與pattern格式匹配的所有頻道

3.1 基于PubSub的消息隊(duì)列有哪些優(yōu)缺點(diǎn)?

  • 優(yōu)點(diǎn):
    • 采用發(fā)布訂閱模型,支持多生產(chǎn)、多消息;
  • 缺點(diǎn):
    • 不支持?jǐn)?shù)據(jù)持久化;
    • 無(wú)法避免消息丟失;
    • 消息堆積有上限,超出時(shí)數(shù)據(jù)丟失

四、基于Stream的消息隊(duì)列

Stream 是 Redis 5.0 引入的一種新數(shù)據(jù)類型,可以實(shí)現(xiàn)一個(gè)功能非常完善的消息隊(duì)列;

  • 發(fā)送消息的命令:
  • 讀取消息的方式之一:XREAD
    • 例如:適用XREAD讀取第一個(gè)消息:
  • XREAD阻塞方式,讀取最新的消息:

    在業(yè)務(wù)開(kāi)發(fā)中,我們可以循環(huán)的調(diào)用XREAD阻塞方式來(lái)查詢最新消息,從而實(shí)現(xiàn)持續(xù)監(jiān)聽(tīng)隊(duì)列的效果,偽代碼如下:

4.1 STREAM類型消息隊(duì)列的XREAD命令特點(diǎn):

  • 消息可回溯;
  • 一個(gè)消息可以被多個(gè)消費(fèi)者讀取;
  • 可以阻塞讀取;
  • 有消息漏讀的風(fēng)險(xiǎn)

五、基于Stream的消息隊(duì)列-消費(fèi)者組

消費(fèi)者組(Consumer Group):將多個(gè)消費(fèi)者劃分到一個(gè)組中,監(jiān)聽(tīng)同一個(gè)隊(duì)列。具備下列特點(diǎn):

  • 消息分流:隊(duì)列中的消息會(huì)分流給組內(nèi)的不同消費(fèi)者,而不是重復(fù)的消費(fèi),從而加快消息處理的速度;
  • 消息標(biāo)示:消費(fèi)者組會(huì)維護(hù)一個(gè)標(biāo)識(shí),記錄最后一個(gè)被處理的消息,哪怕消費(fèi)者宕機(jī)重啟,還會(huì)從標(biāo)識(shí)之后讀取消息。確保每一個(gè)消息都會(huì)被消費(fèi);
  • 消息確認(rèn):消費(fèi)者獲取消費(fèi)后,消息處于pending狀態(tài),并存入一個(gè)pending-list。當(dāng)處理完成后需要通過(guò)XACK來(lái)確認(rèn)消息,標(biāo)記消息為已處理,才會(huì)從pending-list移除。
  • 通俗的講,就是多個(gè)消費(fèi)者在一個(gè)隊(duì)列中處于競(jìng)爭(zhēng)關(guān)系,多個(gè)消費(fèi)者來(lái)處理隊(duì)列消息,加快消息處理的速度。而且消費(fèi)者組會(huì)給消息加上一個(gè)標(biāo)識(shí),記錄最新讀到的消息。如果中途消息處理完未提交,消息還會(huì)進(jìn)入pending狀態(tài)。進(jìn)入pending-list中,不會(huì)造成數(shù)據(jù)的丟失。

    5.1 STREAM類型消息隊(duì)列的XREADGROUP命令特點(diǎn):

    • 消息可回溯;
    • 可以多消費(fèi)者爭(zhēng)搶消息,加快消費(fèi)速度;
    • 可以阻塞讀取;
    • 沒(méi)有消息漏讀的風(fēng)險(xiǎn);
    • 有消息確認(rèn)機(jī)制,保證消息至少被消費(fèi)一次

    總結(jié)

    六、案例

    基于Redis的Stream結(jié)構(gòu)作為消息隊(duì)列,實(shí)現(xiàn)異步秒殺下單

    需求:
    ① 創(chuàng)建一個(gè)Stream類型的消息隊(duì)列,名為stream.orders;
    ② 修改之前的秒殺下單Lua腳本,在認(rèn)定有搶購(gòu)資格后,直接向stream.orders中添加消息,內(nèi)容包含voucherId、userId、orderId;
    ③ 項(xiàng)目啟動(dòng)時(shí),開(kāi)啟一個(gè)線程任務(wù),嘗試獲取stream.orders中的消息,完成下單。

    • lua腳本
    local voucherId = ARGV[1] --1.2用戶id local userId = ARGV[2] --1.3訂單id local orderId = ARGV[3] --1.4--2.數(shù)據(jù)key --2.1庫(kù)存key local stockKey = "seckill:stock:" .. voucherId --2.2訂單key local orderKey = "seckill:order:" .. userId--3.腳本業(yè)務(wù) local stock = redis.call('get', stockKey) local stockNumber = tonumber(stock) --3.1判斷庫(kù)存是否充足 get stock if (stockNumber <= 0) then--庫(kù)存不足返回1return 1 end --3.2判斷用戶是否下單 if (redis.call('sismember', orderKey, userId) == 1) then--存在,返回2return 2 end --3.3扣庫(kù)存incrby stockKey -1 redis.call('incrby', stockKey, -1) --3.4下單,保存用戶信息 sadd orderKey userId redis.call('sadd', orderKey, userId) --3.5 發(fā)送消息到redis stream隊(duì)列,xadd stream.orders * k1 v1 k2 v2...... redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0
    • 主要業(yè)務(wù)代碼
    private IVoucherOrderService proxy;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable {String queueName = "stream.orders";@Overridepublic void run() {while (true) {try {// 1.獲取消息隊(duì)列中的訂單信息 XREADGROUP GROUP q1 c1 COUNT 1 BLOCK 2000 STREAMS streams.orderList<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));// 2.判斷消息獲取是否成功if (CollectionUtils.isEmpty(list)) {// 如果獲取失敗,說(shuō)明沒(méi)有消息,繼續(xù)下一次循環(huán)continue;}// 3. 解析消息中的訂單信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 3.如果獲取成功,可以下單handleVoucherOrder(voucherOrder);// 4.ACK確認(rèn) SACK streams.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("處理訂單異常", e);handlePendingList();}}}private void handlePendingList() {while (true) {try {// 1.獲取pending-list中的訂單信息 XREADGROUP GROUP q1 c1 COUNT 1 STREAMS streams.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));// 2.判斷消息獲取是否成功if (CollectionUtils.isEmpty(list)) {// 如果獲取失敗,說(shuō)明pending-list沒(méi)有異常消息,結(jié)束循環(huán)break;}// 3. 解析消息中的訂單信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 3.如果獲取成功,可以下單handleVoucherOrder(voucherOrder);// 4.ACK確認(rèn) SACK streams.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("處理訂單異常", e);try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}}}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {//1.1獲取用戶idLong userId = voucherOrder.getUserId();//自己定義的SimpleRedisLock鎖類//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);//使用的redisson獲取的鎖類//1.2創(chuàng)建鎖對(duì)象RLock lock = redissonClient.getLock("lock:order:" + userId);//1.3獲取鎖boolean isLock = lock.tryLock();//1.4判斷獲取鎖是否成功if (!isLock) {//獲取鎖失敗,返回錯(cuò)誤log.error("不允許重復(fù)下單");return;}try {// 獲取代理對(duì)象(事務(wù))proxy.createVoucherOrder(voucherOrder);} finally {//3.釋放鎖lock.unlock();}}@Overridepublic Result seckillVoucher(Long voucherId) {// 獲取用戶Long userId = UserHolder.getUser().getId();// 獲取訂單idLong orderId = redisIdWorker.nextId("orderId");// 1.執(zhí)行l(wèi)ua腳本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString(),String.valueOf(orderId));// 2. 判斷結(jié)果是否為0int r = Objects.requireNonNull(result).intValue();if (r != 0) {// 2.1 不為0,代表沒(méi)有購(gòu)買資格return Result.fail(r == 1 ? "庫(kù)存不足" : "不能重復(fù)下單");}// 2.2 為0,有購(gòu)買資格,把下單信息保存到阻塞隊(duì)列// TODO 保存阻塞隊(duì)列VoucherOrder voucherOrder = VoucherOrder.builder().id(orderId).userId(userId).voucherId(voucherId).build();// 放入阻塞隊(duì)列orderTasks.add(voucherOrder);// 3. 獲取代理對(duì)象proxy = (IVoucherOrderService) AopContext.currentProxy();// 4. 返回訂單idreturn Result.ok(orderId);}

    總結(jié)

    以上是生活随笔為你收集整理的聊聊Redis消息队列-实现异步秒杀的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。