谷粒商城项目篇13_分布式高级篇_订单业务模块(提交订单幂等性、分布式事务、延时MQ实现定时任务)
目錄
一、訂單業(yè)務(wù)模塊
訂單流程
購物車跳轉(zhuǎn)訂單確認(rèn)頁
提交訂單接口冪等性
二、分布式事務(wù)
- CAP定理
- BASE理論
- 強(qiáng)一致性、弱一致性、最終一致性
- 2PC模式
- 柔性事務(wù)-TCC事務(wù)補(bǔ)償方案
- 柔性事務(wù)-最大努力通知型方案
- 柔性事務(wù)-可靠消息+最終一致性方案(異步確認(rèn))
三、延時(shí)隊(duì)列實(shí)現(xiàn)定時(shí)任務(wù)
- 消息丟失
- 消息重復(fù)
- 消息積壓
一、訂單業(yè)務(wù)模塊
概述
- 電商系統(tǒng)涉及到 3 流,分別時(shí)信息流,資金流,物流,而訂單系統(tǒng)作為中樞將三者有機(jī)的集
合起來。 - 訂單模塊是電商系統(tǒng)的樞紐,在訂單這個(gè)環(huán)節(jié)上需求獲取多個(gè)模塊的數(shù)據(jù)和信息,同時(shí)對這
些信息進(jìn)行加工處理后流向下個(gè)環(huán)節(jié),這一系列就構(gòu)成了訂單的信息流通
1.訂單流程
不管類型如何訂單都包括正向流程和逆向流程,對應(yīng)的場景就是購買商品和退換貨流程,正
向流程就是一個(gè)正常的網(wǎng)購步驟:訂單生成–>支付訂單–>賣家發(fā)貨–>確認(rèn)收貨–>交易成功。
而每個(gè)步驟的背后,訂單是如何在多系統(tǒng)之間交互流轉(zhuǎn)的,可概括如下圖
1、訂單創(chuàng)建與支付
(1) 、訂單創(chuàng)建前需要預(yù)覽訂單,選擇收貨信息等
(2) 、訂單創(chuàng)建需要鎖定庫存,庫存有才可創(chuàng)建,否則不能創(chuàng)建
(3) 、訂單創(chuàng)建后超時(shí)未支付需要解鎖庫存
(4) 、支付成功后,需要進(jìn)行拆單,根據(jù)商品打包方式,所在倉庫,物流等進(jìn)行拆單
(5) 、支付的每筆流水都需要記錄,以待查賬
(6) 、訂單創(chuàng)建,支付成功等狀態(tài)都需要給 MQ 發(fā)送消息,方便其他系統(tǒng)感知訂閱
2、逆向流程
(1) 、修改訂單,用戶沒有提交訂單,可以對訂單一些信息進(jìn)行修改,比如配送信息,
優(yōu)惠信息,及其他一些訂單可修改范圍的內(nèi)容,此時(shí)只需對數(shù)據(jù)進(jìn)行變更即可。
(2) 、訂單取消,用戶主動取消訂單和用戶超時(shí)未支付,兩種情況下訂單都會取消訂
單,而超時(shí)情況是系統(tǒng)自動關(guān)閉訂單,所以在訂單支付的響應(yīng)機(jī)制上面要做支付的
限時(shí)處理,尤其是在前面說的下單減庫存的情形下面,可以保證快速的釋放庫存。
另外需要需要處理的是促銷優(yōu)惠中使用的優(yōu)惠券,權(quán)益等視平臺規(guī)則,進(jìn)行相應(yīng)補(bǔ)
回給用戶。
(3) 、退款,在待發(fā)貨訂單狀態(tài)下取消訂單時(shí),分為缺貨退款和用戶申請退款。如果是
全部退款則訂單更新為關(guān)閉狀態(tài),若只是做部分退款則訂單仍需進(jìn)行進(jìn)行,同時(shí)生
成一條退款的售后訂單,走退款流程。退款金額需原路返回用戶的賬戶。
(4) 、發(fā)貨后的退款,發(fā)生在倉儲貨物配送,在配送過程中商品遺失,用戶拒收,用戶
收貨后對商品不滿意,這樣情況下用戶發(fā)起退款的售后訴求后,需要商戶進(jìn)行退款
的審核,雙方達(dá)成一致后,系統(tǒng)更新退款狀態(tài),對訂單進(jìn)行退款操作,金額原路返
回用戶的賬戶,同時(shí)關(guān)閉原訂單數(shù)據(jù)。僅退款情況下暫不考慮倉庫系統(tǒng)變化。如果
發(fā)生雙方協(xié)調(diào)不一致情況下,可以申請平臺客服介入。在退款訂單商戶不處理的情
況下,系統(tǒng)需要做限期判斷,比如 5 天商戶不處理,退款單自動變更同意退款
2.購物車頁跳轉(zhuǎn)到訂單確認(rèn)頁
用戶登錄狀態(tài)下查看購物車商品信息,點(diǎn)擊去結(jié)算,訂單詳情頁需要顯示
- 商品最新價(jià)格、優(yōu)惠信息
- 用戶的基本信息、地址
- 支付方式等
- 訂單總價(jià)格
- …
1.登錄攔截器
首先訂單業(yè)務(wù)都需要登錄狀態(tài),設(shè)置一個(gè)全局?jǐn)r截器LoginInterceptor
package henu.soft.xiaosi.order.interceptor;import henu.soft.common.constant.AuthServerConstant; import henu.soft.common.to.MemberResponseTo; import org.springframework.util.AntPathMatcher; import org.springframework.web.servlet.HandlerInterceptor; import org.springframework.web.servlet.ModelAndView;import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession;/*** 登錄攔截器,未登錄的用戶不能進(jìn)入訂單服務(wù)*/ public class LoginInterceptor implements HandlerInterceptor {public static ThreadLocal<MemberResponseTo> loginUser = new ThreadLocal<>();@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 獲取登錄狀態(tài)HttpSession session = request.getSession();MemberResponseTo memberResponseVo = (MemberResponseTo) session.getAttribute(AuthServerConstant.LOGIN_USER);//登陸了if (memberResponseVo != null) {loginUser.set(memberResponseVo);return true;}else {session.setAttribute("msg","請先登錄!");response.sendRedirect("http://auth.gulishop.cn/login.html");return false;}}@Overridepublic void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {} }注冊攔截器
package henu.soft.xiaosi.order.config;import henu.soft.xiaosi.order.interceptor.LoginInterceptor; import org.springframework.context.annotation.Configuration;import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;@Configuration public class MyWebConfig implements WebMvcConfigurer {@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new LoginInterceptor()).addPathPatterns("/**");} }2.封裝vo
package henu.soft.xiaosi.order.vo;import lombok.Getter; import lombok.Setter;import java.math.BigDecimal; import java.util.List; import java.util.Map;public class OrderConfirmVo {@Getter@Setter/** 會員收獲地址列表 **/private List<MemberAddressVo> memberAddressVos;@Getter @Setter/** 所有選中的購物項(xiàng) **/private List<OrderItemVo> items;/** 發(fā)票記錄 **/@Getter @Setter/** 優(yōu)惠券(會員積分) **/private Integer integration;/** 防止重復(fù)提交的令牌 **/@Getter @Setterprivate String orderToken;@Getter @SetterMap<Long,Boolean> stocks;public Integer getCount() {Integer count = 0;if (items != null && items.size() > 0) {for (OrderItemVo item : items) {count += item.getCount();}}return count;}/** 訂單總額 **///BigDecimal total;//計(jì)算訂單總額public BigDecimal getTotal() {BigDecimal totalNum = BigDecimal.ZERO;if (items != null && items.size() > 0) {for (OrderItemVo item : items) {//計(jì)算當(dāng)前商品的總價(jià)格BigDecimal itemPrice = item.getPrice().multiply(new BigDecimal(item.getCount().toString()));//再計(jì)算全部商品的總價(jià)格totalNum = totalNum.add(itemPrice);}}return totalNum;}/** 應(yīng)付價(jià)格 **///BigDecimal payPrice;public BigDecimal getPayPrice() {return getTotal();} }3.Feign遠(yuǎn)程調(diào)用丟失請求頭信息
登錄信息保存在分布式session中,瀏覽器的cookie保存這些信息,直接瀏覽器訪問controller會帶上cookie
但是遠(yuǎn)程調(diào)用的方法創(chuàng)建一個(gè)新的request對應(yīng)的controller不會帶上cookie,解決辦法是加上feign的攔截器
package henu.soft.xiaosi.cart.config;import feign.RequestInterceptor; import feign.RequestTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.context.request.RequestAttributes; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest;@Configuration public class MyFeignConfig {@Beanpublic RequestInterceptor requestInterceptor() {return new RequestInterceptor() {@Overridepublic void apply(RequestTemplate template) {//1. 使用RequestContextHolder拿到常常請求的請求數(shù)據(jù),同一個(gè)線程內(nèi)可以獲取的到ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();if (requestAttributes != null) {HttpServletRequest request = requestAttributes.getRequest();if (request != null) {//2. 將老請求得到cookie信息放到feign請求上String cookie = request.getHeader("Cookie");template.header("Cookie", cookie);}}}};} }4.Feign遠(yuǎn)程異步調(diào)用丟失上下文信息
Feign遠(yuǎn)程方法調(diào)用異步方法,會開啟新的線程,
- 之前是 主線程---> 攔截器--->controller--->service都是一個(gè)主線程,可以拿到ThreadLocal線程共享的cookie信息
- 現(xiàn)在是 新的異步線程--->攔截器--->controller--->service 每個(gè)異步任務(wù)對應(yīng)一個(gè)線程,拿不到主線程的cookie信息
解決辦法
在異步方法內(nèi)部重新設(shè)置 上下文信息 RequestContextHolder.setRequestAttributes(requestAttributes);
3.提交訂單接口冪等性
概念
- 接口冪等性就是用戶對于同一操作發(fā)起的一次請求或者多次請求的結(jié)果是一致的,不會因
為多次點(diǎn)擊而產(chǎn)生了副作用; - 比如說支付場景,用戶購買了商品支付扣款成功,但是返回結(jié)果的時(shí)候網(wǎng)絡(luò)異常,此時(shí)錢已經(jīng)扣了,用戶再次點(diǎn)擊按鈕,此時(shí)會進(jìn)行第二次扣款,返回結(jié)果成功,用戶查詢余額返發(fā)現(xiàn)多扣錢了,流水記錄也變成了兩條...,這就沒有保證接口
的冪等性。
防止場景
- 用戶多次點(diǎn)擊按鈕
- 用戶頁面回退再次提交
- 微服務(wù)互相調(diào)用,由于網(wǎng)絡(luò)問題,導(dǎo)致請求失敗。feign 觸發(fā)重試機(jī)制
- 其他業(yè)務(wù)情況
冪等情況, 以 SQL 為例,有些操作是天然冪等的。
- SELECT * FROM table WHER id=?,無論執(zhí)行多少次都不會改變狀態(tài),是天然的冪等。
- UPDATE tab1 SET col1=1 WHERE col2=2,無論執(zhí)行成功多少次狀態(tài)都是一致的,也是冪等操作。
- delete from user where userid=1,多次操作,結(jié)果一樣,具備冪等性
- insert into user(userid,name) values(1,'a')如 userid 為唯一主鍵,即重復(fù)操作上面的業(yè)務(wù),只
會插入一條用戶數(shù)據(jù),具備冪等性。 - UPDATE tab1 SET col1=col1+1 WHERE col2=2,每次執(zhí)行的結(jié)果都會發(fā)生變化,不是冪等的。
- insert into user(userid,name) values(1,'a')如 userid 不是主鍵,可以重復(fù),那上面業(yè)務(wù)多次操
作,數(shù)據(jù)都會新增多條,不具備冪等性。
1.令牌token機(jī)制
1、服務(wù)端提供了發(fā)送 token 的接口。我們在分析業(yè)務(wù)的時(shí)候,哪些業(yè)務(wù)是存在冪等問題的,
就必須在執(zhí)行業(yè)務(wù)前,先去獲取 token,服務(wù)器會把 token 保存到 redis 中。
2、然后調(diào)用業(yè)務(wù)接口請求時(shí),把 token 攜帶過去,一般放在請求頭部。
3、服務(wù)器判斷 token 是否存在 redis 中,存在表示第一次請求,然后刪除 token,繼續(xù)執(zhí)行業(yè)
務(wù)。
4、如果判斷 token 不存在 redis 中,就表示是重復(fù)操作,直接返回重復(fù)標(biāo)記給 client,這樣
就保證了業(yè)務(wù)代碼,不被重復(fù)執(zhí)行。
危險(xiǎn)性:
1、先刪除 token 還是后刪除 token;
-
先刪除可能導(dǎo)致,業(yè)務(wù)確實(shí)沒有執(zhí)行,重試還帶上之前 token,由于防重設(shè)計(jì)導(dǎo)致,
請求還是不能執(zhí)行。 -
后刪除可能導(dǎo)致,業(yè)務(wù)處理成功,但是服務(wù)閃斷,出現(xiàn)超時(shí),沒有刪除 token,別
人繼續(xù)重試,導(dǎo)致業(yè)務(wù)被執(zhí)行兩邊 -
我們最好設(shè)計(jì)為先刪除 token,如果業(yè)務(wù)調(diào)用失敗,就重新獲取 token 再次請求。
2、Token 獲取、比較和刪除必須是原子性
-
redis.get(token) 、token.equals、redis.del(token)如果這兩個(gè)操作不是原子,可能導(dǎo)
致,高并發(fā)下,都 get 到同樣的數(shù)據(jù),判斷都成功,繼續(xù)業(yè)務(wù)并發(fā)執(zhí)行 -
可以在 redis 使用 lua 腳本完成這個(gè)操作
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 en
1.準(zhǔn)備令牌
// 常量 package henu.soft.common.constant;public class OrderConstant {public static final String USER_ORDER_TOKEN_PREFIX = "order:token"; }//6. 防重令牌String token = UUID.randomUUID().toString().replace("-", "");// 存入redisredisTemplate.opsForValue().set(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseTo.getId(), token, 30, TimeUnit.MINUTES);// 返回給頁面confirmVo.setOrderToken(token); <form action="http://order.gulishop.cn/submitOrder" method="post"><input id="addrInput" type="hidden" name="addrId"/><input id="payPriceInput" type="hidden" name="payPrice"><input name="orderToken" th:value="${confirmOrder.orderToken}" type="hidden"/><button class="tijiao" type="submit">提交訂單</button></form>2.封裝實(shí)體
訂單實(shí)體
package henu.soft.xiaosi.order.entity;import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName;import java.math.BigDecimal; import java.io.Serializable; import java.util.Date; import lombok.Data;/*** 訂單* * @author xiaosi* @email 2589165806@qq.com* @date 2021-07-22 23:34:48*/ @Data @TableName("oms_order") public class OrderEntity implements Serializable {private static final long serialVersionUID = 1L;/*** id*/@TableIdprivate Long id;/*** member_id*/private Long memberId;/*** 訂單號*/private String orderSn;/*** 使用的優(yōu)惠券*/private Long couponId;/*** create_time*/private Date createTime;/*** 用戶名*/private String memberUsername;/*** 訂單總額*/private BigDecimal totalAmount;/*** 應(yīng)付總額*/private BigDecimal payAmount;/*** 運(yùn)費(fèi)金額*/private BigDecimal freightAmount;/*** 促銷優(yōu)化金額(促銷價(jià)、滿減、階梯價(jià))*/private BigDecimal promotionAmount;/*** 積分抵扣金額*/private BigDecimal integrationAmount;/*** 優(yōu)惠券抵扣金額*/private BigDecimal couponAmount;/*** 后臺調(diào)整訂單使用的折扣金額*/private BigDecimal discountAmount;/*** 支付方式【1->支付寶;2->微信;3->銀聯(lián); 4->貨到付款;】*/private Integer payType;/*** 訂單來源[0->PC訂單;1->app訂單]*/private Integer sourceType;/*** 訂單狀態(tài)【0->待付款;1->待發(fā)貨;2->已發(fā)貨;3->已完成;4->已關(guān)閉;5->無效訂單】*/private Integer status;/*** 物流公司(配送方式)*/private String deliveryCompany;/*** 物流單號*/private String deliverySn;/*** 自動確認(rèn)時(shí)間(天)*/private Integer autoConfirmDay;/*** 可以獲得的積分*/private Integer integration;/*** 可以獲得的成長值*/private Integer growth;/*** 發(fā)票類型[0->不開發(fā)票;1->電子發(fā)票;2->紙質(zhì)發(fā)票]*/private Integer billType;/*** 發(fā)票抬頭*/private String billHeader;/*** 發(fā)票內(nèi)容*/private String billContent;/*** 收票人電話*/private String billReceiverPhone;/*** 收票人郵箱*/private String billReceiverEmail;/*** 收貨人姓名*/private String receiverName;/*** 收貨人電話*/private String receiverPhone;/*** 收貨人郵編*/private String receiverPostCode;/*** 省份/直轄市*/private String receiverProvince;/*** 城市*/private String receiverCity;/*** 區(qū)*/private String receiverRegion;/*** 詳細(xì)地址*/private String receiverDetailAddress;/*** 訂單備注*/private String note;/*** 確認(rèn)收貨狀態(tài)[0->未確認(rèn);1->已確認(rèn)]*/private Integer confirmStatus;/*** 刪除狀態(tài)【0->未刪除;1->已刪除】*/private Integer deleteStatus;/*** 下單時(shí)使用的積分*/private Integer useIntegration;/*** 支付時(shí)間*/private Date paymentTime;/*** 發(fā)貨時(shí)間*/private Date deliveryTime;/*** 確認(rèn)收貨時(shí)間*/private Date receiveTime;/*** 評價(jià)時(shí)間*/private Date commentTime;/*** 修改時(shí)間*/private Date modifyTime;}封裝的訂單vo
package henu.soft.xiaosi.order.vo;import lombok.Data;import java.math.BigDecimal;@Data public class OrderSubmitVo {/** 收獲地址的id **/private Long addrId;/** 支付方式 **/private Integer payType;//無需提交要購買的商品,去購物車再獲取一遍//優(yōu)惠、發(fā)票/** 防重令牌 **/private String orderToken;/** 應(yīng)付價(jià)格 **/private BigDecimal payPrice;/** 訂單備注 **/private String remarks;//用戶相關(guān)的信息,直接去session中取出即可 }3.對應(yīng)controller
/*** 確認(rèn)訂單* @param submitVo* @param model* @param attributes* @return*/@RequestMapping("/submitOrder")public String submitOrder(OrderSubmitVo submitVo, Model model, RedirectAttributes attributes) {try {SubmitOrderResponseVo responseVo = orderService.submitOrder(submitVo);Integer code = responseVo.getCode();if (code == 0) {model.addAttribute("order", responseVo.getOrder());return "pay";} else {String msg = "下單失敗;";switch (code) {case 1:msg += "防重令牌校驗(yàn)失敗";break;case 2:msg += "商品價(jià)格發(fā)生變化";break;}attributes.addFlashAttribute("msg", msg);return "redirect:http://order.gulishop.cn/toTrade";}} catch (Exception e) {if (e instanceof NoStockException) {String msg = "下單失敗,商品無庫存";attributes.addFlashAttribute("msg", msg);}return "redirect:http://order.gulishop.cn/toTrade";}}4.對應(yīng)service
步驟
- //1. 驗(yàn)證令牌,前端傳遞的令牌和redis存儲的令牌對比
- //2. 創(chuàng)建訂單、訂單項(xiàng)
- //3. 驗(yàn)價(jià)
- //4. 保存訂單
- //5. 鎖定庫存(RabbitMQ延時(shí)隊(duì)列)
數(shù)據(jù)庫表信息
驗(yàn)價(jià)
鎖庫存
封裝vo
package henu.soft.xiaosi.order.vo;import henu.soft.common.to.OrderItemTo; import lombok.Data;import java.util.List;@Data public class WareSkuLockVo {private String OrderSn;private List<OrderItemTo> locks; } //5. 鎖定庫存List<OrderItemTo> orderItemTos = order.getOrderItems().stream().map((item) -> {OrderItemTo orderItemTo = new OrderItemTo();orderItemTo.setSkuId(item.getSkuId());orderItemTo.setCount(item.getSkuQuantity());return orderItemTo;}).collect(Collectors.toList());WareSkuLockVo lockVo = new WareSkuLockVo();lockVo.setOrderSn(order.getOrder().getOrderSn());lockVo.setLocks(orderItemTos);R r = wareFeignService.orderLockStock(lockVo);//5.1 鎖定庫存成功if (r.getCode() == 0) { // int i = 10 / 0;responseVo.setOrder(order.getOrder());responseVo.setCode(0);//發(fā)送消息到訂單延遲隊(duì)列,判斷過期訂單rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", order.getOrder());//清除購物車記錄BoundHashOperations<String, Object, Object> ops = redisTemplate.boundHashOps(CartConstant.CART_PREFIX + memberResponseTo.getId());for (OrderItemEntity orderItem : order.getOrderItems()) {ops.delete(orderItem.getSkuId().toString());}return responseVo;} else {//5.1 鎖定庫存失敗String msg = (String) r.get("msg");throw new NoStockException(msg);}2.各種鎖機(jī)制
1、數(shù)據(jù)庫悲觀鎖
-
select * from xxxx where id = 1 for update;
-
悲觀鎖使用時(shí)一般伴隨事務(wù)一起使用,數(shù)據(jù)鎖定時(shí)間可能會很長,需要根據(jù)實(shí)際情況選用。
另外要注意的是,id 字段一定是主鍵或者唯一索引,不然可能造成鎖表的結(jié)果,處理起來會
非常麻煩。
2、數(shù)據(jù)庫樂觀鎖
-
update t_goods set count = count -1 , version = version + 1 where good_id=2 and version = 1
-
這種方法適合在更新的場景中,根據(jù) version 版本,也就是在操作庫存前先獲取當(dāng)前商品的 version 版本號,然后操作的時(shí)候帶上此 version 號。我們梳理下,我們第一次操作庫存時(shí),得到 version 為 1,調(diào)用庫存服務(wù)version 變成了 2;但返回給訂單服務(wù)出現(xiàn)了問題,訂單服務(wù)又一次發(fā)起調(diào)用庫存服務(wù),當(dāng)訂單服務(wù)傳如的 version 還是 1,再執(zhí)行上面的 sql 語句時(shí),就不會執(zhí)行;因?yàn)?version 已經(jīng)變?yōu)?2 了,where 條件就不成立。這樣就保證了不管調(diào)用幾次,只會真正的處理一次。
-
樂觀鎖主要使用于處理讀多寫少的問題
3、業(yè)務(wù)層分布式鎖
- 如果多個(gè)機(jī)器可能在同一時(shí)間同時(shí)處理相同的數(shù)據(jù),比如多臺機(jī)器定時(shí)任務(wù)都拿到了相同數(shù)
據(jù)處理,我們就可以加分布式鎖,鎖定此數(shù)據(jù),處理完成后釋放鎖。獲取到鎖的必須先判斷
這個(gè)數(shù)據(jù)是否被處理過。
3.各種唯一約束
1、數(shù)據(jù)庫唯一約束
- 插入數(shù)據(jù),應(yīng)該按照唯一索引進(jìn)行插入,比如訂單號,相同的訂單就不可能有兩條記錄插入。
我們在數(shù)據(jù)庫層面防止重復(fù)。 - 這個(gè)機(jī)制是利用了數(shù)據(jù)庫的主鍵唯一約束的特性,解決了在 insert 場景時(shí)冪等問題。但主鍵
的要求不是自增的主鍵,這樣就需要業(yè)務(wù)生成全局唯一的主鍵。 - 如果是分庫分表場景下,路由規(guī)則要保證相同請求下,落地在同一個(gè)數(shù)據(jù)庫和同一表中,要
不然數(shù)據(jù)庫主鍵約束就不起效果了,因?yàn)槭遣煌臄?shù)據(jù)庫和表主鍵不相關(guān)。
2、redis set 防重
- 很多數(shù)據(jù)需要處理,只能被處理一次,比如我們可以計(jì)算數(shù)據(jù)的 MD5 將其放入 redis 的 set,
每次處理數(shù)據(jù),先看這個(gè) MD5 是否已經(jīng)存在,存在就不處理。
4.防重表
-
使用訂單號 orderNo 做為去重表的唯一索引,把唯一索引插入去重表,再進(jìn)行業(yè)務(wù)操作,且
他們在同一個(gè)事務(wù)中。這個(gè)保證了重復(fù)請求時(shí),因?yàn)槿ブ乇碛形ㄒ患s束,導(dǎo)致請求失敗,避
免了冪等問題。這里要注意的是,去重表和業(yè)務(wù)表應(yīng)該在同一庫中,這樣就保證了在同一個(gè)
事務(wù),即使業(yè)務(wù)操作失敗了,也會把去重表的數(shù)據(jù)回滾。這個(gè)很好的保證了數(shù)據(jù)一致性。 -
之前說的 redis 防重也算
5.全局請求唯一id
- 調(diào)用接口時(shí),生成一個(gè)唯一 id,redis 將數(shù)據(jù)保存到集合中(去重),存在即處理過。
- 可以使用 nginx 設(shè)置每一個(gè)請求的唯一 id;
proxy_set_header X-Request-Id $request_id
二、分布式事務(wù)(提交訂單、鎖庫存)
前面提交訂單調(diào)用遠(yuǎn)程的倉庫服務(wù)鎖庫存,加上的事務(wù)是 本地事務(wù)
- 提交訂單加上,出現(xiàn)異常回滾
- 訂單保存成功,但是鎖庫存失敗,還是回滾
但是由于不確定因素
- 訂單保存成功了,庫存也鎖成功了,但是 由于網(wǎng)絡(luò)原因 并未正常完成邏輯,導(dǎo)致訂單保存回滾,但是鎖庫存沒有回滾
- 訂單提交保存訂單之后,還可能調(diào)用多個(gè)其他的遠(yuǎn)程服務(wù),遠(yuǎn)程父事務(wù) 并不能 很好的管理 子事務(wù)
因此本地事務(wù)只能控制本地方法的調(diào)用,對于遠(yuǎn)程調(diào)用,因此要求統(tǒng)一的分布式事務(wù)管理
1.本地事務(wù)
1、事務(wù)的基本性質(zhì)
數(shù)據(jù)庫事務(wù)的幾個(gè)特性:原子性(Atomicity )、一致性( Consistency )、隔離性或獨(dú)立性( Isolation)
和持久性(Durabilily),簡稱就是 ACID;
- 原子性:一系列的操作整體不可拆分,要么同時(shí)成功,要么同時(shí)失敗
- 一致性:數(shù)據(jù)在事務(wù)的前后,業(yè)務(wù)整體一致。 轉(zhuǎn)賬。A:1000;B:1000; 轉(zhuǎn) 200 事務(wù)成功; A:800 B:1200
- 隔離性:事務(wù)之間互相隔離。
- 持久性:一旦事務(wù)成功,數(shù)據(jù)一定會落盤在數(shù)據(jù)庫
2、事務(wù)的隔離級別isolation
- READ UNCOMMITTED(讀未提交)
該隔離級別的事務(wù)會讀到其它未提交事務(wù)的數(shù)據(jù),此現(xiàn)象也稱之為臟讀。 - READ COMMITTED(讀提交)
一個(gè)事務(wù)可以讀取另一個(gè)已提交的事務(wù),多次讀取會造成不一樣的結(jié)果,此現(xiàn)象稱為不可重
復(fù)讀問題,Oracle 和 SQL Server 的默認(rèn)隔離級別。 - REPEATABLE READ(可重復(fù)讀)
該隔離級別是 MySQL 默認(rèn)的隔離級別,在同一個(gè)事務(wù)里,select 的結(jié)果是事務(wù)開始時(shí)時(shí)間
點(diǎn)的狀態(tài),因此,同樣的 select 操作讀到的結(jié)果會是一致的,但是,會有幻讀現(xiàn)象。MySQL
的 InnoDB 引擎可以通過 next-key locks 機(jī)制(參考下文"行鎖的算法"一節(jié))來避免幻讀。 - SERIALIZABLE(序列化)
在該隔離級別下事務(wù)都是串行順序執(zhí)行的,MySQL 數(shù)據(jù)庫的 InnoDB 引擎會給讀操作隱式
加一把讀共享鎖,從而避免了臟讀、不可重讀復(fù)讀和幻讀問題。
3、事務(wù)的傳播行為propagation
就加入該事務(wù),該設(shè)置是最常用的設(shè)置。
前不存在事務(wù),就以非事務(wù)執(zhí)行。
當(dāng)前不存在事務(wù),就拋出異常。
前事務(wù)掛起。
則執(zhí)行與 PROPAGATION_REQUIRED 類似的操作。
4、SpringBoot 事務(wù)關(guān)鍵點(diǎn)
- 在同一個(gè)類里面,編寫兩個(gè)方法,內(nèi)部調(diào)用的時(shí)候,會導(dǎo)致事務(wù)設(shè)置失效。原因是沒有用到
代理對象的緣故。同一個(gè)service方法子事務(wù)的調(diào)用繞過了代理對象,導(dǎo)致直接是方法調(diào)用 - 解決:
0)、導(dǎo)入 spring-boot-starter-aop
1)、@EnableTransactionManagement(proxyTargetClass = true)
2)、@EnableAspectJAutoProxy(exposeProxy=true)
3)、AopContext.currentProxy() 調(diào)用方
2.分布式事務(wù)
1、CAP 定理
CAP 原則又稱 CAP 定理,指的是在一個(gè)分布式系統(tǒng)中
- 一致性(Consistency):
在分布式系統(tǒng)中的所有數(shù)據(jù)備份,在同一時(shí)刻是否同樣的值。(等同于所有節(jié)點(diǎn)訪
問同一份最新的數(shù)據(jù)副本) - 可用性(Availability)
在集群中一部分節(jié)點(diǎn)故障后,集群整體是否還能響應(yīng)客戶端的讀寫請求。(對數(shù)據(jù)
更新具備高可用性) - 分區(qū)容錯性(Partition tolerance)
大多數(shù)分布式系統(tǒng)都分布在多個(gè)子網(wǎng)絡(luò)。每個(gè)子網(wǎng)絡(luò)就叫做一個(gè)區(qū)(partition)。
分區(qū)容錯的意思是,區(qū)間通信可能失敗。比如,一臺服務(wù)器放在中國,另一臺服務(wù)
器放在美國,這就是兩個(gè)區(qū),它們之間可能無法通信。
CAP 原則指的是,這三個(gè)要素最多只能同時(shí)實(shí)現(xiàn)兩點(diǎn),不可能三者兼顧。
-
一般來說,分區(qū)容錯無法避免,因此可以認(rèn)為 CAP 的 P 總是成立。CAP 定理告訴我們,剩下的 C 和 A 無法同時(shí)做到。
-
分布式系統(tǒng)中實(shí)現(xiàn)一致性的 raft 算法,類似redis的主從復(fù)制、哨兵模式
paxos:http://thesecretlivesofdata.com/raft/ -
對于多數(shù)大型互聯(lián)網(wǎng)應(yīng)用的場景,主機(jī)眾多、部署分散,而且現(xiàn)在的集群規(guī)模越來越大,所
以節(jié)點(diǎn)故障、網(wǎng)絡(luò)故障是常態(tài),而且要保證服務(wù)可用性達(dá)到 99.99999%(N 個(gè) 9),即保證
P 和 A,舍棄 C,即不能保證強(qiáng)一致,但是可以彌補(bǔ)強(qiáng)一致
2.BASE理論
是對 CAP 理論的延伸,思想是即使無法做到強(qiáng)一致性(CAP 的一致性就是強(qiáng)一致性),但
- 以采用適當(dāng)?shù)牟扇∪跻恢滦?#xff0c;即最終一致性。
- 即不能保證強(qiáng)一致,但是可以彌補(bǔ)強(qiáng)一致
BASE 是指
- 基本可用(Basically Available)
(1)基本可用是指分布式系統(tǒng)在出現(xiàn)故障的時(shí)候,允許損失部分可用性(例如響應(yīng)時(shí)間、
功能上的可用性),允許損失部分可用性。需要注意的是,基本可用絕不等價(jià)于系
統(tǒng)不可用。
(2)響應(yīng)時(shí)間上的損失:正常情況下搜索引擎需要在 0.5 秒之內(nèi)返回給用戶相應(yīng)的
查詢結(jié)果,但由于出現(xiàn)故障(比如系統(tǒng)部分機(jī)房發(fā)生斷電或斷網(wǎng)故障),查詢
結(jié)果的響應(yīng)時(shí)間增加到了 1~2 秒。
(3)功能上的損失:購物網(wǎng)站在購物高峰(如雙十一)時(shí),為了保護(hù)系統(tǒng)的穩(wěn)定性,
部分消費(fèi)者可能會被引導(dǎo)到一個(gè)降級頁面。 - 軟狀態(tài)( Soft State)(處于失敗、成功的中間狀態(tài))
軟狀態(tài)是指允許系統(tǒng)存在中間狀態(tài),而該中間狀態(tài)不會影響系統(tǒng)整體可用性。分布
式存儲中一般一份數(shù)據(jù)會有多個(gè)副本,允許不同副本同步的延時(shí)就是軟狀態(tài)的體
現(xiàn)。mysql replication 的異步復(fù)制也是一種體現(xiàn)。 - 最終一致性( Eventual Consistency)
最終一致性是指系統(tǒng)中的所有數(shù)據(jù)副本經(jīng)過一定時(shí)間后,最終能夠達(dá)到一致的狀
態(tài)。弱一致性和強(qiáng)一致性相反,最終一致性是弱一致性的一種特殊情況
3. 強(qiáng)一致性、弱一致性、最終一致性
從客戶端角度,多進(jìn)程并發(fā)訪問時(shí),更新過的數(shù)據(jù)在不同進(jìn)程如何獲取的不同策略,決定了
不同的一致性。
-
對于關(guān)系型數(shù)據(jù)庫,要求更新過的數(shù)據(jù)能被后續(xù)的訪問都能看到,這是強(qiáng)一致性。
-
如果能容忍后續(xù)的部分或者全部訪問不到,則是弱一致性。(容忍軟件態(tài),容忍彌補(bǔ)一致性的時(shí)間操作)
-
如果經(jīng)過一段時(shí)間后要求能訪問到更新后的數(shù)據(jù),則是最終一致性
三、分布式事務(wù)解決方案
1. 2PC 模式
數(shù)據(jù)庫支持的 2PC【2 phase commit 二階提交】,又叫做 XA Transactions。MySQL 從 5.5 版本開始支持,SQL Server 2005 開始支持,Oracle 7 開始支持。其中,XA 是一個(gè)兩階段提交協(xié)議,該協(xié)議分為以下兩個(gè)階段:
- 第一階段:事務(wù)協(xié)調(diào)器要求每個(gè)涉及到事務(wù)的數(shù)據(jù)庫預(yù)提交(precommit)此操作,并反映是
否可以提交. - 第二階段:事務(wù)協(xié)調(diào)器要求每個(gè)數(shù)據(jù)庫提交數(shù)據(jù)。
其中,如果有任何一個(gè)數(shù)據(jù)庫否決此次提交,那么所有數(shù)據(jù)庫都會被要求回滾它們在此事務(wù)
中的那部分信息。
- XA 協(xié)議比較簡單,而且一旦商業(yè)數(shù)據(jù)庫實(shí)現(xiàn)了 XA 協(xié)議,使用分布式事務(wù)的成本也比較
低。 - XA 性能不理想,特別是在交易下單鏈路,往往并發(fā)量很高,XA 無法滿足高并發(fā)場景
- XA 目前在商業(yè)數(shù)據(jù)庫支持的比較理想,在 mysql 數(shù)據(jù)庫中支持的不太理想,mysql 的XA 實(shí)現(xiàn),沒有記錄 prepare 階段日志,主備切換回導(dǎo)致主庫與備庫數(shù)據(jù)不一致。
- 許多 nosql 也沒有支持 XA,這讓 XA 的應(yīng)用場景變得非常狹隘。
- 也有 3PC,引入了超時(shí)機(jī)制(無論協(xié)調(diào)者還是參與者,在向?qū)Ψ桨l(fā)送請求后,若長時(shí)間
未收到回應(yīng)則做出相應(yīng)處理)
2. 柔性事務(wù)-TCC 事務(wù)補(bǔ)償型方案(遵循BASE原則)
- 剛性事務(wù):遵循 ACID 原則,強(qiáng)一致性。
- 柔性事務(wù):遵循 BASE 理論,最終一致性;與剛性事務(wù)不同,柔性事務(wù)允許一定時(shí)間內(nèi),不同節(jié)點(diǎn)的數(shù)據(jù)不一致,但要求最終一致。
- 一階段 prepare 行為:調(diào)用 自定義 的 prepare 邏輯。該邏輯為父事務(wù)、各個(gè)子事務(wù)調(diào)用自己的try接口方法
- 二階段 commit 行為:調(diào)用 自定義 的 commit 邏輯。該邏輯為父事務(wù)、各個(gè)子事務(wù)調(diào)用自己的confirm接口方法
- 二階段 rollback 行為:調(diào)用 自定義 的 rollback 邏輯。該邏輯為父事務(wù)、各個(gè)子事務(wù)調(diào)用自己的cancle接口方法
所謂 TCC 模式,是指支持把 自定義 的分支事務(wù)納入到全局事務(wù)的管理中(抽取一層)
3.柔性事務(wù)-最大努力通知型方案
按規(guī)律進(jìn)行通知,不保證數(shù)據(jù)一定能通知成功,但會提供可查詢操作接口進(jìn)行核對。
- 這種方案主要用在與第三方系統(tǒng)通訊時(shí),比如:調(diào)用微信或支付寶支付后的支付結(jié)果通知。這種方案也是結(jié)合 MQ 進(jìn)行實(shí)現(xiàn),例如:通過 MQ 發(fā)送 http 請求,設(shè)置最大通知次數(shù)。達(dá)到通知次數(shù)后即不再通知。
- 案例:銀行通知、商戶通知等(各大交易業(yè)務(wù)平臺間的商戶通知:多次通知、查詢校對、對賬文件),支付寶的支付成功異步回調(diào)
- 案例:不斷提醒訂閱MQ的服務(wù)父事務(wù)執(zhí)行失敗,直到作出回應(yīng)(如訂單下失敗解鎖庫存)
4.柔性事務(wù)-可靠消息+最終一致性方案(異步確保型)
- 實(shí)現(xiàn):業(yè)務(wù)處理服務(wù)在業(yè)務(wù)事務(wù)提交之前,向?qū)崟r(shí)消息服務(wù)請求發(fā)送消息,實(shí)時(shí)消息服務(wù)只
記錄消息數(shù)據(jù),而不是真正的發(fā)送。業(yè)務(wù)處理服務(wù)在業(yè)務(wù)事務(wù)提交之后,向?qū)崟r(shí)消息服務(wù)確
認(rèn)發(fā)送。只有在得到確認(rèn)發(fā)送指令后,實(shí)時(shí)消息服務(wù)才會真正發(fā)送
四、整合Seata
1.概述
- 官網(wǎng):http://seata.io/zh-cn/docs/overview/what-is-seata.html
- Seata 是一款開源的分布式事務(wù)解決方案,致力于提供高性能和簡單易用的分布式事務(wù)服務(wù)。Seata 將為用戶提供了 AT、TCC、SAGA 和 XA 事務(wù)模式,為用戶打造一站式的分布式解決方案。
- TC:協(xié)調(diào)全局
- TM:控制整個(gè)大的事務(wù)
- RM:各個(gè)微服務(wù)獨(dú)立的資源管理器,每一個(gè)微服務(wù)都需要一個(gè)回滾日志表,即使不能回滾也要補(bǔ)償修改的內(nèi)容
2.建立Seata日志表
每個(gè)微服務(wù)創(chuàng)建 UNDO_LOG 表,SEATA AT 模式需要 UNDO_LOG 表
-- 注意此處0.3.0+ 增加唯一索引 ux_undo_log CREATE TABLE `undo_log` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`branch_id` bigint(20) NOT NULL,`xid` varchar(100) NOT NULL,`context` varchar(128) NOT NULL,`rollback_info` longblob NOT NULL,`log_status` int(11) NOT NULL,`log_created` datetime NOT NULL,`log_modified` datetime NOT NULL,`ext` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;3.安裝事務(wù)協(xié)調(diào)器TC
- 下載地址:https://github.com/seata/seata/releases/tag/v1.2.0
- 解壓配置
配置文件file.conf
啟動nacos和seata,如果報(bào)錯參考:報(bào)錯2_Seata啟動報(bào)錯:Initialization of output ‘file=xxxlogs/seata_gc.log‘ using options ‘(null)‘ failed.
4.整合
- 導(dǎo)入依賴:https://github.com/seata/seata
- 配置DataSourceProxy代理各個(gè)微服務(wù)的數(shù)據(jù)源
- 每個(gè)微服務(wù)的resource需要放入registry.conf、file.conf,并配置分組
- 父事務(wù)加注解@GlobalTransactional,子事務(wù)不用,測試
5.說明
對于普通業(yè)務(wù)如后臺管理的業(yè)務(wù),分布式事務(wù)解決方案可以使用 Seata的 AT 分布式事務(wù)管理,也就是上面的操作
但是對于高并發(fā)下的場景,這種是不適合的,需要使用 柔性事務(wù)-可靠消息 + 最終一致性方案
- 父事務(wù)失敗,發(fā)送失敗消息給子事務(wù),子事務(wù)回滾
- 延時(shí)隊(duì)列實(shí)現(xiàn)定時(shí)任務(wù)(定時(shí)任務(wù)太耗費(fèi)資源),掃描數(shù)據(jù)庫表保存的鎖庫存記錄,根據(jù)訂單狀態(tài)判斷,然后將失敗的鎖庫存自動解鎖
五、延時(shí)隊(duì)列實(shí)現(xiàn)定時(shí)任務(wù)
1.場景分析
定時(shí)任務(wù)的缺點(diǎn)
- 耗費(fèi)系統(tǒng)內(nèi)存、數(shù)據(jù)庫資源
- 時(shí)效性不能保證,30分鐘訂單未支付被關(guān)閉可能需要多輪才能被掃描出來
父事務(wù)下訂單,子事務(wù)1鎖庫存,子事務(wù)…
-
不需要分布式事務(wù)的場景
- 訂單失敗,未進(jìn)行到鎖庫存,只需要父事務(wù)自動回滾即可。
- 訂單成功,庫存鎖定成功,其他子事務(wù)成功。無需回滾
-
需要分布式事務(wù)的場景
- 訂單成功,鎖庫存業(yè)務(wù)也成功,其他遠(yuǎn)程子事務(wù)失敗,導(dǎo)致訂單回滾,需要自動解鎖庫存。
- 訂單成功,用戶未支付、手動取消,需要自動解鎖庫存
使用RabbitMQ的延時(shí)隊(duì)列
- 訂單提交之后,先被放到消息隊(duì)列,到達(dá)指定時(shí)間30分鐘后發(fā)送給邏輯業(yè)務(wù)進(jìn)行數(shù)據(jù)庫訂單保存
- 訂單提交之后,庫存鎖定成功信息先被放到消息隊(duì)列,達(dá)到指定時(shí)間40分鐘后檢查訂單,訂單不存在的話自動解鎖庫存
- 其實(shí)延時(shí)隊(duì)列就是保證 訂單狀態(tài)更新后(已支付、手動取消),判斷庫存鎖定是否邏輯正確,不正確就更正
2.概念
- 消息的TTL(Time To Live)消息的存活時(shí)間
- 可以對隊(duì)列、消息設(shè)置TTL,前者沒有該隊(duì)列消費(fèi)者時(shí)消息保留最大時(shí)間,后者是該消息沒有消費(fèi)者是保留最大時(shí)間,超過這個(gè)時(shí)間成為死信
- 如果隊(duì)列和消息都設(shè)置了,取二者最小的
- 通過消息的expiration字段或者 x-message-ttl屬性來設(shè)置時(shí)間
死信會進(jìn)入死信路由(DLX對應(yīng)多個(gè)隊(duì)列的路由的Dead Letter Exchage是在普通的路由加上消息轉(zhuǎn)發(fā)機(jī)制)
- 被消費(fèi)者reject拒收的消息,并且參數(shù)為requeue為false,即該消息不會被重新放入隊(duì)列
- 設(shè)置TTL到期的消息
- 隊(duì)列長度限制滿了。排在前面的消息被丟棄或者扔到死路由的
使用MQ
- 1、Queue、Exchange、Binding可以@Bean進(jìn)去
- 2、監(jiān)聽消息的方法可以有三種參數(shù)(不分?jǐn)?shù)量,順序)
- Object content, Message message, Channel channel
- 3、channel可以用來拒絕消息,否則自動ack;
可以控制消息在一段時(shí)間變成死信,也可一控制死信轉(zhuǎn)到對應(yīng)的交換機(jī),結(jié)合二者,實(shí)現(xiàn)延時(shí)隊(duì)列
建議使用隊(duì)列過期時(shí)間
升級版
-
不需要分布式事務(wù)的場景
- 訂單失敗,未進(jìn)行到鎖庫存,只需要父事務(wù)自動回滾即可。
- 訂單成功,庫存鎖定成功,其他子事務(wù)成功。無需回滾。
-
需要分布式事務(wù)的場景
- 訂單成功,鎖庫存業(yè)務(wù)也成功,發(fā)送庫存鎖定信息到延時(shí)隊(duì)列,其他遠(yuǎn)程子事務(wù)失敗,導(dǎo)致訂單回滾,需要自動解鎖庫存。
- 訂單成功,鎖庫存業(yè)務(wù)也成功,發(fā)送庫存鎖定信息到延時(shí)隊(duì)列,用戶未支付、手動取消,需要自動解鎖庫存
3.代碼實(shí)現(xiàn)
1.訂單服務(wù)
訂單服務(wù)
- 訂單微服務(wù)交換機(jī)order-event-exchange
- 延時(shí)隊(duì)列order.delay.queue
- 死信消費(fèi)隊(duì)列order.release.order.queue
- 兩個(gè)隊(duì)列的綁定
2.庫存服務(wù)
庫存服務(wù)
- 一個(gè)交換機(jī)stock-event-exchange
- 普通隊(duì)列 stock.release.stock.queue,需要service監(jiān)聽,有消息證明要回滾
- 延時(shí)隊(duì)列 stock.delay.stock.queue
- 兩個(gè)綁定
保存訂單詳情日志發(fā)給mq
// 待鎖的商品、數(shù)量、倉庫id信息for (SkuLockVo lockVo : lockVos) {boolean lock = true;Long skuId = lockVo.getSkuId();List<Long> wareIds = lockVo.getWareIds();//如果沒有滿足條件的倉庫,拋出異常if (wareIds == null || wareIds.size() == 0) {throw new NoStockException(skuId);} else {for (Long wareId : wareIds) {// 一個(gè)個(gè)倉庫的鎖定// 成功返回1Long count = baseMapper.lockWareSku(skuId, lockVo.getNum(), wareId);if (count == 0) {lock = false;} else {//1. 鎖定成功,保存訂單詳情WareOrderTaskDetailEntity detailEntity = WareOrderTaskDetailEntity.builder().skuId(skuId).skuName("").skuNum(lockVo.getNum()).taskId(taskEntity.getId()).wareId(wareId).lockStatus(1).build();wareOrderTaskDetailService.save(detailEntity);//2. 發(fā)送庫存鎖定消息至延遲隊(duì)列StockLockedTo lockedTo = new StockLockedTo();lockedTo.setId(taskEntity.getId());StockDetailTo detailTo = new StockDetailTo();try {BeanUtils.copyProperties(detailEntity, detailTo);} catch (IllegalAccessException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();}lockedTo.setDetailTo(detailTo);rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", lockedTo);lock = true;break;}}}if (!lock) throw new NoStockException(skuId);}package henu.soft.common.to.mq;import lombok.Data;@Data public class StockDetailTo {private Long id;/*** sku_id*/private Long skuId;/*** sku_name*/private String skuName;/*** 購買個(gè)數(shù)*/private Integer skuNum;/*** 工作單id*/private Long taskId;/*** 倉庫id*/private Long wareId;/*** 鎖定狀態(tài)*/private Integer lockStatus; }3.庫存服務(wù)監(jiān)聽死信隊(duì)列
package henu.soft.xiaosi.ware.listener;import com.rabbitmq.client.Channel;import henu.soft.common.to.mq.OrderTo; import henu.soft.common.to.mq.StockLockedTo; import henu.soft.xiaosi.ware.service.WareSkuService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import java.io.IOException;@Slf4j @Component @RabbitListener(queues = {"stock.release.stock.queue"}) public class StockReleaseListener {@Autowiredprivate WareSkuService wareSkuService;// 庫存回滾@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {log.info("************************收到庫存解鎖的消息********************************");try {wareSkuService.unlock(stockLockedTo);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 重新放到消息隊(duì)列,重試機(jī)制channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}之前即使訂單失敗,父事務(wù)回滾了,但是分布式事務(wù)鎖庫存并未回滾,但是保存了
- 成功的庫存鎖定信息到消息隊(duì)列(只有庫存鎖定成功才會發(fā)消息到延時(shí)隊(duì)列)
- 數(shù)據(jù)庫中間表中也保存了訂單、鎖庫存關(guān)系(便于回滾)
現(xiàn)在需要庫存服務(wù)監(jiān)聽 用于 實(shí)現(xiàn)回滾,即從死信隊(duì)列獲取訂單詳情日志信息
- 再次查詢訂單表,無該訂單號,證明父事務(wù)已經(jīng)回滾,說明必須回滾鎖定的庫存,調(diào)用unlock()方法再次查詢中間表,回滾庫存
- 再次查詢訂單表,有該訂單號,說明沒有被取消,變成了支付的訂單(因?yàn)橛唵嗡佬抨?duì)列30分鐘會被訂單模塊監(jiān)聽,若是待付款則訂單號直接被關(guān)閉了,此時(shí)有訂單號,證明一定是支付過了),則無需回滾
4.訂單服務(wù)監(jiān)聽死信隊(duì)列關(guān)單
分析
- 下訂單業(yè)務(wù)成功之后,會被放到延時(shí)隊(duì)列,30分鐘后進(jìn)入死信隊(duì)列,訂單模塊監(jiān)聽死信隊(duì)列,此時(shí)需要先判斷訂單狀態(tài)
- 訂單狀態(tài)為代付款狀態(tài):需要關(guān)閉訂單
- 訂單狀態(tài)為已付款、已發(fā)貨、已完成、已取消、售后中、售后完成等狀態(tài):不需要關(guān)閉訂單
訂單狀態(tài)枚舉類
package henu.soft.xiaosi.order.enume;public enum OrderStatusEnume {CREATE_NEW(0,"待付款"),PAYED(1,"已付款"),SENDED(2,"已發(fā)貨"),RECIEVED(3,"已完成"),CANCLED(4,"已取消"),SERVICING(5,"售后中"),SERVICED(6,"售后完成");private String msg;private Integer code;public String getMsg() {return msg;}public Integer getCode() {return code;}OrderStatusEnume(Integer code, String msg){this.msg = msg;this.code = code;} }判斷關(guān)閉訂單
/*** 收到過期的訂單信息,準(zhǔn)備關(guān)閉訂單* @param orderEntity*//*** 關(guān)閉過期的的訂單* @param orderEntity*/@Overridepublic void closeOrder(OrderEntity orderEntity) throws InvocationTargetException, IllegalAccessException {//因?yàn)橄l(fā)送過來的訂單已經(jīng)是很久前的了,中間可能被改動,因此要查詢最新的訂單OrderEntity newOrderEntity = this.getById(orderEntity.getId());//如果訂單還處于新創(chuàng)建的狀態(tài),說明超時(shí)未支付,進(jìn)行關(guān)單if (newOrderEntity.getStatus() == OrderStatusEnume.CREATE_NEW.getCode()) {OrderEntity updateOrder = new OrderEntity();updateOrder.setId(newOrderEntity.getId());updateOrder.setStatus(OrderStatusEnume.CANCLED.getCode());this.updateById(updateOrder);//關(guān)單后發(fā)送消息通知其他服務(wù)進(jìn)行關(guān)單相關(guān)的操作,如解鎖庫存OrderTo orderTo = new OrderTo();BeanUtils.copyProperties(newOrderEntity,orderTo);rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other",orderTo);}}現(xiàn)在出現(xiàn)的問題是
- 理論上庫存死信隊(duì)列的時(shí)長比訂單死信隊(duì)列的時(shí)長要長,即 庫存是否解鎖在 訂單狀態(tài)關(guān)閉前判斷
- 現(xiàn)在可能出現(xiàn)網(wǎng)絡(luò)延遲的原因,導(dǎo)致 庫存服務(wù)監(jiān)聽的死信隊(duì)列 先 獲得數(shù)據(jù),這時(shí)候判斷訂單狀態(tài)還存在,就沒解鎖庫存,過了一段時(shí)間才收到訂單關(guān)閉的信息,這時(shí)候庫存就會一直鎖定
解決辦法
- 在訂單模塊在設(shè)置一個(gè)路由鍵order.release.order.other.# 該路由鍵直接 轉(zhuǎn)發(fā)到 庫存的死信隊(duì)列stock.release.stock.queue,庫存服務(wù)監(jiān)聽用于解鎖
- 訂單釋放和庫存釋放直接綁定,也就是當(dāng)有訂單關(guān)閉的時(shí)候通知庫存服務(wù),再次判斷是否進(jìn)行庫存解鎖
訂單服務(wù)
庫存服務(wù)
5.訂單提交保存
4.如何保證消息的可靠性
1.消息丟失
- 消息發(fā)送出去,由于網(wǎng)絡(luò)問題沒有抵達(dá)服務(wù)器
? 做好容錯方法(try-catch),發(fā)送消息可能會網(wǎng)絡(luò)失敗,失敗后要有重試機(jī)制,可記錄到數(shù)據(jù)庫,采用定期掃描重發(fā)的方式
? 做好日志記錄,每個(gè)消息狀態(tài)是否都被服務(wù)器收到都應(yīng)該記錄
? 做好定期重發(fā),如果消息沒有發(fā)送成功,定期去數(shù)據(jù)庫掃描未成功的消息進(jìn)行重發(fā) - 消息抵達(dá)Broker,Broker要將消息寫入queue、磁盤(持久化)才算成功。此時(shí)Broker尚未持久化完成,宕機(jī)。 publisher也必須加入確認(rèn)回調(diào)機(jī)制,確認(rèn)成功的消息,修改數(shù)據(jù)庫消息狀態(tài)。
- 自動ACK的狀態(tài)下。消費(fèi)者收到消息,但沒來得及消息然后宕機(jī)
? 一定開啟手動ACK,消費(fèi)成功才移除,失敗或者沒來得及處理就noAck并重新入隊(duì)
2.消息重復(fù)
- 消息消費(fèi)成功,事務(wù)已經(jīng)提交,ack時(shí),機(jī)器宕機(jī)。導(dǎo)致沒有ack成功,Broker的消息重新由unack變?yōu)閞eady,并發(fā)送給其他消費(fèi)者
? 消息消費(fèi)失敗,由于重試機(jī)制,自動又將消息發(fā)送出去
? 成功消費(fèi),ack時(shí)宕機(jī),消息由unack變?yōu)閞eady,Broker又重新發(fā)送
? 消費(fèi)者的業(yè)務(wù)消費(fèi)接口應(yīng)該設(shè)計(jì)為冪等性的。比如扣庫存有工作單的狀態(tài)標(biāo)志 - 使用防重表(redis/mysql),發(fā)送消息每一個(gè)都有業(yè)務(wù)的唯一標(biāo)識,處理過就不用處理
- rabbitMQ的每一個(gè)消息都有redelivered字段,可以獲取是否是被重新投遞過來的,而不是第一次投遞過來的
3.消息積壓
- 消費(fèi)者宕機(jī)積壓
- 消費(fèi)者消費(fèi)能力不足積壓
- 發(fā)送者發(fā)送流量太大
? 上線更多的消費(fèi)者,進(jìn)行正常消費(fèi)
? 上線專門的隊(duì)列消費(fèi)服務(wù),將消息先批量取出來,記錄數(shù)據(jù)庫,離線慢慢處理
總結(jié)
以上是生活随笔為你收集整理的谷粒商城项目篇13_分布式高级篇_订单业务模块(提交订单幂等性、分布式事务、延时MQ实现定时任务)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 江苏大学885程序设计历年代码题题型整理
- 下一篇: 20考研北理885软工经验贴——初试篇