生活随笔
收集整理的這篇文章主要介紹了
聊聊Redis消息队列-实现异步秒杀
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
一、前言
消息隊(duì)列(Message Queue), 字面意思就是存放消息的隊(duì)列,最簡(jiǎn)單的消息隊(duì)列模型包括3個(gè)角色:
消息隊(duì)列:存儲(chǔ)和管理消息,也被稱為消息代理(Message Broker); 生產(chǎn)者:發(fā)送消息到消息隊(duì)列; 消費(fèi)者:從消息隊(duì)列獲取消息并處理消息。 Redis提供了三種不同的方式來(lái)實(shí)現(xiàn)消息隊(duì)列: list結(jié)構(gòu):基于List結(jié)構(gòu)模擬消息隊(duì)列; PubSub: 基本的點(diǎn)對(duì)點(diǎn)消息模型; Stream: 比較完善的消息隊(duì)列模型
二、基于List結(jié)構(gòu)模擬消息隊(duì)列
消息隊(duì)列(Message Queue),字面意思就是存放消息的隊(duì)列。而Redis的list數(shù)據(jù)結(jié)構(gòu)是一個(gè)雙向鏈表,很容易模擬出隊(duì)列效果。 隊(duì)列是入口和出口不在一邊,因此我們可以利用:LPUSH結(jié)合RPOP、或者RPUSH結(jié)合LPOP來(lái)實(shí)現(xiàn); 不過(guò)要注意的是,當(dāng)隊(duì)列中沒(méi)有消息時(shí)RPOP或LPOP操作會(huì)返回null,并不像JVM的阻塞隊(duì)列那樣會(huì)阻塞并等待消息。因此這里應(yīng)該適用BRPOP或者BLPOP來(lái)實(shí)現(xiàn)阻塞效果。
2.1 基于List的消息隊(duì)列有哪些優(yōu)缺點(diǎn)?
優(yōu)點(diǎn): 利用Redis存儲(chǔ),不受限于JVM內(nèi)存上限; 基于Redis的持久化機(jī)制,數(shù)據(jù)安全性有保證; 可以滿足消息有序性; 缺點(diǎn):
三、基于PubSub的消息隊(duì)列
PubSub(發(fā)布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費(fèi)者可以訂閱一個(gè)或多個(gè)channel,生產(chǎn)者向?qū)?yīng)channel發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。
SUBSCCRIBE channel [channel]: 訂閱一個(gè)或多個(gè)頻道; PUBLISH channel msg: 向一個(gè)頻道發(fā)送消息; PSUBSCRIBE pattern[pattern]: 訂閱與pattern格式匹配的所有頻道
3.1 基于PubSub的消息隊(duì)列有哪些優(yōu)缺點(diǎn)?
優(yōu)點(diǎn): 采用發(fā)布訂閱模型,支持多生產(chǎn)、多消息; 缺點(diǎn): 不支持?jǐn)?shù)據(jù)持久化; 無(wú)法避免消息丟失; 消息堆積有上限,超出時(shí)數(shù)據(jù)丟失
四、基于Stream的消息隊(duì)列
Stream 是 Redis 5.0 引入的一種新數(shù)據(jù)類型,可以實(shí)現(xiàn)一個(gè)功能非常完善的消息隊(duì)列;
發(fā)送消息的命令: 讀取消息的方式之一:XREAD XREAD阻塞方式,讀取最新的消息: 在業(yè)務(wù)開(kāi)發(fā)中,我們可以循環(huán)的調(diào)用XREAD阻塞方式來(lái)查詢最新消息,從而實(shí)現(xiàn)持續(xù)監(jiān)聽(tīng)隊(duì)列的效果,偽代碼如下:
4.1 STREAM類型消息隊(duì)列的XREAD命令特點(diǎn):
消息可回溯; 一個(gè)消息可以被多個(gè)消費(fèi)者讀取; 可以阻塞讀取; 有消息漏讀的風(fēng)險(xiǎn)
五、基于Stream的消息隊(duì)列-消費(fèi)者組
消費(fèi)者組(Consumer Group):將多個(gè)消費(fèi)者劃分到一個(gè)組中,監(jiān)聽(tīng)同一個(gè)隊(duì)列。具備下列特點(diǎn):
消息分流:隊(duì)列中的消息會(huì)分流給組內(nèi)的不同消費(fèi)者,而不是重復(fù)的消費(fèi),從而加快消息處理的速度; 消息標(biāo)示:消費(fèi)者組會(huì)維護(hù)一個(gè)標(biāo)識(shí),記錄最后一個(gè)被處理的消息,哪怕消費(fèi)者宕機(jī)重啟,還會(huì)從標(biāo)識(shí)之后讀取消息。確保每一個(gè)消息都會(huì)被消費(fèi); 消息確認(rèn):消費(fèi)者獲取消費(fèi)后,消息處于pending狀態(tài),并存入一個(gè)pending-list。當(dāng)處理完成后需要通過(guò)XACK來(lái)確認(rèn)消息,標(biāo)記消息為已處理,才會(huì)從pending-list移除。
通俗的講,就是多個(gè)消費(fèi)者在一個(gè)隊(duì)列中處于競(jìng)爭(zhēng)關(guān)系,多個(gè)消費(fèi)者來(lái)處理隊(duì)列消息,加快消息處理的速度。而且消費(fèi)者組會(huì)給消息加上一個(gè)標(biāo)識(shí),記錄最新讀到的消息。如果中途消息處理完未提交,消息還會(huì)進(jìn)入pending狀態(tài)。進(jìn)入pending-list中,不會(huì)造成數(shù)據(jù)的丟失。
5.1 STREAM類型消息隊(duì)列的XREADGROUP命令特點(diǎn):
消息可回溯; 可以多消費(fèi)者爭(zhēng)搶消息,加快消費(fèi)速度; 可以阻塞讀取; 沒(méi)有消息漏讀的風(fēng)險(xiǎn); 有消息確認(rèn)機(jī)制,保證消息至少被消費(fèi)一次
總結(jié)
六、案例
基于Redis的Stream結(jié)構(gòu)作為消息隊(duì)列,實(shí)現(xiàn)異步秒殺下單
需求: ① 創(chuàng)建一個(gè)Stream類型的消息隊(duì)列,名為stream.orders; ② 修改之前的秒殺下單Lua腳本,在認(rèn)定有搶購(gòu)資格后,直接向stream.orders中添加消息,內(nèi)容包含voucherId、userId、orderId; ③ 項(xiàng)目啟動(dòng)時(shí),開(kāi)啟一個(gè)線程任務(wù),嘗試獲取stream.orders中的消息,完成下單。
local voucherId
= ARGV
[ 1 ]
local userId
= ARGV
[ 2 ]
local orderId
= ARGV
[ 3 ]
local stockKey
= "seckill:stock:" .. voucherId
local orderKey
= "seckill:order:" .. userId
local stock
= redis
. call ( 'get' , stockKey
)
local stockNumber
= tonumber ( stock
)
if ( stockNumber
<= 0 ) then return 1
end
if ( redis
. call ( 'sismember' , orderKey
, userId
) == 1 ) then return 2
end
redis
. call ( 'incrby' , stockKey
, - 1 )
redis
. call ( 'sadd' , orderKey
, userId
)
redis
. call ( 'xadd' , 'stream.orders' , '*' , 'userId' , userId
, 'voucherId' , voucherId
, 'id' , orderId
)
return 0
private IVoucherOrderService proxy
; private static final DefaultRedisScript < Long > SECKILL_SCRIPT ; static { SECKILL_SCRIPT = new DefaultRedisScript < > ( ) ; SECKILL_SCRIPT . setLocation ( new ClassPathResource ( "seckill.lua" ) ) ; SECKILL_SCRIPT . setResultType ( Long . class ) ; } private final BlockingQueue < VoucherOrder > orderTasks
= new ArrayBlockingQueue < > ( 1024 * 1024 ) ; private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors . newSingleThreadExecutor ( ) ; @PostConstruct private void init ( ) { SECKILL_ORDER_EXECUTOR . submit ( new VoucherOrderHandler ( ) ) ; } private class VoucherOrderHandler implements Runnable { String queueName
= "stream.orders" ; @Override public void run ( ) { while ( true ) { try { List < MapRecord < String , Object , Object > > list
= stringRedisTemplate
. opsForStream ( ) . read ( Consumer . from ( "g1" , "c1" ) , StreamReadOptions . empty ( ) . count ( 1 ) . block ( Duration . ofSeconds ( 2 ) ) , StreamOffset . create ( queueName
, ReadOffset . lastConsumed ( ) ) ) ; if ( CollectionUtils . isEmpty ( list
) ) { continue ; } MapRecord < String , Object , Object > record
= list
. get ( 0 ) ; Map < Object , Object > values
= record
. getValue ( ) ; VoucherOrder voucherOrder
= BeanUtil . fillBeanWithMap ( values
, new VoucherOrder ( ) , true ) ; handleVoucherOrder ( voucherOrder
) ; stringRedisTemplate
. opsForStream ( ) . acknowledge ( queueName
, "g1" , record
. getId ( ) ) ; } catch ( Exception e
) { log
. error ( "處理訂單異常" , e
) ; handlePendingList ( ) ; } } } private void handlePendingList ( ) { while ( true ) { try { List < MapRecord < String , Object , Object > > list
= stringRedisTemplate
. opsForStream ( ) . read ( Consumer . from ( "g1" , "c1" ) , StreamReadOptions . empty ( ) . count ( 1 ) , StreamOffset . create ( queueName
, ReadOffset . from ( "0" ) ) ) ; if ( CollectionUtils . isEmpty ( list
) ) { break ; } MapRecord < String , Object , Object > record
= list
. get ( 0 ) ; Map < Object , Object > values
= record
. getValue ( ) ; VoucherOrder voucherOrder
= BeanUtil . fillBeanWithMap ( values
, new VoucherOrder ( ) , true ) ; handleVoucherOrder ( voucherOrder
) ; stringRedisTemplate
. opsForStream ( ) . acknowledge ( queueName
, "g1" , record
. getId ( ) ) ; } catch ( Exception e
) { log
. error ( "處理訂單異常" , e
) ; try { TimeUnit . MILLISECONDS . sleep ( 20 ) ; } catch ( InterruptedException ex
) { ex
. printStackTrace ( ) ; } } } } } private void handleVoucherOrder ( VoucherOrder voucherOrder
) { Long userId
= voucherOrder
. getUserId ( ) ; RLock lock
= redissonClient
. getLock ( "lock:order:" + userId
) ; boolean isLock
= lock
. tryLock ( ) ; if ( ! isLock
) { log
. error ( "不允許重復(fù)下單" ) ; return ; } try { proxy
. createVoucherOrder ( voucherOrder
) ; } finally { lock
. unlock ( ) ; } } @Override public Result seckillVoucher ( Long voucherId
) { Long userId
= UserHolder . getUser ( ) . getId ( ) ; Long orderId
= redisIdWorker
. nextId ( "orderId" ) ; Long result
= stringRedisTemplate
. execute ( SECKILL_SCRIPT , Collections . emptyList ( ) , voucherId
. toString ( ) , userId
. toString ( ) , String . valueOf ( orderId
) ) ; int r
= Objects . requireNonNull ( result
) . intValue ( ) ; if ( r
!= 0 ) { return Result . fail ( r
== 1 ? "庫(kù)存不足" : "不能重復(fù)下單" ) ; } VoucherOrder voucherOrder
= VoucherOrder . builder ( ) . id ( orderId
) . userId ( userId
) . voucherId ( voucherId
) . build ( ) ; orderTasks
. add ( voucherOrder
) ; proxy
= ( IVoucherOrderService ) AopContext . currentProxy ( ) ; return Result . ok ( orderId
) ; }
總結(jié)
以上是生活随笔 為你收集整理的聊聊Redis消息队列-实现异步秒杀 的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
如果覺(jué)得生活随笔 網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔 推薦給好友。