日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

Fescar 全局锁介绍

發(fā)布時(shí)間:2025/7/25 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Fescar 全局锁介绍 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

開(kāi)篇

?這篇文章的目的主要是講解TC的在處理分支事務(wù)注冊(cè)過(guò)程中對(duì)全局鎖的處理流程,理解了全局鎖以后才能明白對(duì)DB同一個(gè)記錄進(jìn)行多次變更是如何解決的。

?如上圖所示,問(wèn)最終全局事務(wù)A對(duì)資源R1應(yīng)該回滾到哪種狀態(tài)?很明顯,如果再根據(jù)UndoLog去做回滾,就會(huì)發(fā)生嚴(yán)重問(wèn)題:覆蓋了全局事務(wù)B對(duì)資源R1的變更。
?那Fescar是如何解決這個(gè)問(wèn)題呢?答案就是?Fescar的全局寫(xiě)排它鎖解決方案,在全局事務(wù)A執(zhí)行過(guò)程中全局事務(wù)B會(huì)因?yàn)楂@取不到全局鎖而處于等待狀態(tài)。


Fescar 全局鎖處理流程

RM 嘗試獲取全局鎖

public class ConnectionProxy extends AbstractConnectionProxy {public void commit() throws SQLException {if (context.inGlobalTransaction()) {try {// 1、向TC發(fā)起注冊(cè)操作并檢查是否能夠獲取全局鎖register();} catch (TransactionException e) {recognizeLockKeyConflictException(e);}try {if (context.hasUndoLog()) {UndoLogManager.flushUndoLogs(this);}// 2、執(zhí)行本地的事務(wù)的commit操作targetConnection.commit();} catch (Throwable ex) {report(false);if (ex instanceof SQLException) {throw (SQLException) ex;} else {throw new SQLException(ex);}}report(true);context.reset();} else {targetConnection.commit();}}private void register() throws TransactionException {Long branchId = DataSourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),null, context.getXid(), context.buildLockKeys());context.setBranchId(branchId);} }

說(shuō)明:

  • RM 執(zhí)行本地事務(wù)提交操作在ConnectionProxy的commit()完成。
  • commit()的過(guò)程當(dāng)中按照register->commit的流程執(zhí)行。
  • register()過(guò)程RM會(huì)向TC發(fā)起注冊(cè)請(qǐng)求判斷是否能夠獲取全局鎖
  • register()通過(guò)DataSourceManager的branchRegister()操作完成。


TC處理全局鎖申請(qǐng)流程

public class DefaultCore implements Core {protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,RpcContext rpcContext) throws TransactionException {response.setTransactionId(request.getTransactionId());response.setBranchId(core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),XID.generateXID(request.getTransactionId()), request.getLockKey()));}public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String lockKeys) throws TransactionException {GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);BranchSession branchSession = new BranchSession();branchSession.setTransactionId(XID.getTransactionId(xid));branchSession.setBranchId(UUIDGenerator.generateUUID());branchSession.setApplicationId(globalSession.getApplicationId());branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup());branchSession.setBranchType(branchType);branchSession.setResourceId(resourceId);branchSession.setLockKey(lockKeys);branchSession.setClientId(clientId);// 判斷branchSession是否能夠獲取鎖if (!branchSession.lock()) {throw new TransactionException(LockKeyConflict);}try {globalSession.addBranch(branchSession);} catch (RuntimeException ex) {throw new TransactionException(FailedToAddBranch);}return branchSession.getBranchId();}public boolean lock() throws TransactionException {return LockManagerFactory.get().acquireLock(this);}}

說(shuō)明:

  • TC 在處理branchRegister()的過(guò)程中會(huì)判斷branchResiter請(qǐng)求攜帶的session信息能否獲取全局鎖。
  • branchSession.lock()判斷能否獲取鎖,如果獲取失敗則拋出TransactionException(LockKeyConflict)異常。
  • branchSession.lock()判斷能否獲取鎖,如果獲取成功則將branchSession添加到全局鎖當(dāng)中。


TC 判斷全局鎖分配流程

public class DefaultLockManagerImpl implements LockManager {public boolean acquireLock(BranchSession branchSession) throws TransactionException {String resourceId = branchSession.getResourceId();long transactionId = branchSession.getTransactionId();//1、根據(jù)resourceId去LOCK_MAP獲取,獲取失敗則新增一個(gè)空的對(duì)象。ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>> dbLockMap = LOCK_MAP.get(resourceId);if (dbLockMap == null) {LOCK_MAP.putIfAbsent(resourceId, new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>());dbLockMap = LOCK_MAP.get(resourceId);}ConcurrentHashMap<Map<String, Long>, Set<String>> bucketHolder = branchSession.getLockHolder();// 2、獲取branchSession的全局鎖的key對(duì)象String lockKey = branchSession.getLockKey();if(StringUtils.isEmpty(lockKey)) {return true;}// 3、按照分號(hào)“;”切割多個(gè)LockKey,每個(gè)LockKey按照table:pk1;pk2;pk3格式組裝。String[] tableGroupedLockKeys = lockKey.split(";");for (String tableGroupedLockKey : tableGroupedLockKeys) {int idx = tableGroupedLockKey.indexOf(":");if (idx < 0) {branchSession.unlock();throw new ShouldNeverHappenException("Wrong format of LOCK KEYS: " + branchSession.getLockKey());}// 4、分割獲取branchRegister請(qǐng)求的表名和pks。String tableName = tableGroupedLockKey.substring(0, idx);String mergedPKs = tableGroupedLockKey.substring(idx + 1);// 5、獲取表下的已經(jīng)加鎖的記錄tableLockMap ConcurrentHashMap<Integer, Map<String, Long>> tableLockMap = dbLockMap.get(tableName);if (tableLockMap == null) {dbLockMap.putIfAbsent(tableName, new ConcurrentHashMap<Integer, Map<String, Long>>());tableLockMap = dbLockMap.get(tableName);}// 6、遍歷該表所有pks判斷是否已加鎖。String[] pks = mergedPKs.split(",");for (String pk : pks) {// 7、同一個(gè)表的pk按照hash值進(jìn)行hash分配到tableLockMap當(dāng)中。int bucketId = pk.hashCode() % BUCKET_PER_TABLE;Map<String, Long> bucketLockMap = tableLockMap.get(bucketId);if (bucketLockMap == null) {tableLockMap.putIfAbsent(bucketId, new HashMap<String, Long>());bucketLockMap = tableLockMap.get(bucketId);}// 8、根據(jù)pk去獲取bucketLockMap當(dāng)中獲取鎖對(duì)象。synchronized (bucketLockMap) {Long lockingTransactionId = bucketLockMap.get(pk);if (lockingTransactionId == null) {// No existing lock// 9、將鎖添加到branchSession當(dāng)中bucketLockMap.put(pk, transactionId);Set<String> keysInHolder = bucketHolder.get(bucketLockMap);if (keysInHolder == null) {bucketHolder.putIfAbsent(bucketLockMap, new ConcurrentSet<String>());keysInHolder = bucketHolder.get(bucketLockMap);}keysInHolder.add(pk);} else if (lockingTransactionId.longValue() == transactionId) {// Locked by mecontinue;} else {// 直接返回異常LOGGER.info("Global lock on [" + tableName + ":" + pk + "] is holding by " + lockingTransactionId);branchSession.unlock(); // Release all acquired locks.return false;}}}}return true;} }

說(shuō)明:

  • TC 判斷branchRegister操作能否獲取鎖按照維度層層遞進(jìn)判斷。
  • 1、先從ResourceId的維度進(jìn)行判斷, LOCK_MAP.get(resourceId)。
  • 2、再?gòu)膖ableName的維度進(jìn)行判斷, dbLockMap.get(tableName)。
  • 3、再?gòu)膒k的hashcode的維度進(jìn)行判斷,tableLockMap.get(bucketId)。
  • 4、在從pk的維度進(jìn)行判斷,bucketLockMap.get(pk)。
  • 5、如果按照上述維度判斷后未存在加鎖的branchSession就返回能夠加鎖,否則返回不能加鎖。
  • 6、判斷加鎖過(guò)程中處理了冪等,如果自己已經(jīng)加鎖了可以再次獲取鎖。


TM 全局鎖維護(hù)數(shù)據(jù)結(jié)構(gòu)

private static final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>> LOCK_MAP = new ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<Integer, Map<String, Long>>>>();

說(shuō)明:

  • LOCK_MAP 作為T(mén)C全局鎖的保存結(jié)構(gòu)。
  • 第一層ConcurrentHashMap的key為resourceId。
  • 第二層ConcurrentHashMap的key為tableName。
  • 第三層ConcurrentHashMap的key為pk的hashCode。
  • 第四層的Map的key為pk,value為transactionId(RM攜帶過(guò)來(lái))。


參考文章

  • 分布式事務(wù)中間件Fescar—全局寫(xiě)排它鎖解讀


Fescar源碼分析連載

  • Fescar 源碼解析系列

總結(jié)

以上是生活随笔為你收集整理的Fescar 全局锁介绍的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。