分库分表的事务处理机制
轉載自?分庫分表的事務處理機制
分布式事務
?
由于我們將單表的數據切片后存儲在多個數據庫甚至多個數據庫實例中,所以依靠數據庫本身的事務機制不能滿足所有場景的需要。但是,我們推薦在一個數據庫實例中的操作盡可能使用本地事務來保證一致性,跨數據庫實例的一系列更新操作需要根據事務路由在不同的數據源中完成,各個數據源之間的更新操作需要通過分布式事務處理。
?
這里只介紹實現分布式操作一致性的幾個主流思路,保證分布式事務一致性的具體方法請參考《分布式服務架構:原理、設計與實戰》中第2章的內容。
主流的分布式事務解決方案有三種:兩階段提交協議、最大努力保證模式和事務補償機制。
?
1兩階段提交協議
?
兩階段提交協議將分布式事務分為兩個階段,一個是準備階段,一個是提交階段,兩個階段都由事務管理器發起。基于兩階段提交協議,事務管理器能夠最大限度地保證跨數據庫操作的事務的原子性,是分布式系統環境下最嚴格的事務實現方法。符合J2EE規范的AppServer(例如:Websphere、Weblogic、
Jboss等)對關系型數據庫數據源和消息隊列都實現了兩階段提交協議,只需在使用時配置即可。如圖3-9所示。
?
?
?
但是,兩階段提交協議也帶來了性能方面的問題,難于進行水平伸縮,因為在提交事務的過程中,事務管理器需要和每個參與者進行準備和提交的操作的協調,在準備階段鎖定資源,在提交階段消費資源,但是由于參與者較多,鎖定資源和消費資源之間的時間差被拉長,導致響應速度較慢,在此期間產生死鎖或者不確定結果的可能性較大。因此,在互聯網行業里,為了追求性能的提升,很少使用兩階段提交協議。
?
另外,由于兩階段提交協議是阻塞協議,在極端情況下不能快速響應請求方,因此有人提出了三階段提交協議,解決了兩階段提交協議的阻塞問題,但仍然需要事務管理器在參與者之間協調,才能完成一個分布式事務。
?
?
2最大努力保證模式
?
這是一種非常通用的保證分布式一致性的模式,很多開發人員一直在使用,但是并未意識到這是一種模式。最大努力保證模式適用于對一致性要求并不十分嚴格但是對性能要求較高的場景。
?
具體的實現方法是,在更新多個資源時,將多個資源的提交盡量延后到最后一刻處理,這樣的話,如果業務流程出現問題,則所有的資源更新都可以回滾,事務仍然保持一致。唯一可能出現問題的情況是在提交多個資源時發生了系統問題,比如網絡問題等,但是這種情況是非常罕見的,一旦出現這種情況,就需要進行實時補償,將已提交的事務進行回滾,這和我們常說的TCC模式有些類似。
?
下面是使用最大努力保證模式的一個樣例,在該樣例中涉及兩個操作,一個是從消息隊列消費消息,一個是更新數據庫,需要保證分布式的一致性。
-
開始消息事務。
-
開始數據庫事務。
-
接收消息。
-
更新數據庫。
-
提交數據庫事務。
-
提交消息事務。
這時,從第1步到第4步并不是很關鍵,關鍵的是第5步和第6步,需要將其放在最后一起提交,盡最大努力保證前面的業務處理的一致性。到了第5步和第6步,業務邏輯處理完成,這時只可能發生系統錯誤,如果第5步失敗,則可以將消息隊列和數據庫事務全部回滾,保持一致。如果第5步成功,第6步遇到了網絡超時等問題,則這是唯一可能產生問題的情況,在這種情況下,消息的消費過程并沒有被提交到消息隊列,消息隊列可能會重新發送消息給其他消息處理服務,這會導致消息被重復消費,但是可以通過冪等處理來保證消除重復消息帶來的影響。
?
當然,在使用這種模式時,我們要充分考慮每個資源的提交順序。我們在生產實踐中遇到的一種反模式,就是在數據庫事務中嵌套遠程調用,而且遠程調用是耗時任務,導致數據庫事務被拉長,最后拖垮數據庫。因此,上面的案例涉及的是消息事務嵌套數據庫事務,在這里必須進行充分評估和設計,才可以規避事務風險。
?
3事務補償機制
?
顯然,在對性能要求很高的場景中,兩階段提交協議并不是一種好方案,最大努力保證模式也會使多個分布式操作互相嵌套,有可能互相影響。這里,我們給出事務補償機制,其性能很高,并且能夠盡最大可能地保證事務的最終一致性。
?
在數據庫分庫分表后,如果涉及的多個更新操作在某一個數據庫范圍內完成,則可以使用數據庫內的本地事務保證一致性;對于跨庫的多個操作,可通過補償和重試,使其在一定的時間窗口內完成操作,這樣就可以實現事務的最終一致性,突破事務遇到問題就滾回的傳統思路。
?
如果采用事務補償機制,則在遇到問題時,我們需要記錄遇到問題的環境、信息、步驟、狀態等,后續通過重試機制使其達到最終一致性,詳細內容可以參考《分布式服務架構:原理、設計與實戰》第2章,徹底理解ACID原理、CAP理論、BASE原理、最終一致性模式等內容。
?
事務路由
?
無論使用上面哪種方法實現分布式事務,都需要對分庫分表的多個數據源路由事務,一般通過對Spring環境的配置,為不同的數據源配置不同的事務管理器(TransactionManager),這樣,如果更新操作在一個數據庫實例內發生,便可以使用數據源的事務來處理。對于跨數據源的事務,可通過在應用層使用最大努力保證模式和事務補償機制來達成事務的一致性。當然,有時我們需要通過編寫程序來選擇數據庫的事務管理器,根據實現方式的不同,可將事務路由具體分為以下三種。
?
1自動提交事務路由
自動提交事務通過依賴JDBC數據源的自動提交事務特性,對任何數據庫進行更新操作后會自動提交事務,不需要開發人員手工操作事務,也不需要配置事務,實現起來很簡單,但是只能滿足簡單的業務邏輯需求。
?
在通常情況下,JDBC在連接創建后默認設置自動提交為true,當然,也可以在獲取連接后手工修改這個屬性,代碼如下:
?
connnection conn =?null; ? try{ ?conn = getConnnection(); ?conn.setAutoCommit(true); ?// 數據庫操作……………………………conn.commit(); ? }catch(Throwable e){ ?if(conn!=null){ ?try?{ ?conn.rollback(); ?}?catch?(SQLException e1) { ?e1.printStackTrace(); ?} ?} ?throw?new?RuntimeException(e); ? }finally{ ?if(conn!=null){ ?try?{ ?conn.close(); ?}?catch?(SQLException e) { ?e.printStackTrace(); ?} ?} ? }?
我們基本不需要使用原始的JDBC API來改變這些屬性,這些操作一般都會被封裝在我們使用的框架中。3.6節介紹的開源數據庫分庫分表框架dbsplit默認使用的就是這種模式。
?
2可編程事務路由
?
我們在應用中通常采用Spring的聲明式的事務來管理數據庫事務,在分庫分表時,事務處理是個問題,在一個需要開啟事務的方法中,需要動態地確定開啟哪個數據庫實例的事務,也就是說在每個開啟事務的方法調用前就必須確定開啟哪個數據源的事務。下面使用偽代碼來說明如何實現一個可編程事務路由的小框架。
?
首先,通過Spring配置文件展示可編程事務小框架是怎么使用的:
<?xml version="1.0?> <beans><bean?id="sharding-db-trx0"class="org.springframework.jdbc.datasource.Data SourceTransactionManager"><property?name="dataSource"><ref?bean="sharding-db0"?/></property></bean><bean?id="sharding-db-trx1"class="org.springframework.jdbc.datasource.DataSourceTransactionMana ger"><property?name="dataSource"><ref?bean="sharding-db1"?/></property></bean><bean?id="sharding-db-trx2"class="org.springframework.jdbc.datasource.DataSourceTransactionMana ger"><property?name="dataSource"><ref?bean="sharding-db2"?/></property></bean><bean?id="sharding-db-trx3"class="org.springframework.jdbc.datasource.DataSourceTransactionMana ger"><property?name="dataSource"><ref?bean="sharding-db3"?/></property></bean><bean?id="shardingTransactionManager"?class="com.robert.dbsplit.core. ShardingTransactionManager"><property?name="proxyTransactionManagers"><map?value-type="org.springframework.transaction.PlatformTran sactionManager"><entry?key="sharding0"?value-ref="sharding-db-trx0"?/><entry?key="sharding1"?value-ref="sharding-db-trx1"?/><entry?key="sharding2"?value-ref="sharding-db-trx2"?/><entry?key="sharding3"?value-ref="sharding-db-trx3"?/></map></property></bean><aop:config><aop:advisor?advice-ref="txAdvice"?pointcut="execution(* com.robert.biz.*insert(..))"/><aop:advisor?advice-ref="txAdvice"?pointcut="execution(* com.robert.biz.*update(..))"/><aop:advisor?advice-ref="txAdvice"?pointcut="execution(* com.robert.biz.*delete(..))"/></aop:config><tx:advice?id="txAdvice"?transaction-manager="shardingTransactionManager"><tx:attributes><tx:method?name="*"?rollback-for="java.lang.Exception"/></tx:attributes></tx:advice></beans>?
這里使用Spring環境的aop和tx標簽來攔截com.robert.biz包下的所有插入、更新和刪除的方法,當指定的包的方法被調用時,就會使用Spring提供的事務Advice,Spring的事務Advice(tx:advice)會使用事務管理器來控制事務,如果某個方法發生了異常,那么Spring的事務Advice就會使shardingTransactionManager回滾相應的事務。
?
我們看到shardingTransactionManager的類型是ShardingTransactionManager,這個類型是我們開發的一個組合的事務管理器,這個事務管理器聚合了所有分片數據庫的事務管理器對象,然后根據某個標記來路由到不同的事務管理器中,這些事務管理器用來控制各個分片的數據源的事務。
?
這里的標記是什么呢?我們在調用方法時,會提前把分片的標記放進ThreadLocal中,然后在ShardingTransactionManager的getTransaction方法被調用時,取得ThreadLocal中存的標記,最后根據標記來判斷使用哪個分片數據庫的事務管理器對象。
?
為了通過標記路由到不同的事務管理器,我們設計了一個專門的ShardingContextHolder類,在該類的內部使用了一個ThreadLocal類來指定分片數據庫的關鍵字,在ShardingTransaction Manager中通過取得這個標記來選擇具體的分片數據庫的事務管理器對象。因此,這個類提供了setShard和getShard的方法,setShard用于使用者編程指定使用哪個分片數據庫的事務管理器,而getShard用于ShardingTransactionManager獲取標記并取得分片數據庫的事務管理器對象。相關代碼如下:
?
public?class?ShardingContextHolder<T> {private?static?final ThreadLocal shardHolder =?new?ThreadLocal();public?static?<T>?void?setShard(T shard)?{Validate.notNull(shard,?"請指定某個分片數據庫!");shardHolder.set(shard);}public?static?<T>?T?getShard()?{return?(T) shardHolder.get();} }?
有了ShardingContextHolder類后,我們就可以在ShardingTransactionManager中根據給定的分片配置將事務操控權路由到不同分片的數據庫的事務管理器上,實現很簡單,如果在ThreadLocal中存儲了某個分片數據庫的事務管理器的關鍵字,就使用那個分片的數據庫的事務管理器:
?
public?class?ShardingTransactionManager?implements?PlatformTransactionManager?{private?Map<Object, PlatformTransactionManager> proxyTransactionManagers =new?HashMap<Object, PlatformTransactionManager>();protected?PlatformTransactionManager?getTargetTransactionManager()?{Object shard = ShardingContextHolder.getShard();Validate.notNull(shard,?"必須指定一個路由的shard!");return?targetTransactionManagers.get(shard);}public?void?setProxyTransactionManagers(Map<Object, PlatformTransaction Manager> targetTransactionManagers)?{this.targetTransactionManagers = targetTransactionManagers;}public?void?commit(TransactionStatus status)?throws?TransactionException?{getProxyTransactionManager().commit(status);}public?TransactionStatus?getTransaction(TransactionDefinition definition)?throws?TransactionException?{return?getProxyTransactionManager().getTransaction(definition);}public?void?rollback(TransactionStatus status)?throws?TransactionException?{getProxyTransactionManager().rollback(status);} }?
有了這些使用類,我們的可編程事務路由小框架就實現了,這樣在某個具體的服務開始之前,我們就可以使用如下代碼來控制使用某個分片的數據庫的事務管理器了:
RoutingContextHolder.setShard("sharding0"); return?userService.create(user);?
3聲明式事務路由
在上一小節實現了可編程事務路由的小框架,這個小框架通過讓開發人員在ThreadLocal中指定數據庫分片并編程實現。大多數分庫分表框架會實現聲明式事務路由,也就是在實現的服務方法上直接聲明事務的處理注解,注解包含使用哪個數據庫分片的事務管理器的信息,這樣,開發人員就可以專注于業務邏輯的實現,把事務處理交給框架來實現。
?
下面是筆者在實際的線上項目中實現的聲明式事務路由的一個使用實例:
?
@TransactionHint(table =?"INVOICE", keyPath =?"0.accountId")public?void?persistInvoice(Invoice invoice)?{// Save invoice to DBthis.createInvoice(invoice);for?(InvoiceItem invoiceItem : invoice.getItems()) {invoiceItem.setInvId(invoice.getId());invoiceItemService.createInvoiceItem(invoice.getAccountId(), invoiceItem);}// Save invoice to cacheinvoiceCacheService.set(invoice.getAccountId(), invoice.getInvPeriodStart().getTime(), invoice.getInvPeriodEnd().getTime(),invoice);// Update last invoice date to AccountAccount account =?new?Account();account.setId(invoice.getAccountId());account.setLstInvDate(invoice.getInvPeriodEnd());accountService.updateAccount(account);}?
在這個實例中,我們開發了一個持久發票的服務方法。持久發票的服務方法用來保存發票信息和發票項的詳情信息,這里,發票與發票項這兩個領域對象具有父子結構關系。由于在設計過程中通過賬戶ID對這個父子表進行分庫分表,因此在進行事務路由時,也需要通過賬戶ID控制使用哪個數據庫分片的事務管理器。在這個實例中,我們配置了?TransactionHint,TransactionHint的聲明如下:
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public?@interface?TransactionHint {String?table()?default?"";String?keyPath()?default?""; }可以看到,TransactionHint包含了兩個屬性,第1個屬性table指定這次操作涉及分片的數據庫表,第2個屬性指定這次操作根據哪個參數的哪個字段進行分片路由。該實例通過table指定了INVOICE表,并通過keyPath指定了使用第1個參數的字段accountId作為路由的關鍵字。
?
這里的實現與可編程事務路由的小框架實現類似,在方法persistInvoice被調用時,根據TransactionHint提供的操作的數據庫表名稱,在Spring環境的配置中找到這個表的分庫分表的配置信息,例如:一共分了多少個數據庫實例、數據庫和表。
?
下面是在Spring環境中配置的INVOICE表和INVOICE_ITEM表的具體信息,我們看到它們一共使用了兩個數據庫實例,每個實例有兩個庫,每個庫有8個表,使用水平下標策略。配置如下:
?
<bean?name="billingInvSplitTable"?class="com.robert.dbsplit.core.Split Table"init-method="init"><property?name="dbNamePrefix"?value="billing_inv"/><property?name="tableNamePrefix"?value="INVOICE"/><property?name="dbNum"?value="2"/><property?name="tableNum"?value="8"/><property?name="splitStrategyType"?value="HORIZONTAL"/><property?name="splitNodes"><list><ref?bean="splitNode0"/><ref?bean="splitNode1"/></list></property><property?name="readWriteSeparate"?value="true"/></bean><bean?name="billingInvItemSplitTable"?class="com.robert.dbsplit.core.SplitTable"init-method="init"><property?name="dbNamePrefix"?value="billing_inv"/><property?name="tableNamePrefix"?value="INVOICE_ITEM"/><property?name="dbNum"?value="2"/><property?name="tableNum"?value="8"/><property?name="splitStrategyType"?value="HORIZONTAL"/><property?name="splitNodes"><list><ref?bean="splitNode0"/><ref?bean="splitNode1"/></list></property><property?name="readWriteSeparate"?value="true"/></bean>然后,在方法被調用時通過AOP進行攔截,根據TransactionHint配置的路由的主鍵信息keyPath ="0.accountId",得知這次根據第0個參數Invoice的accountID字段來路由,根據Invoice的accountID的值來計算這次持久發票表具體涉及哪個數據庫分片,然后把這個數據庫分片的信息保存到ThreadLocal中。具體的實現代碼如下:
?
SimpleSplitJdbcTemplate simpleSplitJdbcTemplate =(SimpleSplitJdbcTemplate) ReflectionUtil.getFieldValue(field SimpleSplitJdbcTemplate, invocation.getThis());Method method = invocation.getMethod(); // Convert to th method of implementation class method = targetClass.getMethod(method.getName(), method.getParameter Types());TransactionHint[] transactionHints = method.getAnnotationsByType (TransactionHint.class); if?(transactionHints ==?null?|| transactionHints.length <?1)throw?new?IllegalArgumentException("The method "?+ method +?" includes illegal transaction hint."); TransactionHint transactionHint = transactionHints[0];String?tableName = transactionHint.table(); String?keyPath = transactionHint.keyPath();String[] parts = keyPath.split("\\."); int paramIndex = Integer.valueOf(parts[0]);Object[] params = invocation.getArguments(); Object?splitKey = params[paramIndex];if?(parts.length >?1) {String[] paths = Arrays.copyOfRange(parts,?1, parts.length);splitKey = ReflectionUtil.getFieldValueByPath(splitKey, paths); }SplitNode splitNode = simpleSplitJdbcTemplate.decideSplitNode(tableName, splitKey);ThreadContextHolder.INST.setContext(splitNode);ThreadContextHolder是一個單例的對象,在該對象里封裝了一個ThreadLocal,用來存儲某個方法在某個線程下關聯的分片信息:
?
public?class?ThreadContextHolder<T> {public?static?final ThreadContextHolder<SplitNode> INST =?new?ThreadContextHolder<SplitNode>();private?ThreadLocal<T> contextHolder =?new?ThreadLocal<T>();public?T?getContext()?{return?contextHolder.get();}public?void?setContext(T context)?{contextHolder.set(context);} }?
接下來與可編程式事務路由類似,實現一個定制化的事務管理器,在獲取目標事務管理器時,通過我們在ThreadLocal中保存的數據庫分片信息,獲得這個分片數據庫的事務管理器,然后返回:
?
public?class?RoutingTransactionManager?implements?PlatformTransactionManager?{protected?PlatformTransactionManager?getTargetTransactionManager()?{SplitNode splitNode = ThreadContextHolder.INST.getContext();return?splitNode.getPlatformTransactionManager();}public?void?commit(TransactionStatus status)?throws?TransactionException?{getTargetTransactionManager().commit(status);}public?TransactionStatus?getTransaction(TransactionDefinition definition)?throws?TransactionException?{return?getTargetTransactionManager().getTransaction(definition);}public?void?rollback(TransactionStatus status)?throws?TransactionException?{getTargetTransactionManager().rollback(status);} }?
本節介紹的開源數據庫分庫分表框架dbsplit是一個分庫分表的簡單示例實現,在筆者所工作的公司內部有內部版本,在內部版本中實現了聲明式事務路由,但是這部分功能并沒有開源到dbsplit項目,原因是有些與業務結合的邏輯無法分離。如果感興趣,則可以加入我們的開源項目開發中。
?
?
總結
以上是生活随笔為你收集整理的分库分表的事务处理机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: dnf 电脑配置(dnf配置电脑配置)
- 下一篇: java各种集合的线程安全