mysql和redis库存扣减和优化
前言
大流量情況下的庫存是老生常談的問題了,在這里我整理一下mysql和redis應對扣除庫存的方案,采用jmeter進行壓測。
JMETER設置
庫存初始值50,線程數量1000個,1秒以內啟動全部,一個線程循環2次,共2000個請求
MySQL方案
初始方案
<update id="decreaseStock">
UPDATE stock
SET stock_num = stock_num - 1
WHERE id = #{id}
</update>
這種情況下,在并發條件肯定會出現超賣的
進行修改:
<update id="decreaseStock">
UPDATE stock
SET stock_num = stock_num - 1
WHERE id = #{id} AND stock_num >= 1
</update>
增加AND stock_num >= 1條件,即可避免超賣。
相關代碼:
@PostMapping(value = "/decreaseStock/{id}")
public ResponseEntity<Object> decreaseStock(@PathVariable("id") Integer id) {
int result = stockService.decreaseStock(id);
return result == 1 ? new ResponseEntity<>("decreaseStock successfully", HttpStatus.OK) : new ResponseEntity<>("decreaseStock failed", HttpStatus.OK);
}
壓測情況:
根據Throught可知一秒可以處理200個事務(TPS)
如果說系統的并發量不高,則可以以這種方案進行防止庫存超賣,但要注意,在可重復讀隔離級別情況下,如果where的條件字段沒有索引的話,進行update語句會使整個表被鎖住,如果這里使用的where條件不是主鍵id而是product_name,那么需要給這個字段加索引。
在RR可重復讀隔離級別下,如果where條件沒有命中索引,那么會基于next-key lock(記錄鎖和間隙鎖的組合)對整個表的所有記錄加上這個鎖,進行全表掃描,這個時候其他記錄想要更新就會被阻塞。
但是不一定是有了索引就不會鎖住整個表,這是由優化器決定的,可以使用Explain語句來查看當前語句是走的索引還是全表掃描,如果優化器走的還是全標掃描,可以使用 force index([index_name]) 強制使用某個索引。
改進
在MySQL情況下還能有其他方案來提升性能嗎,在不借助Redis的情況(曾經面試招銀網絡被問了這道題)
我當時給出的回答是,把單個商品的庫存比如50個庫存,拆分成好幾份,一份10個,5份庫存,由于秒殺情況下流量很大,可以把這五份庫存分別放到五個數據庫里面,這樣性能至少是原先方案的5倍,那么還會出現新的問題,就是有些問題,負載均衡上的問題,可能會出現某些庫里還存在庫存,但是請求卻沒有打進這個數據庫,而是打到庫存已經沒有的數據庫里面。我當時的想法是再搞個庫存表,這個庫存表采集各個商品的總庫存以及商品在各個分庫里面的庫存數量,然后再寫個服務,包含負載均衡的算法,將用戶的請求平均打到各個分庫去,當某個分庫的庫存達到0的時候,去通知該服務,服務將這個庫剔除,使新的請求不會轉發過去。實際這種情況也是存在問題的,高并發下庫存為0的庫來不及被剔除,也會導致請求被打到庫存0的庫。
Redis方案
將庫存暫時放到Redis,然后從Redis進行庫存扣減,能大大提升性能
壓測結果:
可見性能幾乎是MySQL的10倍了,但是這樣子在Redis里面會導致超賣
要確保Redis不超買,需要先查詢當前的數量,如果大于0則進行扣減,并且查詢和扣減需要為原子性,這里就需要借助lua腳本,將這兩次操作寫到一起。
加了Lua腳本的代碼:
private static final String LUA_DECRESE_STOCK_PATH = "lua/decreseStock.lua";
@PostMapping(value = "/decreaseStockByRedis/{id}")
public ResponseEntity<Object> decreaseStockByRedis(@PathVariable("id") Integer id) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(LUA_DECRESE_STOCK_PATH)));
redisScript.setResultType(Long.class);
// 執行Lua腳本
Long result = (Long) redisTemplate.execute(redisScript, Collections.singletonList(id));
// 返回結果判斷
return (result != null && result == 1) ? new ResponseEntity<>("decreaseStock successfully", HttpStatus.OK) : new ResponseEntity<>("decreaseStock failed", HttpStatus.OK);
}
lua腳本放在resource/lua/decreseStock.lua
local key = KEYS[1]
-- 檢查鍵是否存在
local exists = redis.call('EXISTS', key)
if exists == 1 then
-- 鍵存在,獲取值
local value = redis.call('GET', key)
if tonumber(value) > 0 then
-- 如果值大于0,則遞減
redis.call('DECR', key)
return 1 -- 表示遞減成功
else
return 0 -- 表示遞減失敗,值不大于0
end
else
return -1 -- 表示遞減失敗,鍵不存在
end
Redis同步庫存到MySQL
但是在Redis扣減了庫存,總需要同步到MySQL里面
@PostMapping(value = "/decreaseStockByRedis/{id}")
public ResponseEntity<Object> decreaseStockByRedis(@PathVariable("id") Integer id) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(LUA_DECRESE_STOCK_PATH)));
redisScript.setResultType(Long.class);
// 執行Lua腳本
Long redisResult = (Long) redisTemplate.execute(redisScript, Collections.singletonList(id));
int dataBaselResult = 0;
if (redisResult == 1) {
dataBaselResult = stockService.decreaseStock(id);
}
// 返回結果判斷
return (dataBaselResult == 1 && redisResult == 1) ? new ResponseEntity<>("decreaseStock successfully", HttpStatus.OK) : new ResponseEntity<>("decreaseStock failed", HttpStatus.OK);
}
直接按照上述代碼來寫,刪Redis后同時將庫存同步到MySQL,相當于使用了Redis性能又沒有提升。
其實選擇了Redis來進行庫存扣減,那么MySQL的庫存并不需要去實時進行更新,只需要庫存達到最終一致性即可,即先對Redis的庫存進行更新,然后再異步同步到MySQL的庫存。
如果使用spring的異步線程來解決,會不會出現同步MySQL失敗導致數據最終不一致呢,在流量很多的情況下,系統本身就處于壓力大的情況,再使用異步線程會占用額外的資源,最好的方法是引入MQ,把庫存的同步信息交給MQ,MQ再交到消費系統,進行減庫存的操作,由MQ保證消息被消費,實現最終一致性。
部分代碼如下,由MQ product發出,再由consumer進行消費:
private final DecreaseStockProduce decreaseStockProduce;
@PostMapping(value = "/decreaseStockByRedis/{id}")
public ResponseEntity<Object> decreaseStockByRedis(@PathVariable("id") String id) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(LUA_DECRESE_STOCK_PATH)));
redisScript.setResultType(Long.class);
// 執行Lua腳本
Long redisResult = (Long) redisTemplate.execute(redisScript, Collections.singletonList(id));
if (redisResult == 1) {
// 發送消息
try {
DecreaseStockEvent decreaseStockEvent = DecreaseStockEvent.builder()
.id(id)
.build();
SendResult sendResult = decreaseStockProduce.sendMessage(decreaseStockEvent);
if (!Objects.equals(sendResult.getSendStatus(), SendStatus.SEND_OK)) {
log.error("消息發送錯誤,請求參數:{}", id);
}
} catch (Exception e) {
log.error("消息發送錯誤,請求參數:{}", id, e);
}
}
// 返回結果判斷
return (redisResult == 1) ? new ResponseEntity<>("decreaseStock successfully", HttpStatus.OK) : new ResponseEntity<>("decreaseStock failed", HttpStatus.OK);
}
MQ [TIMEOUT_CLEAN_QUEUE] broker busy問題
這里直接壓測會報下面的錯誤,并且這個時候查看redis庫存已經減到0,到是MySQL只減到了37
針對MQ [TIMEOUT_CLEAN_QUEUE] broker busy問題,需要去修改MQ的broker.conf文件
針對TIMEOUT_CLEAN_QUEUE broker busy問題,需要去修改MQ的broker.conf文件,上述的201ms超時了,我這里將等待時間改為400,并且將線程數設置為64,這個線程數可以根據實際壓測情況進行調整。
# 發消息線程池數量
sendMessageThreadPoolNums=64
# 拉消息線程池數量
pullMessageThreadPoolNums=64
waitTimeMillsInSendQueue=400
現在再進行壓測,發現tps能跑到1000,相比直接入庫mysql的200已經是提升很大了。
雖然性能提高,也實現庫存的同步,但這個性能下還是會存在一些問題:
比如MQ消息發送失敗、或者MySQL庫存扣減失敗,并且實際情況還有訂單的生成和庫存之間的一致性也要考慮。
對于上述這些問題,可以查看我的另外一篇博客:
RocketMQ事務消息在訂單創建和庫存扣減的使用 - Scotyzh - 博客園 (cnblogs.com)
總結
以上是生活随笔為你收集整理的mysql和redis库存扣减和优化的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 目录文件夹和根目录
- 下一篇: Redis系列:使用 Redis Mod