seata 如何开启tcc事物_分布式事务Seata-TCC源码分析
為了更好理解分布式事務,首先提出一個問題:
假設數(shù)據(jù)庫中有兩個表ta,tb,我們要分別更改ta表中的ra記錄和tb表中的rb記錄,但要求ra和rb記錄都修改成功,才認為此次操作時成功,或者需要失敗回滾。針對這種情況處理方式很簡單,只需要使用個事務就好了。
但假如ta和tb不在一個數(shù)據(jù)庫中或者不在一個數(shù)據(jù)庫實例上,此時數(shù)據(jù)庫的事務這兩個表也是無法同時管理的,針對這種情況要如何解決了?如何保證對ta和tb操作的一致性?
此時可以通過TCC來解決上述問題,TCC通過實現(xiàn)兩階段協(xié)議,將服務流程抽象為Try-Confirm-Cancel 三個操作:
第一階段:try,主要用于對資源的預留
第二階段:comfirm/cancel,comfirm用于對預留資源的使用,對業(yè)務進行提交,cancel是對預留資源的釋放,對業(yè)務進行回滾操作
下面從三個方面介紹TCC
seata中TCC的源碼實現(xiàn)
寫好TCC實現(xiàn)的注意點
seata中TCC模式如何做到高可用的
1. seata中TCC的實現(xiàn)
seata主要由三個模塊組成
TC (Transaction Coordinator) - 事務協(xié)調者維護全局和分支事務的狀態(tài),驅動全局事務提交或回滾。
TM (Transaction Manager) - 事務管理器定義全局事務的范圍:開始全局事務、提交或回滾全局事務。
RM (Resource Manager) - 資源管理器管理分支事務處理的資源,與TC交談以注冊分支事務和報告分支事務的狀態(tài),并驅動分支事務提交或回滾。
public interface TccActionOne {
@TwoPhaseBusinessAction(name = "DubboTccActionOne" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, int a);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);
}
public interface TccActionTwo {
@TwoPhaseBusinessAction(name = "DubboTccActionTwo" , commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare(BusinessActionContext actionContext, String b);
public boolean commit(BusinessActionContext actionContext);
public boolean rollback(BusinessActionContext actionContext);
}
同時在provider和consumer端都要引入具體的GlobalTransactionScanner,該類會對TM和RM進行初始化和注冊:
具體調用如圖:
image.jpeg
以上是簡單的使用方式,有了上面基本使用流程的介紹后,現(xiàn)在開始分析下具體的代碼實現(xiàn),首先重點關注下以下的類和注解:
GlobalTransactionScanner,用于掃描是否開啟了分布式事務,并對加了分布式事務注解的方法注入代理,如TwoPhaseBusinessAction和GlobalTransactional
注解TwoPhaseBusinessAction,表示該方法使用的TCC模式,并同時制定commit和cancel方法
注解GlobalTransactional,用于表示被修飾的方法會開啟分布式事務來進行處理
GlobalTransactionScanner通過AbstractAutoProxyCreator類,來為被分布式相關注解修飾的方法添加動態(tài)代理,所以在服務啟動時,會執(zhí)行GlobalTransactionScanner類中相關方法,
主要涉及的方法有:
GlobalTransactionScanner#initClient,初始化TM和RM客戶端
GlobalTransactionScanner#wrapIfNecessary,為添加了TwoPhaseBusinessAction和GlobalTransactional注解的方法添加代理,同時分別為修飾的方法注入TccActionInterceptor和GlobalTransactionalInterceptor代理類,同時會將本地服務作為RM客戶端注冊到TC服務端中
1.1 客戶端初始化
所以GlobalTransactionScanner是Seata客戶端的啟動類,首先看下TM和RM客戶端的初始化
TM和RM會分別初始化TmNettyRemotingClient和RmNettyRemotingClient,這個兩個類的父類都是AbstractNettyRemotingClient,在該類的init方法中,會啟動一個定時來檢查TC的channel是否存活,同時會發(fā)送注冊信息到TC中,最后會啟動netty客戶端
public void init() {
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//檢測連接TC的channel是否存活,若不存在對應channel或者channel已關閉,則會重新連接到TC,同時發(fā)送注冊信息到TC服務中
clientChannelManager.reconnect(getTransactionServiceGroup());
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (NettyClientConfig.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}
在NettyClientChannelManager#reconnect方法中,會通過獲取所有注冊到注冊中心的TC服務地址,然后判斷當前緩存NettyClientChannelManager#channels中是否存在對應地址且存活狀態(tài)的channel,若不存在,則會為該TC地址創(chuàng)建channel,同時向改地址發(fā)送注冊信息,TmNettyRemotingClient和RmNettyRemotingClient注冊信息分別為RegisterTMRequest和RegisterRMRequest,主要方法步驟是在netty.NettyClientChannelManager#doConnect中創(chuàng)建channel,然后在NettyPoolableFactory#makeObject方法中發(fā)送對應的注冊消息
而在GlobalTransactionScanner#wrapIfNecessary方法中,會為TwoPhaseBusinessAction和GlobalTransactional修飾的方法添加代理實現(xiàn)
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {
Class> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
if (interceptor == null) {
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
}
LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
方法中會為TwoPhaseBusinessAction注解修飾的方法生成TccActionInterceptor代理,為GlobalTransactional生成GlobalTransactionalInterceptor代理。
但在TCCBeanParserUtils#isTccAutoProxy方法中若存在TwoPhaseBusinessAction注解,會通過RmNettyRemotingClient#registerResource發(fā)送注冊信息,具體方法在DefaultRemotingParser#parserRemotingServiceInfo中,個人覺得這個步驟可以去掉了有點冗余
1.2 服務端初始化
TC服務端啟動類io.seata.server.Server#main,該方法中會初始化DefaultCoordinator類,這個類是所有消息的處理類,DefaultCoordinator主要屬性如下
//各種定時任務,用來重試
private ScheduledThreadPoolExecutor retryRollbacking = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("RetryRollbacking", 1));
private ScheduledThreadPoolExecutor retryCommitting = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("RetryCommitting", 1));
private ScheduledThreadPoolExecutor asyncCommitting = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("AsyncCommitting", 1));
private ScheduledThreadPoolExecutor timeoutCheck = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("TxTimeoutCheck", 1));
private ScheduledThreadPoolExecutor undoLogDelete = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("UndoLogDelete", 1));
private RemotingServer remotingServer; //消息通信服務端
private DefaultCore core; //主要的事務處理
針對消息的處理流程,具體方法在NettyRemotingServer#registerProcessor:
private void registerProcessor() {
// 1. registry on request message processor
ServerOnRequestProcessor onRequestProcessor =
new ServerOnRequestProcessor(this, getHandler());
//處理事務提交回滾等消息
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
// 2. registry on response message processor,對分支事務提交和回滾響應結果的處理
ServerOnResponseProcessor onResponseProcessor =
new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
// 3. registry rm message processor,處理RM客戶端的注冊消息
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
// 4. registry tm message processor,處理TM客戶端的注冊消息
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
// 5. registry heartbeat message processor,處理客戶端的心跳消息
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}
從上圖可以看出注冊消息是在RegRmProcessor和RegTmProcessor中進行處理。
1.3 TCC消息處理分析
首先還是看下一開始的示例代碼:
@GlobalTransactional
public String doTransactionCommit(){
//第一個TCC 事務參與者
boolean result = tccActionOne.prepare(null, 1);
if(!result){
throw new RuntimeException("TccActionOne failed.");
}
List list = new ArrayList();
list.add("c1");
list.add("c2");
result = tccActionTwo.prepare(null, "two", list);
if(!result){
throw new RuntimeException("TccActionTwo failed.");
}
return RootContext.getXID();
}
在調用doTransactionCommit方法時,會進入到代理類GlobalTransactionalInterceptor中,最終會執(zhí)行到TransactionalTemplate#execute方法,該方法的主要邏輯如下:
beginTransaction(txInfo, tx);//開始事務
rs = business.execute(); //執(zhí)行業(yè)務代碼,即執(zhí)行doTransactionCommit方法
commitTransaction(tx); //提交事務
在開始事務beginTransaction方法中,會向TC服務發(fā)送GlobalBeginRequest消息,來獲取事務xid,該消息最終會在服務端DefaultCore#begin方法中得到處理:
通過雪花算法產生一個隨機數(shù)作為transactionId.根據(jù)transactionId生成xid,具體規(guī)則是ipAddress + ":" + port + ,":" + transactionId,ipAddress為本機ip,port為當前服務的端口
將全局事務記錄寫入global_table表中,同時返回xid,表中xid為主鍵,transactionId為索引
執(zhí)行業(yè)務代碼,業(yè)務代碼會調用遠端服務,如tccActionOne.prepare方法,由于該方法被TwoPhaseBusinessAction修飾,會執(zhí)行代理類TccActionInterceptor,在TccActionInterceptor類中的invoke方法主要邏輯如下:
String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext); //注冊分支事務,同時獲取分支事務id
.....
ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute()); #執(zhí)行實際的分支事務的業(yè)務代碼,如tccActionOne.prepare方法
其中在doTccActionLogStore方法中客戶端通過發(fā)送BranchRegisterRequest消息,其中clientId和lockKeys都是null,resourceId為TwoPhaseBusinessAction中name名稱該消息最終會在服務端AbstractCore#branchRegister:
通過隨機算法生成branchId
生成一個分支記錄,將記錄插入到branch_table表中,其中branchId為主鍵
doTransactionCommit所有業(yè)務邏輯執(zhí)行成功后,GlobalTransactionalInterceptor代理類會執(zhí)行到commitTransaction方法,進行全局事務的提交,客戶端會通過DefaultTransactionManager#commit方法發(fā)送GlobalCommitRequest事務提交消息,此時服務端接收到該消息后,會通過DefaultCore#doGlobalCommit方法進行全局事務的提交,該方法的主要邏輯如下:
從存儲中獲取全局事務xid下所有分支事務記錄,為每個分支事務調用AbstractCore#branchCommitSend方法,發(fā)送BranchCommitRequest消息到對應的分支事務客戶端,來進行分支事務的commit,客戶端接收到消息后,會執(zhí)行TwoPhaseBusinessAction注解中填寫的commit方法來完成分支事務的提交
當有任何一個分支服務調用失敗時,如tccActionOne.prepare調用失敗,會回滾全局事務,然后TC服務端會回滾所有的分支事務
2. 寫好TCC實現(xiàn)的注意點
寫好一個完備的TCC的實現(xiàn)是有一定的要求,需要解決空回滾,冪等操作和懸掛問題。
空回滾
即全局事務回滾時,有可能分支事務try接口由于網絡問題并沒有被觸發(fā)或者還在處于try階段,此時TC已經觸發(fā)了分支事務的cancel,此時需要分支事務服務需要返回成功,不然會有重試,即分支事務要支持空回滾
冪登性
由于網絡抖動問題,分支事務中的try方法可能會被執(zhí)行多次,所以要保證資源不會被重復消耗,解決辦法可以通過為每一個請求維護一個唯一id,如分支事務id,來過濾重復的請求
懸掛問題
當全局事務回滾時,由于分支事務try方法執(zhí)行了較長時間,導致分支事務執(zhí)行cancel方法后,try方法才執(zhí)行成功,這樣導致被try鎖定的資源得不到釋放,解決辦法是將每個分支事務的請求記錄下來,所以當執(zhí)行try方法后,發(fā)現(xiàn)已經存在cancel的執(zhí)行記錄后,則回滾當前的try操作
3. seata中TCC模式如何做到高可用的
要做到高可用,要做到服務的無狀態(tài),為了做到這點seata做了如下工作:
存儲,TC中事務數(shù)據(jù)的存儲避免使用本地存儲,可以使用mysql等
服務發(fā)現(xiàn)與注冊,
從上文實現(xiàn)分析中,我們可以看出TC服務會將本服務的ip注冊到注冊中心,如zk,etcd等,TM和RM客戶端會拉取所有TC服務端的地址,同時將客戶端服務的ip注冊到所有TC服務中,這樣保證了每個TC服務都有所有客戶端的鏈接信息
總結
以上是生活随笔為你收集整理的seata 如何开启tcc事物_分布式事务Seata-TCC源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: godaddy修改php版本,Godad
- 下一篇: 冯雪 手术机器人的应用_未来达芬奇手术机