蚂蚁金服分布式事务框架DTX源码学习
文章目錄
- 一、前言
- 二、DTX簡(jiǎn)介
- 三、角色
- 四、服務(wù)發(fā)起者與參與者DTX客戶(hù)端啟動(dòng)流程
- 1、項(xiàng)目啟動(dòng),創(chuàng)建dtx動(dòng)態(tài)代理
- 2、初始化DtxClient客戶(hù)端的init()方法
- 五、服務(wù)發(fā)起以及參與流程
一、前言
之前因工作原因(公司購(gòu)買(mǎi)了阿里的金融中間服務(wù)),接觸一段DTX,通過(guò)反編譯看了一些DTX的源碼并記錄一些筆記,但是并不完全,由于DTX-SERVER阿里只是提供了一個(gè)很簡(jiǎn)單的測(cè)試demo,加上時(shí)間等因素,主要只是看了下客戶(hù)端的啟動(dòng)流程,至于DTX的發(fā)起以及參與流程并沒(méi)有詳細(xì)跟蹤,僅作為自己學(xué)習(xí)過(guò)程中的一點(diǎn)記錄和歷程。
二、DTX簡(jiǎn)介
1、DTX是基于螞蟻金服SOFARPC框架的分布式事務(wù)管理架構(gòu),能支持百萬(wàn)級(jí)別的分支分布式事務(wù)管理,具有大規(guī)模。高擴(kuò)展、高性能、低成本的特點(diǎn)。在跨服務(wù)分布式事務(wù)問(wèn)題的解決方案上,DTX基于兩種理論實(shí)現(xiàn)了兩種模式:基于BASE理論的TCC模式(侵入式)和基于ACID理論的FMT模式(非侵入式),這里學(xué)習(xí)的是TCC模式。
2、 TCC方案其實(shí)是兩階段提交的一種改進(jìn),將整個(gè)業(yè)務(wù)邏輯的每個(gè)分支分成了Try、Confirm、Cancel三個(gè)操作,其中Try部分完成業(yè)務(wù)的準(zhǔn)備工作、Confirm部分完成業(yè)務(wù)的提交、Cancel部分完成事務(wù)的回滾。這三步僅僅是方法論,具體在每一步的操作實(shí)現(xiàn),則由所涉及的服務(wù)自行設(shè)計(jì)代碼實(shí)現(xiàn)。以簡(jiǎn)單的A向B轉(zhuǎn)賬為例,A加錢(qián)與B減錢(qián)的操作由兩個(gè)參與方服務(wù)來(lái)實(shí)現(xiàn),A和B的兩個(gè)Try會(huì)同時(shí)進(jìn)行業(yè)務(wù)系統(tǒng)檢測(cè)和資源預(yù)留,只有兩個(gè)Try都成功了才會(huì)往下進(jìn)行Confirm操作以提交金額的增減。對(duì)于復(fù)雜的操作,還會(huì)在一個(gè)分布式事務(wù)里嵌套多層參與方,只有每層的Try都成功了,才會(huì)完成整個(gè)分布式事務(wù)的第一階段,中間一旦任何一層失敗都會(huì)回滾。
三、角色
DTX框架主要以下三個(gè)角色
1.服務(wù)參與者:以sofa-rpc方式發(fā)布服務(wù)的服務(wù)提供者,走的是bolt協(xié)議
2.DTX-SERVER:DTX事務(wù)管理控制器,是一個(gè)獨(dú)立的服務(wù),相當(dāng)于一個(gè)事務(wù)管理中間件
3.服務(wù)發(fā)起者:以sofa-rpc方式外調(diào)服務(wù)的服務(wù)消費(fèi)者,走的是bolt協(xié)議
四、服務(wù)發(fā)起者與參與者DTX客戶(hù)端啟動(dòng)流程
1、項(xiàng)目啟動(dòng),創(chuàng)建dtx動(dòng)態(tài)代理
服務(wù)發(fā)起者或參與者(sofaboot項(xiàng)目)啟動(dòng)后,spring會(huì)掃描com.alipay.sofa.dtx.client.aop.ComponentScanner類(lèi)(此類(lèi)上沒(méi)有spring組件注解,需手動(dòng)在xml中配置),創(chuàng)建dtx動(dòng)態(tài)代理
public class ComponentScanner extends AbstractAutoProxyCreator implements ApplicationContextAware,PriorityOrdered, InitializingBean, ApplicationListener<ApplicationEvent>, BeanFactoryPostProcessor { .......@Overridepublic void afterPropertiesSet() throws Exception {initDtxClient();}/*** 初始化 dtx-client* @throws Exception*/private void initDtxClient() throws Exception {//初始化dtx 客戶(hù)端DtxClientFactory.initDtxClient(applicationContext);this.inited.set(true);logger.info("DTX client init finish.");} ....... }2、初始化DtxClient客戶(hù)端的init()方法
ComponentScanner實(shí)現(xiàn)了InitializingBean接口,spring初始化bean的時(shí)候會(huì)執(zhí)行afterPropertiesSet()方法,初始化dtx 客戶(hù)端,接下來(lái)會(huì)執(zhí)行com.alipay.dtx.client.core.DtxClient類(lèi)的init()方法
public class DtxClient implements ServersRefreshHandler { ......../*** 初始化網(wǎng)絡(luò),注冊(cè)資源* @throws Exception*/private void init() throws Exception {//初始化rpc ,建立長(zhǎng)連接rpcClient = DtxClientFactory.getRpcClient();((DefaultRpcClient)rpcClient).setServersRefreshHandler(this);((DefaultRpcClient)rpcClient).setServerMessageHandler(DtxClientFactory.getServerMessageHandler(applicationContext));//dtx-rpc client initrpcClient.init();initReourceManager();//消息發(fā)送messageSender = DefaultMessageSender.getInstance();((DefaultMessageSender)messageSender).setRpcClient(rpcClient);userDefinedResourceManager.setMessageSender(messageSender);autoResourceManager.setMessageSender(messageSender);DtxServiceImpl.setDtxTM(DtxTransactionManager.getInstance());}/*** 初始化資源管理器*/private void initReourceManager(){//初始化資源管理器userDefinedResourceManager = UserDefinedResourceManager.getInstance();userDefinedResourceManager.setApplicationContext(applicationContext);userDefinedResourceManager.init(rpcClient);autoResourceManager = DtxClientFactory.getAutoResourceManager();autoResourceManager.init(rpcClient);if(CollectionUtil.isNotEmpty(serverConnections)) {userDefinedResourceManager.onServerConnectionsRefresh(serverConnections) ;autoResourceManager.onServerConnectionsRefresh(serverConnections);}} ........ }/*** user-defined resource manager** 用戶(hù)自定義資源管理** @author zhangsen**/ public class UserDefinedResourceManager extends AbstractResourceManager implements UserDefinedResourceListener,StateResolverManager { ......../*** 初始化*/public void init(IRpcClient rpcClient) {setRpcClient(rpcClient);//將所有資源注冊(cè)至所有dtx-serverregisterResource();//消息發(fā)送messageSender = DefaultMessageSender.getInstance();((DefaultMessageSender)messageSender).setRpcClient(rpcClient);//設(shè)置自定義資源監(jiān)聽(tīng)器,當(dāng)有新的自定義資源時(shí),觸發(fā)自定義資源重新想所有dtx-server注冊(cè)UserDefinedResourceHolder.getInstance().setUserDefinedResourceListener(this);inited = true;}/*** 向dtx-server注冊(cè)所有的自定義資源*/@Overridepublic synchronized boolean registerResource() {boolean result = true;//回查 資源if(stateResolverDesc != null){String resourceId = stateResolverDesc.getAppName();try {//自定義資源注冊(cè)至所有dtx-serverboolean ret = registerResourceToAllServer(resourceId, NetworkUtil.getLocalIP(), ResourceType.STATE_RESOLVER);if(ret){stateResolverHolders.put(resourceId, stateResolverDesc);}logger.info("state-resolver resource registered, register result:"+ ret +",resourceId:" + resourceId + ", state resolver detail:" + JSON.toJSONString(stateResolverDesc));result &= ret;} catch (Exception e) {logger.error("register state-resolver resource error, state resolver detail" + JSON.toJSONString(stateResolverDesc), e);result &= false;}}//自定義資源if(actions == null || actions.size() == 0){logger.warn("no user-defined resource need to register.");return false;}Map<String,UserDefinedResourceDesc> tempRegistedResourceHolders = new ConcurrentHashMap<String,UserDefinedResourceDesc>();// 自定義資源注冊(cè)for(ActionDesc action : actions) {String serviceId = action.getServiceId();UserDefinedResourceDesc userDefinedResourceHolder = new UserDefinedResourceDesc();userDefinedResourceHolder.setPrepareMethod(action.getPrepareMethod());userDefinedResourceHolder.setCommitMethod(action.getCommitMethod());userDefinedResourceHolder.setRollbackMethod(action.getRollbackMethod());userDefinedResourceHolder.setActionName(action.getActionName());userDefinedResourceHolder.setTargetBean(action.getTargetBean());userDefinedResourceHolder.setTccType(action.getTccType());userDefinedResourceHolder.setInterfaceClass(action.getInterfaceClass());userDefinedResourceHolder.setTargetBeanName(action.getTargetBeanName());userDefinedResourceHolder.setDataSourceBeanId(action.getDataSourceBeanId());try {//自定義資源注冊(cè)至所有dtx-serverboolean ret = registerResourceToAllServer(serviceId, NetworkUtil.getLocalIP(), ResourceType.USER_DEFINE);if(ret){tempRegistedResourceHolders.put(serviceId, userDefinedResourceHolder);}logger.info("user-defined resource registered, register result:"+ ret +",resourceId:" + serviceId );result &= ret;} catch (Exception e) {logger.error("register user-defined resource error", e);result &= false;}}registedResourceHolders.clear();registedResourceHolders.putAll(tempRegistedResourceHolders);return result;}........ }1、DtxClientFactory.getRpcClient()會(huì)返回一個(gè)com.alipay.dtx.rpc.client.impl.DefaultRpcClient對(duì)象(單例),用于訂閱dtx-server地址,建立到dtx-server的長(zhǎng)連接;
2、setServersRefreshHandler方法設(shè)置了dtx-server發(fā)生變化的處理器,當(dāng)dtx-server發(fā)生變化后,新建連接、 斷開(kāi)重連、dtx-server地址重新推送等一系列操作;
3、setServerMessageHandler方法設(shè)置了消息處理器com.alipay.dtx.resourceManager.handle.ServerMessageHandler,用于處理從dtx-server返回的消息;
4、initReourceManager初始化資源管理器,向dtx-server注冊(cè)自定義資源以及自動(dòng)資源并設(shè)置消息發(fā)送器DefaultMessageSender
下面進(jìn)入rpcClient.init()看下:
public class DefaultRpcClient implements IRpcClient,ServerAddressListener { ........@Overridepublic void init() throws Exception {logger.info("DTX rpc init start.");//長(zhǎng)連接管理器connectionManager = RpcConnectionManagerImpl.getInstance(serverMessageHandler);connectionManager.setServersRefreshHandler(serversRefreshHandler);//服務(wù)器地址管理serverAddressManaager = ServerAddressManager.getInstance();serverAddressManaager.setAddressListener(this);serverAddressManaager.init();inited.set(true);logger.info("dtx-rpc init finish.");}/*** 接收dtx-server的ip地址;創(chuàng)建 到dtx-server的長(zhǎng)連接*/@Overridepublic boolean onAddressMessage(List<ServerInfo> serverAdress) {logger.info("receive dtx-server address : " + JSON.toJSONString(serverAdress));boolean connResult = onServerAddressChanged(serverAdress);boolean connResultForDtxSdk = onServerAddressChangedForDtxSdk(serverAdress);return connResultForDtxSdk && connResult;}/*** 收到dtx-server地址,創(chuàng)建長(zhǎng)連接* @param serverAdress* @return*/private boolean onServerAddressChanged(List<ServerInfo> serverAdresses) {//檢查并創(chuàng)建 長(zhǎng)連接if(serverAdresses == null || serverAdresses.size() == 0 ){logger.warn("dtx-server ip list is empty.");return false;}try{//創(chuàng)建并保存連接boolean ret = connectionManager.connect(serverAdresses, RuntimeConfiguration.getServerPort());if(ret) {//dtx-server 重新鏈接完成,回調(diào)dtx-server刷新處理器serversRefreshHandler.onServerConnectionsRefresh(connectionManager.getConnections());logger.info("create long-connection success. dtx-server:" + JSON.toJSONString(serverAdresses));}else {logger.warn("creating long-connection to server has some failed. dtx-server:" + JSON.toJSONString(serverAdresses));}return ret;}catch(Throwable t){logger.error("create long-connection error,serverAdresses:" + JSON.toJSONString(serverAdresses), t);}return false;} ........ }* 建立、維護(hù) 長(zhǎng)連接 public class RpcConnectionManagerImpl implements RpcConnectionManager { ......../*** 單實(shí)例*/private static RpcConnectionManagerImpl instance;public synchronized static RpcConnectionManagerImpl getInstance(MessageHandler messageHandler){if(instance == null) {instance = new RpcConnectionManagerImpl(messageHandler);instance.init();}return instance;}/*** 初始化*/private void init(){//bolt client initSystem.setProperty(Configs.CONN_RECONNECT_SWITCH, "true");rpcClient = new RpcClient(); // rpcClient.enableReconnectSwitch();rpcClient.init();//server message listnerrpcClient.registerUserProcessor(new ServerMessageProcessor(this.messageHandler, StateResolverRequest.class.getName()));rpcClient.registerUserProcessor(new ServerMessageProcessor(this.messageHandler, BranchCommitRequest.class.getName()));rpcClient.registerUserProcessor(new ServerMessageProcessor(this.messageHandler, BranchRolbackRequest.class.getName()));rpcClient.registerUserProcessor(new JsonMsgUserProcessor(this.messageHandler));//異步批量消息發(fā)送線(xiàn)程new OnwaySender(this).start();//鑒權(quán)authorization = AuthorizationImpl.getInstance(rpcClient);//初始化完成isInited.set(true);}/*** 待批量發(fā)送的消息;*/protected final static BlockingQueue<Message> batchMessageQueue = new ArrayBlockingQueue<Message>(RpcConstants.DEFAULT_QUEUE_SIZE);/*** 異步批量消息發(fā)送線(xiàn)程** @author zhangsen**/public static class OnwaySender extends Thread {private RpcConnectionManager rpcConnectionManager;public OnwaySender(RpcConnectionManager rpcConnectionManagerImpl){this.setDaemon(true);this.setName("dtx-onway-sender");this.rpcConnectionManager = rpcConnectionManagerImpl;}@Overridepublic void run() {while (true) {try{//獲取待批量發(fā)送消息List<Message> list = new ArrayList<Message>();synchronized(batchMessageQueue) {int i = 0;while((i++) < RpcConstants.DEFAULT_BATCH_SIZE) {Message msg = batchMessageQueue.poll();if(msg != null) {list.add(msg);}else {//隊(duì)列已無(wú)數(shù)據(jù)break;}}}//消息發(fā)送if(list.size() != 0) {BatchMessage batchMessage = new BatchMessage(list);//消息發(fā)送Message ret = null;try{ret = rpcConnectionManager.invoke(null, batchMessage, RpcConstants.DTX_REQUEST_TIMEOUT*2, RpcConstants.DEFAULT_RETRY_NUM*3);logger.debug("Batch message sent, batchMessage:" + JSON.toJSONString(batchMessage) + ", result:" + JSON.toJSONString(ret));}catch(Throwable t){logger.error("Batch message sent error, batchMessage:" + JSON.toJSONString(batchMessage), t);}}else {logger.debug("Batch message sent, batchMessage: 0");}}catch(Throwable t){logger.error("Onway-sender error", t);}try{//30s 周期Thread.sleep(RpcConstants.DEFAULT_ONWAY_SENDERR_PERIOD);}catch(Exception e){logger.error("Thread.sleep error", e);}}}}/*** 獲取 或者 創(chuàng)建 長(zhǎng)連接* @param serverIP* @return*/private synchronized Connection getConnection(String serverIP, Map<String, Connection> tempConnectionMaps){if(StringUtils.isEmpty(serverIP)){return null;}//鏈接是否已經(jīng)創(chuàng)建Connection conn = tempConnectionMaps.get(serverIP);if(conn != null && conn.isFine()) {//鏈接已創(chuàng)建,無(wú)需重復(fù)創(chuàng)建return conn;}tempConnectionMaps.remove(serverIP);//需要重新創(chuàng)建鏈接String ip = new StringBuilder().append(serverIP).append(":").append(serverPort).toString();try {//創(chuàng)建或者獲取長(zhǎng)鏈接conn = rpcClient.getConnection(ip , RpcConstants.CONNECTION_TIMEOUT);//向dtx-server注冊(cè)實(shí)例id,send instanceId and clientIpClientConnectRequest connectRequest = new ClientConnectRequest(RuntimeConfiguration.getInstanceId(), NetworkUtil.getLocalIP(),RuntimeConfiguration.getZone(), RuntimeConfiguration.getDataCenter());try {connectRequest.setTracerId(RuntimeFramework.getInstance().getTracerId());} catch (Exception t) {logger.error("getTracerId error", t);}//鑒權(quán)上下文connectRequest.setAuthorizationContext(authorization.getAuthorizationContext());Response ret = (Response) rpcClient.invokeSync(conn, connectRequest, RpcConstants.CONNECTION_TIMEOUT);//鑒權(quán)if(ret != null && ret.isSuccess()) {//save connectionlogger.info("create connection to " + ip + " success.");tempConnectionMaps.put(serverIP, conn);return conn;} else {logger.error("create connection to " + ip + " failed. ret: " + JSON.toJSONString(ret));}} catch(Throwable t) {logger.error("create connection to " + ip + " error." , t);}return null;} ........ }/*** dtx-server ip 地址管理**/ public class ServerAddressManager { ......../*** 初始化地址管理器*/public void init() {try {//先從系統(tǒng)配置中獲取地址if(hasServerHosts()){List<String> l = getServerHosts() ;List<ServerInfo> serverAddress = new ArrayList<ServerInfo>();if(l != null){for( String s : l){ServerInfo sInfo = new ServerInfo();sInfo.setServerIp(s);serverAddress.add(sInfo);}}serverAddressListner.onAddressMessage(serverAddress);}else{//從注冊(cè)器中獲取AddressSubscribe.subscribeServerAddress(serverAddressListner);}}catch(Throwable t){logger.error("init dtx-server list error", t);throw new RuntimeException(ErrorCode.getErrorDesc(ErrorCode.DTX302),t);}} ........ }public class RpcClient { ......../*** Initialization.*/public void init() {if (this.addressParser == null) {this.addressParser = new RpcAddressParser();}this.connectionManager.setAddressParser(this.addressParser);this.connectionManager.init();this.rpcRemoting = new RpcClientRemoting(new RpcCommandFactory(), this.addressParser,this.connectionManager);this.taskScanner.add(this.connectionManager);this.taskScanner.start();if (globalSwitch.isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {if (monitorStrategy == null) {ScheduledDisconnectStrategy strategy = new ScheduledDisconnectStrategy();connectionMonitor = new DefaultConnectionMonitor(strategy, this.connectionManager);} else {connectionMonitor = new DefaultConnectionMonitor(monitorStrategy,this.connectionManager);}connectionMonitor.start();logger.warn("Switch on connection monitor");}if (globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {reconnectManager = new ReconnectManager(connectionManager);connectionEventHandler.setReconnectManager(reconnectManager);logger.warn("Switch on reconnect manager");}}........ }1、獲取長(zhǎng)連接管理器(單例)并初始化,觀看以上代碼可以得知,new了一個(gè)com.alipay.remoting.rpc.RpcClient對(duì)象,并初始化了這個(gè)Rpc客戶(hù)端,設(shè)置地址解析器,初始化連接管理器connectionManager,新建rpc遠(yuǎn)程客戶(hù)端對(duì)象以及RpcTaskScanner(rpc任務(wù)掃描器,定時(shí)掃描移除過(guò)期連接池連接),開(kāi)啟全局監(jiān)控以及重連開(kāi)關(guān),初始化Rpc客戶(hù)端后,向RpcConnectionFactory 的userProcessors對(duì)象中注冊(cè)用戶(hù)自定義處理器,這里的消息類(lèi)型有四種:1>狀態(tài)處理請(qǐng)求StateResolverRequest 2>分支提交請(qǐng)求 3>分支合并請(qǐng)求 4>Json字符串消息;
2、最后開(kāi)啟RpcConnectionManagerImpl 類(lèi)中的異步批量消息發(fā)送線(xiàn)程O(píng)nwaySender,不停地從batchMessageQueue 消息隊(duì)列中獲取消息,發(fā)送到dtx-server;
3、長(zhǎng)連接管理器初始化完畢后,開(kāi)始初始化服務(wù)器地址管理,首先獲取dtx-server服務(wù)器地址,然后調(diào)用DefaultRpcClient對(duì)象的onAddressMessage方法,獲取或創(chuàng)建鏈接,并向dtx-server注冊(cè)實(shí)例id,發(fā)送instanceid和clientip信息,如果建立連接成功并返回結(jié)果,則將連接放回連接池,等待下一次調(diào)用;
以下是連接管理器初始化和rpc連接工廠初始化源碼:
public class DefaultConnectionManager implements ConnectionManager, ConnectionHeartbeatManager,Scannable { ........@/*** @see com.alipay.remoting.ConnectionManager#init()*/public void init() {this.connectionEventHandler.setConnectionManager(this);this.connectionEventHandler.setConnectionEventListener(connectionEventListener);this.connectionFactory.init(connectionEventHandler);} ........ }public class RpcConnectionFactory implements ConnectionFactory { ........@Overridepublic void init(final ConnectionEventHandler connectionEventHandler) {bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, SystemProperties.tcp_nodelay()).option(ChannelOption.SO_REUSEADDR, SystemProperties.tcp_so_reuseaddr()).option(ChannelOption.SO_KEEPALIVE, SystemProperties.tcp_so_keepalive());// init netty write buffer water markinitWriteBufferWaterMark();// init byte buf allocatorif (SystemProperties.netty_buffer_pooled()) {this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);} else {this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);}final boolean idleSwitch = SystemProperties.tcp_idle_switch();final int idleTime = SystemProperties.tcp_idle();final RpcHandler rpcHandler = new RpcHandler(userProcessors);final HeartbeatHandler heartbeatHandler = new HeartbeatHandler();bootstrap.handler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();pipeline.addLast("decoder", new RpcProtocolDecoder(RpcProtocolManager.DEFAULT_PROTOCOL_CODE_LENGTH));pipeline.addLast("encoder",new ProtocolCodeBasedEncoder(ProtocolCode.fromBytes(RpcProtocolV2.PROTOCOL_CODE)));if (idleSwitch) {pipeline.addLast("idleStateHandler", new IdleStateHandler(idleTime, idleTime,0, TimeUnit.MILLISECONDS));pipeline.addLast("heartbeatHandler", heartbeatHandler);}pipeline.addLast("connectionEventHandler", connectionEventHandler);pipeline.addLast("handler", rpcHandler);}});}@Overridepublic void registerUserProcessor(UserProcessor<?> processor) {if (processor == null || StringUtils.isBlank(processor.interest())) {throw new RuntimeException("User processor or processor interest should not be blank!");}UserProcessor<?> preProcessor = this.userProcessors.putIfAbsent(processor.interest(),processor);if (preProcessor != null) {String errMsg = "Processor with interest key ["+ processor.interest()+ "] has already been registered to rpc client, can not register again!";throw new RuntimeException(errMsg);}} ........ }為連接事件處理器設(shè)置監(jiān)聽(tīng)器,并將處理器作為參數(shù)初始化rpc連接工廠,基于netty初始化channel,監(jiān)聽(tīng)Channel的各種動(dòng)作以及狀態(tài)的改變,例如連接事件處理以及rpc事件處理,
以上則是服務(wù)發(fā)起者以及服務(wù)參與者啟動(dòng)DTX客戶(hù)端初始化流程,由于DTX-SERVER并未開(kāi)源,這里僅分析服務(wù)發(fā)起者以及服務(wù)參與者客戶(hù)端的啟動(dòng)流程。
五、服務(wù)發(fā)起以及參與流程
1、DTX的服務(wù)發(fā)起以及參與過(guò)程其實(shí)是通過(guò)DTX動(dòng)態(tài)代理以AOP的方式在服務(wù)調(diào)用前向DTX-SERVER注冊(cè)分支,以及調(diào)用結(jié)束后向DTX-SERVER發(fā)送提交或者回滾請(qǐng)求,再由DTX-SERVER去通知各個(gè)服務(wù)參與者進(jìn)行相應(yīng)的事務(wù)操作(客戶(hù)端和DTX-SERVER維持著長(zhǎng)連接),在通知的過(guò)程中,有失敗重試機(jī)制。
2、由于阿里提供的DTX-SERVER只是一個(gè)單點(diǎn)的測(cè)試demo,所以并不清楚DTX-SERVER的高可用機(jī)制
總結(jié)
以上是生活随笔為你收集整理的蚂蚁金服分布式事务框架DTX源码学习的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 王垠 java,王垠,40行代码,JAV
- 下一篇: STM32硬件基础