日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Fescar TC-commit流程

發布時間:2025/3/21 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Fescar TC-commit流程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

開篇

?這篇文章的目的主要是講解Fescar TC執行commit的流程,目的是講解清楚commit流程中的一些步驟。

?遺憾的是因為commit本身Fescar的分支事務注冊上報,如果事先不了解Fescar的分支事務,有些邏輯理解起來會有一些奇怪,對于branchSession本身還未了解,所以只能單獨講解commit流程。


背景


說明:

  • 分支事務中數據的 本地鎖 由本地事務管理,在分支事務 Phase1 結束時釋放。
    同時,隨著本地事務結束,連接 也得以釋放。
  • 分支事務中數據的 全局鎖 在事務協調器側管理,在決議 Phase2 全局提交時,全局鎖馬上可以釋放。只有在決議全局回滾的情況下,全局鎖 才被持有至分支的 Phase2 結束。

這個設計,極大地減少了分支事務對資源(數據和連接)的鎖定時間,給整體并發和吞吐的提升提供了基礎。

這里需要重點指出的是:Phase1階段的commit()操作是各個分支事務本地的事務操作。Phase2階段的操作是全局的commit()和rollback()。TC-commit流程指的就是Phase2階段。


TC commit流程介紹

  • 1.根據transactionId查找begin階段生成的GlobalSession對象。
  • 2.對GlobalSession對象進行清理操作,刪除分支事務的鎖并清理GlobalSession對象。
  • 3.TC通知所有RM(各分支事務的資源管理器)進行全局提交操作(doGlobalCommit)。


TC commit源碼分析

public class DefaultCoordinator extends AbstractTCInboundHandlerimplements TransactionMessageHandler, ResourceManagerInbound {@Overrideprotected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)throws TransactionException {response.setGlobalStatus(core.commit(XID.generateXID(request.getTransactionId())));} }

說明:

  • DefaultCoordinator的doGlobalCommit()作為全局回滾入口
  • core.commit()根據XID去執行全局commit()操作。


Commit 主流程

public class DefaultCore implements Core {public GlobalStatus commit(String xid) throws TransactionException {// 1.查找GlobalSessionGlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));if (globalSession == null) {return GlobalStatus.Finished;}GlobalStatus status = globalSession.getStatus();// 2.關閉全局session并執行清理工作globalSession.closeAndClean(); // Highlight: Firstly, close the session, then no more branch can be registered.// 3.執行GlobalCommit通知動作if (status == GlobalStatus.Begin) {if (globalSession.canBeCommittedAsync()) {asyncCommit(globalSession);} else {doGlobalCommit(globalSession, false);}}// 返回GlobalCommit后的狀態return globalSession.getStatus();} }

說明:

  • DefaultCore是全局回滾的核心邏輯。
  • SessionHolder.findGlobalSession查找全局的GlobalSession對象。
  • GlobalSession執行closeAndClean操作。
  • DefaultCore執行doGlobalCommit通知TC執行全局回滾操作。


查找GlobalSession

public class SessionHolder {public static GlobalSession findGlobalSession(Long transactionId) throws TransactionException {return getRootSessionManager().findGlobalSession(transactionId);} }public class DefaultSessionManager extends AbstractSessionManager {}public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();public GlobalSession findGlobalSession(Long transactionId) throws TransactionException {return sessionMap.get(transactionId);} }

說明:

  • findGlobalSession()方法從DefaultSessionManager當中獲取GlobalSession。
  • DefaultSessionManager的父類AbstractSessionManager的findGlobalSession從sessionMap獲取GlobalSession對象。


GlobalSession的closeAndClean

public class GlobalSession implements SessionLifecycle, SessionStorable {public void closeAndClean() throws TransactionException {close();clean();}public void close() throws TransactionException {if (active) {for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onClose(this);}}}private void clean() throws TransactionException {for (BranchSession branchSession : branchSessions) {branchSession.unlock();}} }public class DefaultSessionManager extends AbstractSessionManager {} public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {public void onClose(GlobalSession globalSession) throws TransactionException {globalSession.setActive(false);} }

說明:

  • GlobalSession的執行closeAndClean操作,先執行close再執行clean。
  • lifecycleListener.onClose()執行DefaultSessionManager的onClose()。
  • DefaultSessionManager的onClose()把設置active標識為false。
  • clean()操作對所有的分支事務branchSession釋放鎖。這部分邏輯比較復雜單獨列出。


BranchSession的unlock

public class BranchSession implements Lockable, Comparable<BranchSession>, SessionStorable {public boolean unlock() throws TransactionException {if (lockHolder.size() == 0) {return true;}Iterator<Map.Entry<Map<String, Long>, Set<String>>> it = lockHolder.entrySet().iterator();while (it.hasNext()) {Map.Entry<Map<String, Long>, Set<String>> entry = it.next();Map<String, Long> bucket = entry.getKey();Set<String> keys = entry.getValue();synchronized (bucket) {for (String key : keys) {Long v = bucket.get(key);if (v == null) {continue;}if (v.longValue() == getTransactionId()) {bucket.remove(key);}}}}lockHolder.clear();return true;} }

說明:

  • BranchSession的unlock()操作對BranchSession 進行清理。
  • BranchSession內部的數據由于暫未閱讀該部分代碼所以暫時不能解釋清楚。
  • 全局清除lockHolder。


TC執行GlobalCommit

public class DefaultCore implements Core {public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {// 遍歷所有的BranchSession執行回滾操作for (BranchSession branchSession : globalSession.getSortedBranches()) {BranchStatus currentStatus = branchSession.getStatus();if (currentStatus == BranchStatus.PhaseOne_Failed) {continue;}try {BranchStatus branchStatus = resourceManagerInbound.branchCommit(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),branchSession.getResourceId(), branchSession.getApplicationData());switch (branchStatus) {case PhaseTwo_Committed:globalSession.removeBranch(branchSession);continue;case PhaseTwo_CommitFailed_Unretryable:if (globalSession.canBeCommittedAsync()) {LOGGER.error("By [{}], failed to commit branch {}", branchStatus, branchSession);continue;} else {globalSession.changeStatus(GlobalStatus.CommitFailed);globalSession.end();LOGGER.error("Finally, failed to commit global[{}] since branch[{}] commit failed",globalSession.getTransactionId(), branchSession.getBranchId());return;}default:if (!retrying) {queueToRetryCommit(globalSession);return;}if (globalSession.canBeCommittedAsync()) {LOGGER.error("By [{}], failed to commit branch {}", branchStatus, branchSession);continue;} else {LOGGER.error("Failed to commit global[{}] since branch[{}] commit failed, will retry later.",globalSession.getTransactionId(), branchSession.getBranchId());return;}}} catch (Exception ex) {LOGGER.info("Exception committing branch {}", branchSession, ex);if (!retrying) {queueToRetryCommit(globalSession);if (ex instanceof TransactionException) {throw (TransactionException) ex;} else {throw new TransactionException(ex);}}}}if (globalSession.hasBranch()) {return;}globalSession.changeStatus(GlobalStatus.Committed);globalSession.end();} }

說明:

  • 對所有的BranchSession執行branchCommit通知
  • 針對branchCommit返回狀態進行判斷,有一些邏輯在里面,后續閱讀了Branch相關資料后再補充狀態轉移圖。

總結

以上是生活随笔為你收集整理的Fescar TC-commit流程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。