优惠券秒杀的优化
參考之前的文章優惠券秒殺功能,我們完成了基于單體或者集群項目的秒殺業務。
Redis解決優惠券秒殺_兜兜轉轉m的博客-CSDN博客
黑馬點評項目Redis實現分布式鎖_兜兜轉轉m的博客-CSDN博客
但分析時其吞吐量并不是很高,延遲也有點高。
我們來回顧一下下單流程
當用戶發起請求,此時會請求nginx,nginx會訪問到tomcat,而tomcat中的程序,會進行串行操作,分成如下幾個步驟
- 查詢優惠卷
- 判斷秒殺庫存是否足夠
- 查詢訂單
- 校驗是否是一人一單
- 扣減庫存
- 創建訂單
【結構圖】
?
其中扣減庫存和創建訂單兩個業務是比較耗費時間的,我們之前是在主線程中進行操作的,因此系統的延遲會很高,可以通過消息隊列的方式進行異步處理來提高系統的并發能力。
其次,查詢優惠券,判斷秒殺庫存是否足夠和校驗一人一單的業務也是查詢數據庫來進行實現的,我們能否將這些業務在Redis中進行實現?
基于上述分析,
- 校驗一人一單業務在Redis中進行處理,
- 扣減庫存和創建訂單業務通過消息隊列進行異步處理
【結構圖】
【業務邏輯圖】
- 新增秒殺優惠券的同時,將優惠券信息保存到Redis中
- 基于Lua腳本,判斷秒殺庫存、一人一單,決定用戶是否搶購成功
- 如果搶購成功,將優惠券id和用戶id封裝后存入阻塞隊列
- 開啟線程任務,不斷從阻塞隊列中獲取信息,實現異步下單功能
?VoucherServiceImpl:
在最后增加【新增秒殺優惠券的同時,將優惠券信息保存到Redis中業務】。
@Override @Transactional public void addSeckillVoucher(Voucher voucher) {// 保存優惠券save(voucher);// 保存秒殺信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);// 保存秒殺庫存到Redis中//SECKILL_STOCK_KEY 這個變量定義在RedisConstans中//private static final String SECKILL_STOCK_KEY ="seckill:stock:"stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString()); }- 基于Lua腳本,判斷秒殺庫存、一人一單,決定用戶是否搶購成功
當以上lua表達式執行完畢后,剩下的就是根據步驟3,4來執行我們接下來的任務了
VoucherOrderServiceImpl
@Override public Result seckillVoucher(Long voucherId) {//獲取用戶Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");// 1.執行lua腳本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));int r = result.intValue();// 2.判斷結果是否為0if (r != 0) {// 2.1.不為0 ,代表沒有購買資格return Result.fail(r == 1 ? "庫存不足" : "不能重復下單");}// 3.返回訂單idreturn Result.ok(orderId); }采用Redis中Stream來作為消息隊列。
Redis消息隊列-基于Stream的消息隊列-消費者組
消費者組(Consumer Group):將多個消費者劃分到一個組中,監聽同一個隊列。具備下列特點:
?
創建消費者組:
key:隊列名稱 groupName:消費者組名稱 ID:起始ID標示,$代表隊列中最后一個消息,0則代表隊列中第一個消息 MKSTREAM:隊列不存在時自動創建隊列 其它常見命令:
?
刪除指定的消費者組
XGROUP DESTORY key groupName給指定的消費者組添加消費者
XGROUP CREATECONSUMER key groupname consumername刪除消費者組中的指定消費者
XGROUP DELCONSUMER key groupname consumername從消費者組讀取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]-
group:消費組名稱
-
consumer:消費者名稱,如果消費者不存在,會自動創建一個消費者
-
count:本次查詢的最大數量
-
BLOCK milliseconds:當沒有消息時最長等待時間
-
NOACK:無需手動ACK,獲取到消息后自動確認
-
STREAMS key:指定隊列名稱
-
ID:獲取消息的起始ID:
">":從下一個未消費的消息開始 其它:根據指定id從pending-list中獲取已消費但未確認的消息,例如0,是從pending-list中的第一個消息開始
消費者監聽消息的基本思路:
?
STREAM類型消息隊列的XREADGROUP命令特點:
-
消息可回溯
-
可以多消費者爭搶消息,加快消費速度
-
可以阻塞讀取
-
沒有消息漏讀的風險
-
有消息確認機制,保證消息至少被消費一次
?基于Redis的Stream結構作為消息隊列,實現異步秒殺下單
需求:
- 創建一個Stream類型的消息隊列,名為stream.orders
- 修改之前的秒殺下單Lua腳本,在認定有搶購資格后,直接向stream.orders中添加消息,內容包含voucherId、userId、orderId
- 項目啟動時,開啟一個線程任務,嘗試獲取stream.orders中的消息,完成下單
修改lua表達式,新增3.6?
?Java實現如下:
VoucherOrderServiceImpl
private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1.獲取消息隊列中的訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create("stream.orders", ReadOffset.lastConsumed()));// 2.判斷訂單信息是否為空if (list == null || list.isEmpty()) {// 如果為null,說明沒有消息,繼續下一次循環continue;}// 解析數據MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.創建訂單createVoucherOrder(voucherOrder);// 4.確認消息 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("處理訂單異常", e);//處理異常消息handlePendingList();}}}private void handlePendingList() {while (true) {try {// 1.獲取pending-list中的訂單信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create("stream.orders", ReadOffset.from("0")));// 2.判斷訂單信息是否為空if (list == null || list.isEmpty()) {// 如果為null,說明沒有異常消息,結束循環break;}// 解析數據MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.創建訂單createVoucherOrder(voucherOrder);// 4.確認消息 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("處理pendding訂單異常", e);try{Thread.sleep(20);}catch(Exception e){e.printStackTrace();}}}} }總結