Java生鲜电商平台-SpringCloud微服务架构中分布式事务解决方案
Java生鮮電商平臺-SpringCloud微服務架構中分布式事務解決方案
?
說明:Java生鮮電商平臺中由于采用了微服務架構進行業(yè)務的處理,買家,賣家,配送,銷售,供應商等進行服務化,但是不可避免存在分布式事務的問題
業(yè)界有很多的解決方案,對此我相信大家都百度一下子就有很多,但是我巨人大哥想說的是:微服務架構中應當盡量避免分布式事務。
?
下面就是來討論下,分布式事務中主要聚焦于強一致性和最終一致性的解決方案。
微服務的發(fā)展
微服務倡導將復雜的單體應用拆分為若干個功能簡單、松耦合的服務,這樣可以降低開發(fā)難度、增強擴展性、便于敏捷開發(fā)。當前被越來越多的開發(fā)者推崇,很多互聯(lián)網(wǎng)行業(yè)巨頭、開源社區(qū)等都開始了微服務的討論和實踐。
微服務落地存在的問題
雖然微服務現(xiàn)在如火如荼,但對其實踐其實仍處于探索階段。很多中小型互聯(lián)網(wǎng)公司,鑒于經(jīng)驗、技術實力等問題,微服務落地比較困難。
如著名架構師Chris Richardson所言,目前存在的主要困難有如下幾方面:
- 單體應用拆分為分布式系統(tǒng)后,進程間的通訊機制和故障處理措施變的更加復雜。
- 系統(tǒng)微服務化后,一個看似簡單的功能,內(nèi)部可能需要調用多個服務并操作多個數(shù)據(jù)庫實現(xiàn),服務調用的分布式事務問題變的非常突出。
- 微服務數(shù)量眾多,其測試、部署、監(jiān)控等都變的更加困難。
隨著RPC框架的成熟,第一個問題已經(jīng)逐漸得到解決。例如springcloud可以非常好的支持restful調用,dubbo可以支持多種通訊協(xié)議。
對于第三個問題,隨著docker、devops技術的發(fā)展以及各公有云paas平臺自動化運維工具的推出,微服務的測試、部署與運維會變得越來越容易。
而對于第二個問題,現(xiàn)在還沒有通用方案很好的解決微服務產(chǎn)生的事務問題。分布式事務已經(jīng)成為微服務落地最大的阻礙,也是最具挑戰(zhàn)性的一個技術難題。
ACID
-
原子性(Atomicity): 一個事務的所有系列操作步驟被看成是一個動作,所有的步驟要么全部完成要么一個也不會完成,如果事務過程中任何一點失敗,將要被改變的數(shù)據(jù)庫記錄就不會被真正被改變。
-
一致性(Consistency): 數(shù)據(jù)庫的約束 級聯(lián)和觸發(fā)機制Trigger都必須滿足事務的一致性。也就是說,通過各種途徑包括外鍵約束等任何寫入數(shù)據(jù)庫的數(shù)據(jù)都是有效的,不能發(fā)生表與表之間存在外鍵約束,但是有數(shù)據(jù)卻違背這種約束性。所有改變數(shù)據(jù)庫數(shù)據(jù)的動作事務必須完成,沒有事務會創(chuàng)建一個無效數(shù)據(jù)狀態(tài),這是不同于CAP理論的一致性"consistency".
-
隔離性(Isolation): 主要用于實現(xiàn)并發(fā)控制, 隔離能夠確保并發(fā)執(zhí)行的事務能夠順序一個接一個執(zhí)行,通過隔離,一個未完成事務不會影響另外一個未完成事務。
-
持久性(Durability): 一旦一個事務被提交,它應該持久保存,不會因為和其他操作沖突而取消這個事務。很多人認為這意味著事務是持久在磁盤上,但是規(guī)范沒有特別定義這點。
一致性理論
分布式事務的目的是保障分庫數(shù)據(jù)一致性,而跨庫事務會遇到各種不可控制的問題,如個別節(jié)點永久性宕機,像單機事務一樣的 ACID 是無法奢望的。
另外,業(yè)界著名的 CAP 理論也告訴我們,對分布式系統(tǒng),需要將數(shù)據(jù)一致性和系統(tǒng)可用性、分區(qū)容忍性放在天平上一起考慮。
兩階段提交協(xié)議(簡稱2PC)是實現(xiàn)分布式事務較為經(jīng)典的方案,但 2PC 的可擴展性很差,在分布式架構下應用代價較大,eBay 架構師 Dan Pritchett 提出了 BASE 理論,用于解決大規(guī)模分布式系統(tǒng)下的數(shù)據(jù)一致性問題。
BASE 理論告訴我們:可以通過放棄系統(tǒng)在每個時刻的強一致性來換取系統(tǒng)的可擴展性。
CAP 理論
在分布式系統(tǒng)中,一致性(Consistency)、可用性(Availability)和分區(qū)容忍性(Partition Tolerance)3 個要素最多只能同時滿足兩個,不可兼得。其中,分區(qū)容忍性又是不可或缺的。
- 一致性:分布式環(huán)境下,多個節(jié)點的數(shù)據(jù)是否強一致。
- 可用性:分布式服務能一直保證可用狀態(tài)。當用戶發(fā)出一個請求后,服務能在有限時間內(nèi)返回結果。
- 分區(qū)容忍性:特指對網(wǎng)絡分區(qū)的容忍性。
舉例:Cassandra、Dynamo 等,默認優(yōu)先選擇 AP,弱化 C;HBase、MongoDB 等,默認優(yōu)先選擇 CP,弱化 A。
BASE 理論
核心思想:
- 基本可用(Basically Available):指分布式系統(tǒng)在出現(xiàn)故障時,允許損失部分的可用性來保證核心可用;
- 軟狀態(tài)(Soft state):指允許分布式系統(tǒng)存在中間狀態(tài),該中間狀態(tài)不會影響到系統(tǒng)的整體可用性;
- 最終一致性(Eventual consistency):指分布式系統(tǒng)中的所有副本數(shù)據(jù)經(jīng)過一定時間后,最終能夠達到一致的狀態(tài);
- 原子性(A)與持久性(D)必須根本保障;
- 為了可用性、性能與降級服務的需要,只有降低一致性( C ) 與 隔離性( I ) 的要求;
- 酸堿平衡(ACID-BASE Balance);
BASE 是對 CAP 中 AP 的一個擴展
一致性模型
數(shù)據(jù)的一致性模型可以分成以下三類:
- 強一致性:數(shù)據(jù)更新成功后,任意時刻所有副本中的數(shù)據(jù)都是一致的,一般采用同步的方式實現(xiàn)。
- 弱一致性:數(shù)據(jù)更新成功后,系統(tǒng)不承諾立即可以讀到最新寫入的值,也不承諾具體多久之后可以讀到。
- 最終一致性:弱一致性的一種形式,數(shù)據(jù)更新成功后,系統(tǒng)不承諾立即可以返回最新寫入的值,但是保證最終會返回上一次更新操作的值。
分布式系統(tǒng)數(shù)據(jù)的強一致性、弱一致性和最終一致性可以通過 Quorum NRW 算法分析。
本地事務
- 在單個數(shù)據(jù)庫的本地并且限制在單個進程內(nèi)的事務
- 本地事務不涉及多個數(shù)據(jù)來源
分布式事務典型方案
- 兩階段提交(2PC, Two Phase Commit)方案;
- 本地消息表 (eBay 事件隊列方案);
- TCC 補償模式;
分類:
- 兩階段型
- 補償型
- 異步確保型
- 最大努力通知型
服務模式:
- 可查詢操作
- 冪等操作
- TCC操作
- 可補償操作
兩階段提交2PC(強一致性)
基于XA協(xié)議的兩階段提交:
- 第一階段是表決階段,所有參與者都將本事務能否成功的信息反饋發(fā)給協(xié)調者;
- 第二階段是執(zhí)行階段,協(xié)調者根據(jù)所有參與者的反饋,通知所有參與者,步調一致地在所有分支上提交或者回滾;
缺點:
- 單點問題:事務管理器在整個流程中扮演的角色很關鍵,如果其宕機,比如在第一階段已經(jīng)完成,在第二階段正準備提交的時候事務管理器宕機,資源管理器就會一直阻塞,導致數(shù)據(jù)庫無法使用。
- 同步阻塞:在準備就緒之后,資源管理器中的資源一直處于阻塞,直到提交完成,釋放資源。
- 數(shù)據(jù)不一致:兩階段提交協(xié)議雖然為分布式數(shù)據(jù)強一致性所設計,但仍然存在數(shù)據(jù)不一致性的可能。比如:在第二階段中,假設協(xié)調者發(fā)出了事務 Commit 的通知,但是因為網(wǎng)絡問題該通知僅被一部分參與者所收到并執(zhí)行了 Commit 操作,其余的參與者則因為沒有收到通知一直處于阻塞狀態(tài),這時候就產(chǎn)生了數(shù)據(jù)的不一致性。
總的來說,XA 協(xié)議比較簡單,成本較低,但是其單點問題,以及不能支持高并發(fā)(由于同步阻塞)依然是其最大的弱點。
本地消息表(最終一致性)
eBay 的架構師 Dan Pritchett,曾在一篇解釋 BASE 原理的論文《Base:An Acid Alternative》中提到一個 eBay 分布式系統(tǒng)一致性問題的解決方案。
?
它的核心思想是將需要分布式處理的任務通過消息或者日志的方式來異步執(zhí)行,消息或日志可以存到本地文件、數(shù)據(jù)庫或消息隊列,再通過業(yè)務規(guī)則進行失敗重試,它要求各服務的接口是冪等的。
本地消息表與業(yè)務數(shù)據(jù)表處于同一個數(shù)據(jù)庫中,這樣就能利用本地事務來保證在對這兩個表的操作滿足事務特性,并且使用了消息隊列來保證最終一致性。
- 在分布式事務操作的一方完成寫業(yè)務數(shù)據(jù)的操作之后向本地消息表發(fā)送一個消息,本地事務能保證這個消息一定會被寫入本地消息表中;
- 之后將本地消息表中的消息轉發(fā)到 Kafka 等消息隊列中,如果轉發(fā)成功則將消息從本地消息表中刪除,否則繼續(xù)重新轉發(fā);
- 消息消費方處理這個消息,并完成自己的業(yè)務邏輯。此時如果本地事務處理成功,表明已經(jīng)處理成功了,如果處理失敗,那么就會重試執(zhí)行。如果是業(yè)務上面的失敗,可以給生產(chǎn)方發(fā)送一個業(yè)務補償消息,通知生產(chǎn)方進行回滾等操作;
優(yōu)點: 一種非常經(jīng)典的實現(xiàn),避免了分布式事務,實現(xiàn)了最終一致性。
缺點: 消息表會耦合到業(yè)務系統(tǒng)中,如果沒有封裝好的解決方案,會有很多雜活需要處理。
這個方案的核心在于第二階段的重試和冪等執(zhí)行。失敗后重試,這是一種補償機制,它是能保證系統(tǒng)最終一致的關鍵流程。
可靠消息的最終一致性代碼示例
表結構
DROP TABLE IF EXISTS `rp_transaction_message`;CREATE TABLE `rp_transaction_message` (`id` VARCHAR (50) NOT NULL DEFAULT '' COMMENT '主鍵ID',`version` INT (11) NOT NULL DEFAULT '0' COMMENT '版本號',`editor` VARCHAR (100) DEFAULT NULL COMMENT '修改者',`creater` VARCHAR (100) DEFAULT NULL COMMENT '創(chuàng)建者',`edit_time` datetime DEFAULT NULL COMMENT '最后修改時間',`create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '創(chuàng)建時間',`message_id` VARCHAR (50) NOT NULL DEFAULT '' COMMENT '消息ID',`message_body` LONGTEXT NOT NULL COMMENT '消息內(nèi)容',`message_data_type` VARCHAR (50) DEFAULT NULL COMMENT '消息數(shù)據(jù)類型',`consumer_queue` VARCHAR (100) NOT NULL DEFAULT '' COMMENT '消費隊列',`message_send_times` SMALLINT (6) NOT NULL DEFAULT '0' COMMENT '消息重發(fā)次數(shù)',`areadly_dead` VARCHAR (20) NOT NULL DEFAULT '' COMMENT '是否死亡',`status` VARCHAR (20) NOT NULL DEFAULT '' COMMENT '狀態(tài)',`remark` VARCHAR (200) DEFAULT NULL COMMENT '備注',`field1` VARCHAR (200) DEFAULT NULL COMMENT '擴展字段1',`field2` VARCHAR (200) DEFAULT NULL COMMENT '擴展字段2',`field3` VARCHAR (200) DEFAULT NULL COMMENT '擴展字段3',PRIMARY KEY (`id`),KEY `AK_Key_2` (`message_id`) ) ENGINE = INNODB DEFAULT CHARSET = utf8;public interface RpTransactionMessageService {/*** 預存儲消息.*/public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;/*** 確認并發(fā)送消息.*/public void confirmAndSendMessage(String messageId) throws MessageBizException;/*** 存儲并發(fā)送消息.*/public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;/*** 直接發(fā)送消息.*/public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;/*** 重發(fā)消息.*/public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;/*** 根據(jù)messageId重發(fā)某條消息.*/public void reSendMessageByMessageId(String messageId) throws MessageBizException;/*** 將消息標記為死亡消息.*/public void setMessageToAreadlyDead(String messageId) throws MessageBizException;/*** 根據(jù)消息ID獲取消息*/public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;/*** 根據(jù)消息ID刪除消息*/public void deleteMessageByMessageId(String messageId) throws MessageBizException;/*** 重發(fā)某個消息隊列中的全部已死亡的消息.*/public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;/*** 獲取分頁數(shù)據(jù)*/PageBean listPage(PageParam pageParam, Map<String, Object> paramMap) throws MessageBizException;} @Service("rpTransactionMessageService") public class RpTransactionMessageServiceImpl implements RpTransactionMessageService {private static final Log log = LogFactory.getLog(RpTransactionMessageServiceImpl.class);@Autowiredprivate RpTransactionMessageDao rpTransactionMessageDao;@Autowiredprivate JmsTemplate notifyJmsTemplate;public int saveMessageWaitingConfirm(RpTransactionMessage message) {if (message == null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息為空");}if (StringUtil.isEmpty(message.getConsumerQueue())) {throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能為空 ");}message.setEditTime(new Date());message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name());message.setAreadlyDead(PublicEnum.NO.name());message.setMessageSendTimes(0);return rpTransactionMessageDao.insert(message);}public void confirmAndSendMessage(String messageId) {final RpTransactionMessage message = getMessageByMessageId(messageId);if (message == null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據(jù)消息id查找的消息為空");}message.setStatus(MessageStatusEnum.SENDING.name());message.setEditTime(new Date());rpTransactionMessageDao.update(message);notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});}public int saveAndSendMessage(final RpTransactionMessage message) {if (message == null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息為空");}if (StringUtil.isEmpty(message.getConsumerQueue())) {throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能為空 ");}message.setStatus(MessageStatusEnum.SENDING.name());message.setAreadlyDead(PublicEnum.NO.name());message.setMessageSendTimes(0);message.setEditTime(new Date());int result = rpTransactionMessageDao.insert(message);notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});return result;}public void directSendMessage(final RpTransactionMessage message) {if (message == null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息為空");}if (StringUtil.isEmpty(message.getConsumerQueue())) {throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能為空 ");}notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});}public void reSendMessage(final RpTransactionMessage message) {if (message == null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息為空");}if (StringUtil.isEmpty(message.getConsumerQueue())) {throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能為空 ");}message.addSendTimes();message.setEditTime(new Date());rpTransactionMessageDao.update(message);notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});}public void reSendMessageByMessageId(String messageId) {final RpTransactionMessage message = getMessageByMessageId(messageId);if (message == null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據(jù)消息id查找的消息為空");}int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));if (message.getMessageSendTimes() >= maxTimes) {message.setAreadlyDead(PublicEnum.YES.name());}message.setEditTime(new Date());message.setMessageSendTimes(message.getMessageSendTimes() + 1);rpTransactionMessageDao.update(message);notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});}public void setMessageToAreadlyDead(String messageId) {RpTransactionMessage message = getMessageByMessageId(messageId);if (message == null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據(jù)消息id查找的消息為空");}message.setAreadlyDead(PublicEnum.YES.name());message.setEditTime(new Date());rpTransactionMessageDao.update(message);}public RpTransactionMessage getMessageByMessageId(String messageId) {Map<String, Object> paramMap = new HashMap<String, Object>();paramMap.put("messageId", messageId);return rpTransactionMessageDao.getBy(paramMap);}public void deleteMessageByMessageId(String messageId) {Map<String, Object> paramMap = new HashMap<String, Object>();paramMap.put("messageId", messageId);rpTransactionMessageDao.delete(paramMap);}@SuppressWarnings("unchecked")public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) {log.info("==>reSendAllDeadMessageByQueueName");int numPerPage = 1000;if (batchSize > 0 && batchSize < 100) {numPerPage = 100;} else if (batchSize > 100 && batchSize < 5000) {numPerPage = batchSize;} else if (batchSize > 5000) {numPerPage = 5000;} else {numPerPage = 1000;}int pageNum = 1;Map<String, Object> paramMap = new HashMap<String, Object>();paramMap.put("consumerQueue", queueName);paramMap.put("areadlyDead", PublicEnum.YES.name());paramMap.put("listPageSortType", "ASC");Map<String, RpTransactionMessage> messageMap = new HashMap<String, RpTransactionMessage>();List<Object> recordList = new ArrayList<Object>();int pageCount = 1;PageBean pageBean = rpTransactionMessageDao.listPage(new PageParam(pageNum, numPerPage), paramMap);recordList = pageBean.getRecordList();if (recordList == null || recordList.isEmpty()) {log.info("==>recordList is empty");return;}pageCount = pageBean.getTotalPage();for (final Object obj : recordList) {final RpTransactionMessage message = (RpTransactionMessage) obj;messageMap.put(message.getMessageId(), message);}for (pageNum = 2; pageNum <= pageCount; pageNum++) {pageBean = rpTransactionMessageDao.listPage(new PageParam(pageNum, numPerPage), paramMap);recordList = pageBean.getRecordList();if (recordList == null || recordList.isEmpty()) {break;}for (final Object obj : recordList) {final RpTransactionMessage message = (RpTransactionMessage) obj;messageMap.put(message.getMessageId(), message);}}recordList = null;pageBean = null;for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) {final RpTransactionMessage message = entry.getValue();message.setEditTime(new Date());message.setMessageSendTimes(message.getMessageSendTimes() + 1);rpTransactionMessageDao.update(message);notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});}}@SuppressWarnings("unchecked")public PageBean<RpTransactionMessage> listPage(PageParam pageParam, Map<String, Object> paramMap) {return rpTransactionMessageDao.listPage(pageParam, paramMap);}} @Component("messageBiz") public class MessageBiz {private static final Log log = LogFactory.getLog(MessageBiz.class);@Autowiredprivate RpTradePaymentQueryService rpTradePaymentQueryService;@Autowiredprivate RpTransactionMessageService rpTransactionMessageService;/*** 處理[waiting_confirm]狀態(tài)的消息* @param messages*/public void handleWaitingConfirmTimeOutMessages(Map<String, RpTransactionMessage> messageMap) {log.debug("開始處理[waiting_confirm]狀態(tài)的消息,總條數(shù)[" + messageMap.size() + "]");// 單條消息處理(目前該狀態(tài)的消息,消費隊列全部是accounting,如果后期有業(yè)務擴充,需做隊列判斷,做對應的業(yè)務處理。)for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) {RpTransactionMessage message = entry.getValue();try {log.debug("開始處理[waiting_confirm]消息ID為[" + message.getMessageId() + "]的消息");String bankOrderNo = message.getField1();RpTradePaymentRecord record = rpTradePaymentQueryService.getRecordByBankOrderNo(bankOrderNo);// 如果訂單成功,把消息改為待處理,并發(fā)送消息if (TradeStatusEnum.SUCCESS.name().equals(record.getStatus())) {// 確認并發(fā)送消息rpTransactionMessageService.confirmAndSendMessage(message.getMessageId());} else if (TradeStatusEnum.WAITING_PAYMENT.name().equals(record.getStatus())) {// 訂單狀態(tài)是等到支付,可以直接刪除數(shù)據(jù)log.debug("訂單沒有支付成功,刪除[waiting_confirm]消息id[" + message.getMessageId() + "]的消息");rpTransactionMessageService.deleteMessageByMessageId(message.getMessageId());}log.debug("結束處理[waiting_confirm]消息ID為[" + message.getMessageId() + "]的消息");} catch (Exception e) {log.error("處理[waiting_confirm]消息ID為[" + message.getMessageId() + "]的消息異常:", e);}}}/*** 處理[SENDING]狀態(tài)的消息* @param messages*/public void handleSendingTimeOutMessage(Map<String, RpTransactionMessage> messageMap) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");log.debug("開始處理[SENDING]狀態(tài)的消息,總條數(shù)[" + messageMap.size() + "]");// 根據(jù)配置獲取通知間隔時間Map<Integer, Integer> notifyParam = getSendTime();// 單條消息處理for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) {RpTransactionMessage message = entry.getValue();try {log.debug("開始處理[SENDING]消息ID為[" + message.getMessageId() + "]的消息");// 判斷發(fā)送次數(shù)int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));log.debug("[SENDING]消息ID為[" + message.getMessageId() + "]的消息,已經(jīng)重新發(fā)送的次數(shù)[" + message.getMessageSendTimes() + "]");// 如果超過最大發(fā)送次數(shù)直接退出if (maxTimes < message.getMessageSendTimes()) {// 標記為死亡rpTransactionMessageService.setMessageToAreadlyDead(message.getMessageId());continue;}// 判斷是否達到發(fā)送消息的時間間隔條件int reSendTimes = message.getMessageSendTimes();int times = notifyParam.get(reSendTimes == 0 ? 1 : reSendTimes);long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();long needTime = currentTimeInMillis - times * 60 * 1000;long hasTime = message.getEditTime().getTime();// 判斷是否達到了可以再次發(fā)送的時間條件if (hasTime > needTime) {log.debug("currentTime[" + sdf.format(new Date()) + "],[SENDING]消息上次發(fā)送時間[" + sdf.format(message.getEditTime()) + "],必須過了[" + times + "]分鐘才可以再發(fā)送。");continue;}// 重新發(fā)送消息rpTransactionMessageService.reSendMessage(message);log.debug("結束處理[SENDING]消息ID為[" + message.getMessageId() + "]的消息");} catch (Exception e) {log.error("處理[SENDING]消息ID為[" + message.getMessageId() + "]的消息異常:", e);}}}/*** 根據(jù)配置獲取通知間隔時間* @return*/private Map<Integer, Integer> getSendTime() {Map<Integer, Integer> notifyParam = new HashMap<Integer, Integer>();notifyParam.put(1, Integer.valueOf(PublicConfigUtil.readConfig("message.send.1.time")));notifyParam.put(2, Integer.valueOf(PublicConfigUtil.readConfig("message.send.2.time")));notifyParam.put(3, Integer.valueOf(PublicConfigUtil.readConfig("message.send.3.time")));notifyParam.put(4, Integer.valueOf(PublicConfigUtil.readConfig("message.send.4.time")));notifyParam.put(5, Integer.valueOf(PublicConfigUtil.readConfig("message.send.5.time")));return notifyParam;}} public class AccountingMessageListener implements SessionAwareMessageListener<Message> {private static final Log LOG = LogFactory.getLog(AccountingMessageListener.class);/*** 會計隊列模板(由Spring創(chuàng)建并注入進來)*/@Autowiredprivate JmsTemplate notifyJmsTemplate;@Autowiredprivate RpAccountingVoucherService rpAccountingVoucherService;@Autowiredprivate RpTransactionMessageService rpTransactionMessageService;public synchronized void onMessage(Message message, Session session) {RpAccountingVoucher param = null;String strMessage = null;try {ActiveMQTextMessage objectMessage = (ActiveMQTextMessage) message;strMessage = objectMessage.getText();LOG.info("strMessage1 accounting:" + strMessage);param = JSONObject.parseObject(strMessage, RpAccountingVoucher.class);// 這里轉換成相應的對象還有問題if (param == null) {LOG.info("param參數(shù)為空");return;}int entryType = param.getEntryType();double payerChangeAmount = param.getPayerChangeAmount();String voucherNo = param.getVoucherNo();String payerAccountNo = param.getPayerAccountNo();int fromSystem = param.getFromSystem();int payerAccountType = 0;if (param.getPayerAccountType() != null && !param.getPayerAccountType().equals("")) {payerAccountType = param.getPayerAccountType();}double payerFee = param.getPayerFee();String requestNo = param.getRequestNo();double bankChangeAmount = param.getBankChangeAmount();double receiverChangeAmount = param.getReceiverChangeAmount();String receiverAccountNo = param.getReceiverAccountNo();String bankAccount = param.getBankAccount();String bankChannelCode = param.getBankChannelCode();double profit = param.getProfit();double income = param.getIncome();double cost = param.getCost();String bankOrderNo = param.getBankOrderNo();int receiverAccountType = 0;double payAmount = param.getPayAmount();if (param.getReceiverAccountType() != null && !param.getReceiverAccountType().equals("")) {receiverAccountType = param.getReceiverAccountType();}double receiverFee = param.getReceiverFee();String remark = param.getRemark();rpAccountingVoucherService.createAccountingVoucher(entryType, voucherNo, payerAccountNo, receiverAccountNo, payerChangeAmount, receiverChangeAmount, income, cost, profit, bankChangeAmount, requestNo, bankChannelCode, bankAccount, fromSystem, remark, bankOrderNo, payerAccountType, payAmount, receiverAccountType, payerFee, receiverFee);//刪除消息rpTransactionMessageService.deleteMessageByMessageId(param.getMessageId());} catch (BizException e) {// 業(yè)務異常,不再寫會隊列LOG.error("==>BizException", e);} catch (Exception e) {// 不明異常不再寫會隊列LOG.error("==>Exception", e);}}public JmsTemplate getNotifyJmsTemplate() {return notifyJmsTemplate;}public void setNotifyJmsTemplate(JmsTemplate notifyJmsTemplate) {this.notifyJmsTemplate = notifyJmsTemplate;}public RpAccountingVoucherService getRpAccountingVoucherService() {return rpAccountingVoucherService;}public void setRpAccountingVoucherService(RpAccountingVoucherService rpAccountingVoucherService) {this.rpAccountingVoucherService = rpAccountingVoucherService;}}
與常規(guī)MQ的ACK機制對比
常規(guī)MQ確認機制:
- Producer生成消息并發(fā)送給MQ(同步、異步);
- MQ接收消息并將消息數(shù)據(jù)持久化到消息存儲(持久化操作為可選配置);
- MQ向Producer返回消息的接收結果(返回值、異常);
- Consumer監(jiān)聽并消費MQ中的消息;
- Consumer獲取到消息后執(zhí)行業(yè)務處理;
- Consumer對已成功消費的消息向MQ進行ACK確認(確認后的消息將從MQ中刪除);
常規(guī)MQ隊列消息的處理流程無法實現(xiàn)消息發(fā)送一致性,因此直接使用現(xiàn)成的MQ中間件產(chǎn)品無法實現(xiàn)可靠消息最終一致性的分布式事務解決方案
消息發(fā)送一致性:是指產(chǎn)生消息的業(yè)務動作與消息發(fā)送的一致。也就是說,如果業(yè)務操作成功,那么由這個業(yè)務操作所產(chǎn)生的消息一定要成功投遞出去(一般是發(fā)送到kafka、rocketmq、rabbitmq等消息中間件中),否則就丟消息。
下面用偽代碼進行演示消息發(fā)送和投遞的不可靠性:
先進行數(shù)據(jù)庫操作,再發(fā)送消息:
public void test1(){ //1 數(shù)據(jù)庫操作 //2 發(fā)送MQ消息 }這種情況下無法保證數(shù)據(jù)庫操作與發(fā)送消息的一致性,因為可能數(shù)據(jù)庫操作成功,發(fā)送消息失敗。
先發(fā)送消息,再操作數(shù)據(jù)庫:
public void test1(){ //1 發(fā)送MQ消息 //2 數(shù)據(jù)庫操作 }這種情況下無法保證數(shù)據(jù)庫操作與發(fā)送消息的一致性,因為可能發(fā)送消息成功,數(shù)據(jù)庫操作失敗。
在數(shù)據(jù)庫事務中,先發(fā)送消息,后操作數(shù)據(jù)庫:
這里使用spring 的@Transactional注解,方法里面的操作都在一個事務中。同樣無法保證一致性,因為發(fā)送消息成功了,數(shù)據(jù)庫操作失敗的情況下,數(shù)據(jù)庫操作是回滾了,但是MQ消息沒法進行回滾。
在數(shù)據(jù)庫事務中,先操作數(shù)據(jù)庫,后發(fā)送消息:
這種情況下,貌似沒有問題,如果發(fā)送MQ消息失敗,拋出異常,事務一定會回滾(加上了@Transactional注解后,spring方法拋出異常后,會自動進行回滾)。
這只是一個假象,因為發(fā)送MQ消息可能事實上已經(jīng)成功,如果是響應超時導致的異常。這個時候,數(shù)據(jù)庫操作依然回滾,但是MQ消息實際上已經(jīng)發(fā)送成功,導致不一致。
與消息發(fā)送一致性流程的對比:
- 常規(guī)MQ隊列消息的處理流程無法實現(xiàn)消息發(fā)送一致性;
- 投遞消息的流程其實就是消息的消費流程,可細化;
TCC (Try-Confirm-Cancel)補償模式(最終一致性)
TCC 其實就是采用的補償機制,其核心思想是:針對每個操作,都要注冊一個與其對應的確認和補償(撤銷)操作。
它分為三個階段:
- Try 階段主要是對業(yè)務系統(tǒng)做檢測及資源預留
- Confirm 階段主要是對業(yè)務系統(tǒng)做確認提交,Try階段執(zhí)行成功并開始執(zhí)行 Confirm階段時,默認 Confirm階段是不會出錯的。即:只要Try成功,Confirm一定成功。
- Cancel 階段主要是在業(yè)務執(zhí)行錯誤,需要回滾的狀態(tài)下執(zhí)行的業(yè)務取消,預留資源釋放。
舉例(Bob 要向 Smith 轉賬):
- 首先在 Try 階段,要先調用遠程接口把 Smith 和 Bob 的錢給凍結起來。
- 在 Confirm 階段,執(zhí)行遠程調用的轉賬的操作,轉賬成功進行解凍。
- 如果第2步執(zhí)行成功,那么轉賬成功,如果第二步執(zhí)行失敗,則調用遠程凍結接口對應的解凍方法 (Cancel)。
優(yōu)點:
跟2PC比起來,實現(xiàn)以及流程相對簡單了一些,但數(shù)據(jù)的一致性比2PC也要差一些
缺點:
缺點還是比較明顯的,在2,3步中都有可能失敗。TCC屬于應用層的一種補償方式,所以需要程序員在實現(xiàn)的時候多寫很多補償?shù)拇a,在一些場景中,一些業(yè)務流程可能用TCC不太好定義及處理。
可靠消息最終一致(常用)
不要用本地的消息表了,直接基于MQ來實現(xiàn)事務。比如阿里的RocketMQ就支持消息事務。
可靠消息最終一致性方案大概流程:
- A系統(tǒng)先發(fā)送一個prepared消息到mq,如果這個prepared消息發(fā)送失敗那么就直接取消操作別執(zhí)行了
- 如果這個消息發(fā)送成功過了,那么接著執(zhí)行本地事務,如果成功就告訴mq發(fā)送確認消息,如果失敗就告訴mq回滾消息
- 如果發(fā)送了確認消息,那么此時B系統(tǒng)會接收到確認消息,然后執(zhí)行本地的事務
- mq會自動定時輪詢所有prepared消息回調你的接口,問你,這個消息是不是本地事務處理失敗了,所有沒發(fā)送確認消息?那是繼續(xù)重試還是回滾?一般來說這里你就可以查下數(shù)據(jù)庫看之前本地事務是否執(zhí)行,如果回滾了,那么這里也回滾吧。這個就是避免可能本地事務執(zhí)行成功了,別確認消息發(fā)送失敗了。
這個方案里,要是系統(tǒng)B的事務失敗了咋辦?重試咯,自動不斷重試直到成功,如果實在是不行,要么就是針對重要的資金類業(yè)務進行回滾,比如B系統(tǒng)本地回滾后,想辦法通知系統(tǒng)A也回滾;或者是發(fā)送報警由人工來手工回滾和補償
目前國內(nèi)互聯(lián)網(wǎng)公司大都是這么玩兒的,要不你使用RocketMQ支持的,要不你就基于其他MQ中間件自己封裝一套類似的邏輯,總之思路就是這樣的。
最大努力通知
業(yè)務發(fā)起方將協(xié)調服務的消息發(fā)送到MQ,下游服務接收此消息,如果處理失敗,將進行重試,重試N次后依然失敗,將不進行重試,放棄處理,這個應用場景要求對事物性要求不高的地方。
最大努力通知方案?
最終總結:
? ? ? 需要討論與學習,請加QQ群:793305035
??
轉載于:https://www.cnblogs.com/jurendage/p/11353968.html
總結
以上是生活随笔為你收集整理的Java生鲜电商平台-SpringCloud微服务架构中分布式事务解决方案的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 招商银行FGO联名信用卡第二期上线 卡片
- 下一篇: Java生鲜电商平台-SpringClo