javascript
MyBatis—Spring 动态数据源事务的处理
在一般的 Spring 應(yīng)用中,如果底層數(shù)據(jù)庫訪問采用的是 MyBatis,那么在大多數(shù)情況下,只使用一個(gè)單獨(dú)的數(shù)據(jù)源,Spring 的事務(wù)管理在大多數(shù)情況下都是有效的。然而,在一些復(fù)雜的業(yè)務(wù)場景下,如需要在某一時(shí)刻訪問不同的數(shù)據(jù)庫,由于 Spring 對于事務(wù)管理實(shí)現(xiàn)的方式,可能不能達(dá)到預(yù)期的效果。本文將簡要介紹 Spring 中事務(wù)的實(shí)現(xiàn)方式,并對以 MyBatis 為底層數(shù)據(jù)庫訪問的系統(tǒng)為例,提供多數(shù)據(jù)源事務(wù)處理的解決方案
Spring 事務(wù)的實(shí)現(xiàn)原理
常見地,在 Spring 中添加事務(wù)的方式通常都是在對應(yīng)的方法或類上加上 @Transactional 注解顯式地將這部分處理加上事務(wù),對于 @Transactional 注解,Spring 會在 org.springframework.transaction.annotation.AnnotationTransactionAttributeSource 定義方法攔截的匹配規(guī)則(即 AOP 部分中的 PointCut),而具體的處理邏輯(即 AOP 中的 Advice)則是在 org.springframework.transaction.interceptor.TransactionInterceptor 中定義
具體事務(wù)執(zhí)行的調(diào)用鏈路如下
Spring 對于事務(wù)切面采取的具體行為實(shí)現(xiàn)如下:
public class TransactionInterceptor
extends TransactionAspectSupport
implements MethodInterceptor, Serializable {
// 這里的方法定義為 MethodInterceptor,即 AOP 實(shí)際調(diào)用點(diǎn)
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
// invokeWithinTransaction 為父類 TransactionAspectSupport 定義的方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
}
}
繼續(xù)進(jìn)入 TransactionAspectSupport 的 invokeWithinTransaction 方法:
public abstract class TransactionAspectSupport
implements BeanFactoryAware, InitializingBean {
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 省略響應(yīng)式事務(wù)和編程式事務(wù)的處理邏輯
// 當(dāng)前事務(wù)管理的實(shí)際
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
/*
檢查在當(dāng)前的執(zhí)行上下文中,是否需要創(chuàng)建新的事務(wù),這是因?yàn)楫?dāng)前執(zhí)行的業(yè)務(wù)處理可能在上一個(gè)已經(jīng)開始
的事務(wù)處理中
*/
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation(); // 實(shí)際業(yè)務(wù)代碼的業(yè)務(wù)處理
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex); // 出現(xiàn)異常的回滾處理
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 如果沒有出現(xiàn)異常,則提交本次事務(wù)
commitTransactionAfterReturning(txInfo);
return retVal;
}
}
}
在獲取事務(wù)信息對象時(shí),首先需要獲取到對應(yīng)的事務(wù)狀態(tài)對象 TransactionStatus,這個(gè)狀態(tài)對象決定了 Spring 后續(xù)要對當(dāng)前事務(wù)采取的何種行為,具體代碼在 org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
// 這里的 definition 是通過解析 @Transactional 注解中的屬性得到的配置對象
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
/*
這里獲取事務(wù)相關(guān)的對象(如持有的數(shù)據(jù)庫連接等),具體由子類來定義相關(guān)的實(shí)現(xiàn)
*/
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 如果當(dāng)前已經(jīng)在一個(gè)事務(wù)中,那么需要按照定義的屬性采取對應(yīng)的行為
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 需要重新開啟一個(gè)新的事務(wù)的情況,具體在 org.springframework.transaction.TransactionDefinition 有相關(guān)的定義
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 開啟一個(gè)新的事務(wù)
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
在 AbstractPlatformTransactionManager 中已經(jīng)定義了事務(wù)處理的大體框架,而實(shí)際的事務(wù)實(shí)現(xiàn)則交由具體的子類實(shí)現(xiàn),在一般情況下,由 org.springframework.jdbc.datasource.DataSourceTransactionManager 采取具體的實(shí)現(xiàn)
主要關(guān)注的點(diǎn)在于對于事務(wù)信息對象的創(chuàng)建,事務(wù)的開啟、提交回滾操作,具體對應(yīng)的代碼如下:
事務(wù)信息對象的創(chuàng)建代碼:
protected Object doGetTransaction() {
/*
簡單地理解,DataSourceTransactionObject 就是一個(gè)持有數(shù)據(jù)庫連接的資源對象
*/
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
/*
TransactionSynchronizationManager 是用于管理在事務(wù)執(zhí)行過程相關(guān)的信息對象的一個(gè)工具類,基本上
這個(gè)類持有的事務(wù)信息貫穿了整個(gè) Spring 事務(wù)管理
*/
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
開啟事務(wù)對應(yīng)的源代碼:
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
/*
如果當(dāng)前事務(wù)對象沒有持有數(shù)據(jù)庫連接,則需要從對應(yīng)的 DataSource 中獲取對應(yīng)的連接
*/
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
/*
由于當(dāng)前的事務(wù)已經(jīng)交由 Spring 進(jìn)行管理,那么在這種情況下,原有數(shù)據(jù)庫連接的自動提交
必須是關(guān)閉的,因?yàn)槿绻_啟了自動提交,那么實(shí)際上就相當(dāng)于每一次的 SQL 都會執(zhí)行一次事務(wù)的提交,
這種情況下事務(wù)的管理沒有意義
*/
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
/*
如果是新創(chuàng)建的事務(wù),那么需要綁定這個(gè)數(shù)據(jù)庫連接對象到這個(gè)事務(wù)中,使得后續(xù)再進(jìn)來的業(yè)務(wù)處理
能夠順利地進(jìn)入原有的事務(wù)中
*/
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
事務(wù)提交相關(guān)的代碼:
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
/*
一些事務(wù)提交時(shí)的鉤子方法,使得第三方的數(shù)據(jù)庫持久話框架(如 MyBatis)的
事務(wù)能夠被 Spring 管理
*/
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
// 事務(wù)正常提交后的鉤子方法
triggerAfterCommit(status);
}
finally {
// 事務(wù)正常提交后有關(guān)資源清理的鉤子方法
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
事務(wù)回滾的相關(guān)代碼:
private void doRollbackOnCommitException(DefaultTransactionStatus status, Throwable ex) throws TransactionException {
try {
if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback after commit exception", ex);
}
doRollback(status);
}
else if (status.hasTransaction() && isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Marking existing transaction as rollback-only after commit exception", ex);
}
doSetRollbackOnly(status);
}
}
catch (RuntimeException | Error rbex) {
logger.error("Commit exception overridden by rollback exception", ex);
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw rbex;
}
// 一些事務(wù)相關(guān)的鉤子方法
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
MyBatis 與 Spring 事務(wù)的整合
在 MyBatis 中,實(shí)際獲取連接是通過 BaseExecutor 中 Transaction 屬性來獲取對應(yīng)的連接,實(shí)際上 MyBatis 執(zhí)行時(shí)并不會意識到當(dāng)前上下文是否處于一個(gè)事務(wù)中,在整合到 Spring 的過程中,默認(rèn)的 Transaction 實(shí)現(xiàn)類為 org.mybatis.spring.transaction.SpringManagedTransaction:
public class SpringManagedTransaction implements Transaction {
private static final Logger LOGGER = LoggerFactory.getLogger(SpringManagedTransaction.class);
private final DataSource dataSource;
private Connection connection;
private boolean isConnectionTransactional;
private boolean autoCommit;
public SpringManagedTransaction(DataSource dataSource) {
notNull(dataSource, "No DataSource specified");
this.dataSource = dataSource;
}
/**
* {@inheritDoc}
*/
@Override
public Connection getConnection() throws SQLException {
if (this.connection == null) {
openConnection();
}
return this.connection;
}
/*
從當(dāng)前的數(shù)據(jù)源對象 dataSource 中獲取一個(gè)連接對象,而結(jié)合上文 Spring 中對于事務(wù)的處理,如果已經(jīng)將
dataSource 屬性綁定到了當(dāng)前的線程,那么在這里就會獲取到原有創(chuàng)建事務(wù)時(shí)已經(jīng)創(chuàng)建的連接,而不是從頭重新生成一個(gè)連接
*/
private void openConnection() throws SQLException {
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
/*
這里的目的是為了處理 MyBatis 部分關(guān)于事務(wù)提交的處理,因?yàn)?MyBatis 會將自己的事務(wù)處理放入到 Spring 事務(wù)中的
鉤子方法中進(jìn)行處理,如果此時(shí)持有的連接對象與整個(gè) Spring 事務(wù)持有的連接對象一致時(shí),由于 MyBatis 的事務(wù)提交會
早于 Spring 的事務(wù)提交(triggerBeforeCommit() 鉤子方法),從而導(dǎo)致 Spring 在提交事務(wù)時(shí)出現(xiàn)事務(wù)重復(fù)提交的異常
*/
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
LOGGER.debug(() -> "JDBC Connection [" + this.connection + "] will"
+ (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring");
}
@Override
public void commit() throws SQLException {
if (this.connection != null && !this.isConnectionTransactional && !this.autoCommit) {
LOGGER.debug(() -> "Committing JDBC Connection [" + this.connection + "]");
this.connection.commit();
}
}
@Override
public void rollback() throws SQLException {
if (this.connection != null && !this.isConnectionTransactional && !this.autoCommit) {
LOGGER.debug(() -> "Rolling back JDBC Connection [" + this.connection + "]");
this.connection.rollback();
}
}
@Override
public void close() throws SQLException {
DataSourceUtils.releaseConnection(this.connection, this.dataSource);
}
@Override
public Integer getTimeout() throws SQLException {
ConnectionHolder holder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (holder != null && holder.hasTimeout()) {
return holder.getTimeToLiveInSeconds();
}
return null;
}
}
而 MyBatis 對于 Transaction 中的提交處理,需要將其整合到 Spring 中,是通過向 TransactionSynchronizationManager 注冊 TransactionSynchronization 來實(shí)現(xiàn)的,在 MyBatis 中,實(shí)際的實(shí)現(xiàn)類為 SqlSessionSynchronization:
private static final class SqlSessionSynchronization extends TransactionSynchronizationAdapter {
private final SqlSessionHolder holder; // 當(dāng)前持有的 SqlSession
/*
用于綁定到 TransactionSynchronizationManager 的 Key 對象,由于 Spring 對于 Bean 的單例處理,實(shí)際上每次
都是唯一的 SqlSessionFactory 實(shí)例對象,因此在 TransactionSynchronizationManager 中的 ThreadLocal 可以通過
這個(gè)對象找到當(dāng)前線程綁定的實(shí)際 Value 對象
*/
private final SqlSessionFactory sessionFactory;
private boolean holderActive = true;
public SqlSessionSynchronization(SqlSessionHolder holder, SqlSessionFactory sessionFactory) {
notNull(holder, "Parameter 'holder' must be not null");
notNull(sessionFactory, "Parameter 'sessionFactory' must be not null");
this.holder = holder;
this.sessionFactory = sessionFactory;
}
@Override
public int getOrder() {
// order right before any Connection synchronization
return DataSourceUtils.CONNECTION_SYNCHRONIZATION_ORDER - 1;
}
@Override
public void suspend() {
if (this.holderActive) {
LOGGER.debug(() -> "Transaction synchronization suspending SqlSession [" + this.holder.getSqlSession() + "]");
TransactionSynchronizationManager.unbindResource(this.sessionFactory);
}
}
@Override
public void resume() {
if (this.holderActive) {
LOGGER.debug(() -> "Transaction synchronization resuming SqlSession [" + this.holder.getSqlSession() + "]");
TransactionSynchronizationManager.bindResource(this.sessionFactory, this.holder);
}
}
@Override
public void beforeCommit(boolean readOnly) {
/*
注意 Spring 事務(wù)中的 triggerBeforeCommit() 鉤子方法,在事務(wù)提交前會依次檢查 TransactionSynchronizationManager 中綁定的 TransactionSynchronization,并在事務(wù)實(shí)際提交前(即當(dāng)前事務(wù)信息是新開啟的事務(wù))前調(diào)用每個(gè) TransactionSynchronization 的 beforeCommit 方法
*/
if (TransactionSynchronizationManager.isActualTransactionActive()) {
try {
LOGGER.debug(() -> "Transaction synchronization committing SqlSession [" + this.holder.getSqlSession() + "]");
/*
由于 SqlSession 最終的方法調(diào)用會委托給對應(yīng)的 Executor 進(jìn)行處理,而 executor 的 commit()
則會繼續(xù)調(diào)用 Transaction 對象的 commit() 方法,從而實(shí)現(xiàn)與上文 SpringManagedTransaction 對象整合
*/
this.holder.getSqlSession().commit();
} catch (PersistenceException p) {
if (this.holder.getPersistenceExceptionTranslator() != null) {
DataAccessException translated = this.holder.getPersistenceExceptionTranslator()
.translateExceptionIfPossible(p);
if (translated != null) {
throw translated;
}
}
throw p;
}
}
}
/*
triggerBeforeCompletion() 鉤子方法
*/
@Override
public void beforeCompletion() {
// Issue #18 Close SqlSession and deregister it now
// because afterCompletion may be called from a different thread
if (!this.holder.isOpen()) {
LOGGER
.debug(() -> "Transaction synchronization deregistering SqlSession [" + this.holder.getSqlSession() + "]");
TransactionSynchronizationManager.unbindResource(sessionFactory);
this.holderActive = false;
LOGGER.debug(() -> "Transaction synchronization closing SqlSession [" + this.holder.getSqlSession() + "]");
this.holder.getSqlSession().close();
}
}
/*
triggerAfterCompletion() 鉤子方法,主要是為了清理相關(guān) ThreadLocal 綁定的資源對象
*/
@Override
public void afterCompletion(int status) {
if (this.holderActive) {
// afterCompletion may have been called from a different thread
// so avoid failing if there is nothing in this one
LOGGER
.debug(() -> "Transaction synchronization deregistering SqlSession [" + this.holder.getSqlSession() + "]");
TransactionSynchronizationManager.unbindResourceIfPossible(sessionFactory);
this.holderActive = false;
LOGGER.debug(() -> "Transaction synchronization closing SqlSession [" + this.holder.getSqlSession() + "]");
this.holder.getSqlSession().close();
}
this.holder.reset();
}
}
為了使得 MyBatis 在執(zhí)行的過程中能夠 Spring 進(jìn)行管理,因此需要代理實(shí)際執(zhí)行的 SqlSession,實(shí)際執(zhí)行類為 SqlSessionTemplate,在執(zhí)行的過程中,實(shí)際行為在 SqlSessionInterceptor 中定義:
// InvocationHandler 為 JDK 動態(tài)代理的部分,定義了代理類需要采取的相關(guān)行為
private class SqlSessionInterceptor implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
/*
getSqlSession 為 SqlSessionUtil 的靜態(tài)方法,實(shí)際上在執(zhí)行過程中也是通過 TransactionSynchronizationManager 來感知當(dāng)前上下文所處的事務(wù)信息,當(dāng)處于同一個(gè)事務(wù)中時(shí),則會通過 sqlSessionFactory
作為 key 來獲取之前的 SqlSession,從而保證事務(wù)的正常運(yùn)行
*/
SqlSession sqlSession = getSqlSession(SqlSessionTemplate.this.sqlSessionFactory,
SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator);
try {
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
// force commit even on non-dirty sessions because some databases require
// a commit/rollback before calling close()
sqlSession.commit(true);
}
return result;
} catch (Throwable t) {
Throwable unwrapped = unwrapThrowable(t);
// 省略部分異常處理代碼
throw unwrapped;
} finally {
if (sqlSession != null) {
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
}
}
}
}
getSqlSession 是 SqlSessionUtil 的靜態(tài)方法,實(shí)際源代碼如下所示:
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory,
ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator) {
notNull(sessionFactory, NO_SQL_SESSION_FACTORY_SPECIFIED);
notNull(executorType, NO_EXECUTOR_TYPE_SPECIFIED);
/*
如果能在 TransactionSynchronizationManager 中找到和當(dāng)前 SqlSessionFactory 綁定的 SqlSession
信息,則說明當(dāng)前可能處于一個(gè)事務(wù)中
*/
SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
SqlSession session = sessionHolder(executorType, holder);
if (session != null) {
return session;
}
/*
執(zhí)行到這里,說明要么此時(shí)是第一次進(jìn)入事務(wù),或者當(dāng)前的執(zhí)行方式是以非事務(wù)的形式執(zhí)行的,但無論是那種形式,都需要創(chuàng)建一個(gè)新的 SqlSession
*/
LOGGER.debug(() -> "Creating a new SqlSession");
session = sessionFactory.openSession(executorType);
/*
如果當(dāng)前是以事務(wù)的形式執(zhí)行的,則需要將創(chuàng)建的 SqlSession 注冊到當(dāng)前事務(wù)上下文中
*/
registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
return session;
}
private static SqlSession sessionHolder(ExecutorType executorType, SqlSessionHolder holder) {
SqlSession session = null;
/*
holder 在注冊到 TransactionSynchronizationManager 中時(shí)就會將 synchronizedWithTransaction
設(shè)置為 true,因此實(shí)際上只要注冊到了 TransactionSynchronizationManager 中則說明已經(jīng)在一個(gè)事務(wù)中了
*/
if (holder != null && holder.isSynchronizedWithTransaction()) {
if (holder.getExecutorType() != executorType) {
throw new TransientDataAccessResourceException(
"Cannot change the ExecutorType when there is an existing transaction");
}
holder.requested();
LOGGER.debug(() -> "Fetched SqlSession [" + holder.getSqlSession() + "] from current transaction");
session = holder.getSqlSession();
}
return session;
}
private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator, SqlSession session) {
SqlSessionHolder holder;
/*
TransactionSynchronizationManager.isSynchronizationActive() 檢查當(dāng)前是否處于一個(gè)事務(wù)上下文中,這個(gè)屬性
會在創(chuàng)建事務(wù)的時(shí)候進(jìn)行初始化
*/
if (TransactionSynchronizationManager.isSynchronizationActive()) {
Environment environment = sessionFactory.getConfiguration().getEnvironment();
if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) {
LOGGER.debug(() -> "Registering transaction synchronization for SqlSession [" + session + "]");
holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
/*
注冊 sessionFactory 到事務(wù)上下文,使得能夠被后續(xù)的處理感知
*/
TransactionSynchronizationManager.bindResource(sessionFactory, holder);
/*
注冊一個(gè) TransactionSynchronization,這個(gè) TransactionSynchronization 相關(guān)的方法會在 Spring 事務(wù)的鉤子方法中被調(diào)用
*/
TransactionSynchronizationManager
.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
holder.setSynchronizedWithTransaction(true); // 與上面 sessionHolder 同步
holder.requested();
} else {
if (TransactionSynchronizationManager.getResource(environment.getDataSource()) == null) {
LOGGER.debug(() -> "SqlSession [" + session
+ "] was not registered for synchronization because DataSource is not transactional");
} else {
throw new TransientDataAccessResourceException(
"SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization");
}
}
} else {
LOGGER.debug(() -> "SqlSession [" + session
+ "] was not registered for synchronization because synchronization is not active");
}
具體整合關(guān)系如下圖所示:
動態(tài)數(shù)據(jù)源的處理
基本處理
一般在 Spring 中實(shí)現(xiàn)動態(tài)數(shù)據(jù)源都是基于 AbstractRoutingDataSource 并實(shí)現(xiàn) determineCurrentLookupKey 來實(shí)現(xiàn)的,在實(shí)現(xiàn)的過程中,AbstractRoutingDataSource 會持有一個(gè)關(guān)于數(shù)據(jù)源 DataSource 的映射關(guān)系,通過 determineCurrentLookupKey 作為 key 來決定實(shí)際要采取的實(shí)際數(shù)據(jù)源。這種方式相當(dāng)于多累加了一層,在一般的使用場景下可能不會有什么問題,但是當(dāng)涉及到事務(wù)時(shí),可能會出現(xiàn)一些不可思議的問題
假如現(xiàn)在我們有兩個(gè)數(shù)據(jù)源:MySQL 和 PostgreSQL,我們可以定義自己的數(shù)據(jù)源枚舉類(當(dāng)然直接使用字符串也可以,但是使用枚舉會更好)DataSourceType:
public enum DataSourceType {
MYSQL,
POSTGRESQL
}
現(xiàn)在,我們需要在系統(tǒng)中定義我們自己的實(shí)際數(shù)據(jù)源,這里為了簡便,直接使用 DataSourceBuilder 的方式進(jìn)行構(gòu)建:
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
public class DataSourceConfig {
/*
MySQL 數(shù)據(jù)源
*/
@Bean(name = "mysqlDataSource")
public DataSource mysqlDataSource() {
return DataSourceBuilder.create()
.url("jdbc:mysql://127.0.0.1:3306/lxh_db")
.username("root")
.password("12345678")
.type(DruidDataSource.class)
.build();
}
/*
PostgreSQL 數(shù)據(jù)源
*/
@Bean(name = "psqlDataSource")
public DataSource psqlDataSource() {
return DataSourceBuilder.create()
.url("jdbc:postgresql://127.0.0.1:5432/lxh_db")
.username("postgres")
.password("12345678")
.type(DruidDataSource.class)
.build();
}
}
為了實(shí)現(xiàn)動態(tài)數(shù)據(jù)源,我們需要繼承 AbstractRoutingDataSource,并實(shí)現(xiàn) determineCurrentLookupKey 方法。為了能夠動態(tài)地改變當(dāng)前執(zhí)行上下文的數(shù)據(jù)源類型,我們使用一個(gè) ThreadLocal 來存儲當(dāng)前需要的數(shù)據(jù)源類型:
public class DataSourceHolder {
private static final ThreadLocal<DataSourceType> dataSourceHolder = new ThreadLocal<>();
public static void setCurDataSource(DataSourceType type) {
dataSourceHolder.set(type);
}
public static DataSourceType getCurDataSource() {
return dataSourceHolder.get();
}
}
之后,我們重新定義我們自己的動態(tài)數(shù)據(jù)源類型:
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
public class DynamicDataSource
extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceHolder.getCurDataSource();
}
}
現(xiàn)在我們的動態(tài)數(shù)據(jù)源還沒有實(shí)際的 DataSource 映射,因此我們在實(shí)例化 DynamicDataSource 時(shí)需要手動注冊:
import com.google.common.collect.ImmutableMap;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
import java.util.Map;
@Configuration
public class DataSourceConfig {
/*
由于我們已經(jīng)在系統(tǒng)中定義了多個(gè) DataSource,因此我們需要使用 @Primary 注解來標(biāo)記當(dāng)前定義的 DataSource 是實(shí)際需要用到的 DataSource
*/
@Primary
@Bean(name = "dynamicDataSource")
public DataSource dynamicDataSource(@Qualifier("mysqlDataSource") DataSource mysqlDataSource,
@Qualifier("psqlDataSource") DataSource psqlDataSource) {
DynamicDataSource dataSource = new DynamicDataSource();
// 綁定目標(biāo) key 到實(shí)際數(shù)據(jù)源的映射關(guān)系,并將它們注冊到我們的動態(tài)數(shù)據(jù)源中
Map<Object, Object> dataSourceMap = ImmutableMap.builder()
.put(DataSourceType.MYSQL, mysqlDataSource)
.put(DataSourceType.POSTGRESQL, psqlDataSource)
.build();
dataSource.setTargetDataSources(dataSourceMap);
// 當(dāng)通過 key 無法找到對應(yīng)的數(shù)據(jù)源時(shí),默認(rèn)的數(shù)據(jù)源類型
dataSource.setDefaultTargetDataSource(mysqlDataSource);
return dataSource;
}
}
這樣做就可以使用我們的動態(tài)數(shù)據(jù)源了,在使用前,只需要調(diào)用 DataSourceHolder.setCurDataSource 來進(jìn)行數(shù)據(jù)源切換即可:
public class XXService {
@Resource
private BBService bbService;
public void handler() {
DataSourceType prevType = DataSourceHolder.getCurDataSource();
DataSourceHolder.setCurDataSource(DataSourceType.XXX); // 設(shè)置當(dāng)前的數(shù)據(jù)源類型
bbService.handler(); // bbService 在處理時(shí)就會使用 XXX 對應(yīng)的數(shù)據(jù)源
DataSourceHolder.setCurDataSource(prevType); // 還原回之前的數(shù)據(jù)源
}
}
進(jìn)一步簡化
上面動態(tài)數(shù)據(jù)源的使用似乎有些繁瑣,我們可以使用 AOP 來簡化這個(gè)步驟,由于我們無法在運(yùn)行中得知用戶需要使用的數(shù)據(jù)源類型,因此我們只能要求用戶決定。為了達(dá)到這一目的,我們可以自己定義一個(gè)注解來標(biāo)記用戶希望使用的數(shù)據(jù)源類型:
import java.lang.annotation.*;
/**
* 用于定義處理上下文的所需要持有的數(shù)據(jù)源類型
*/
@Inherited
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface DataSource {
DataSourceType value() default DataSourceType.MYSQL;
}
這樣,用戶如果希望在 XXService 服務(wù)中都使用 MySQL 數(shù)據(jù)源,而在 BBService 中都使用 PostrgreSQL 數(shù)據(jù)源,可以這么做:
@Service
@DataSource(MYSQL)
public class XXService {
}
@Service
@DataSource(POSTGRESQL)
public class BBService {
}
現(xiàn)在我們已經(jīng)定義了需要攔截的位置,還需要定義相關(guān)的行為來達(dá)到自動切換數(shù)據(jù)源上下文的目的:
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.xhliu.springtransaction.annotation.DataSource;
import org.xhliu.springtransaction.datasource.DataSourceHolder;
import org.xhliu.springtransaction.datasource.DataSourceType;
@Aspect
@Component
public class DataSourceAspect {
private final static Logger log = LoggerFactory.getLogger(DataSourceAspect.class);
@Around("@annotation(org.xhliu.springtransaction.annotation.DataSource)")
public Object dataSourceSelect(ProceedingJoinPoint pjp) throws Throwable {
DataSourceType prevType = DataSourceHolder.getCurDataSource();
// 獲取當(dāng)前用戶需要使用的動態(tài)數(shù)據(jù)源類型
DataSource dataSource = parseDataSourceAnno(pjp);
try {
log.debug("當(dāng)前執(zhí)行的上下文中,數(shù)據(jù)源的所屬類型: {}", dataSource.value());
DataSourceHolder.setCurDataSource(dataSource.value());
return pjp.proceed();
} finally {
// 最終需要還原回一開始的數(shù)據(jù)源
DataSourceHolder.setCurDataSource(prevType);
}
}
private static DataSource parseDataSourceAnno(ProceedingJoinPoint pjp) {
MethodSignature signature = (MethodSignature) pjp.getSignature();
DataSource dataSource = signature.getMethod().getDeclaredAnnotation(DataSource.class);
if (dataSource != null) return dataSource;
Object target = pjp.getTarget();
return target.getClass().getDeclaredAnnotation(DataSource.class);
}
}
修改 MyBatis 事務(wù)的行為
基本處理
由于 Spring 事務(wù)是通過 TransactionSynchronizationManager 的 ThreadLocal 綁定 DataSource 和對應(yīng)的 Connection 來實(shí)現(xiàn)事務(wù)的上下文檢測,因此我們創(chuàng)建的 DataSource 在事務(wù)的執(zhí)行過程中是無法再動態(tài)地切換數(shù)據(jù)源。為了解決這一問題,我們需要重新定義 MyBatis 事務(wù)的處理邏輯,使得它能夠動態(tài)地切換數(shù)據(jù)源
我們定義自己的 DynamicTransaction 來替換現(xiàn)有的 SpringManagedTransaction:
import org.apache.ibatis.transaction.Transaction;
import org.mybatis.spring.transaction.SpringManagedTransaction;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.jdbc.datasource.DelegatingDataSource;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.xhliu.springtransaction.datasource.DataSourceType;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 用于定義在當(dāng)前 MyBatis 處理上下文中,正在被使用的事務(wù)對象類型,由于現(xiàn)有的 {@link SpringManagedTransaction}
* 實(shí)現(xiàn)只能綁定到一個(gè)數(shù)據(jù)源,在基于 {@link AbstractRoutingDataSource} 的數(shù)據(jù)源中,當(dāng)同屬于一個(gè)事務(wù)時(shí),無法切換到希望的
* 數(shù)據(jù)源,為此,需要定義一個(gè)特殊的事務(wù)類型來替換現(xiàn)有的事務(wù)類型,從而實(shí)現(xiàn)在一個(gè)事務(wù)中能夠切換數(shù)據(jù)源的效果
*
* @author lxh
*/
public class DynamicTransaction
extends SpringManagedTransaction {
// 緩存當(dāng)前數(shù)據(jù)源之間的映射關(guān)系
private final Map<DataSourceType, Transaction> txMap = new ConcurrentHashMap<>();
// 實(shí)際當(dāng)前系統(tǒng)中持有的動態(tài)數(shù)據(jù)源對象
private final DataSource dataSource;
public DynamicTransaction(DataSource dataSource) {
super(dataSource);
this.dataSource = dataSource;
}
@Override
public Connection getConnection() throws SQLException {
Connection connection = getConnection(DynamicDataSourceUtils.determineDataSourceType());
if (TransactionSynchronizationManager.isActualTransactionActive()) {
connection.setAutoCommit(false); // 如果當(dāng)前已經(jīng)持有了事務(wù),那么獲取到的連接應(yīng)當(dāng)都是非自動提交的
}
return connection;
}
@Override
public void commit() throws SQLException {
/*
由于該方法的調(diào)用發(fā)生在 Spring 事務(wù)提交之前 `triggerBeforeCommit` 鉤子方法
*/
for (Map.Entry<DataSourceType, Transaction> entry : txMap.entrySet()) {
if (!entry.getValue().getConnection().getAutoCommit()) {
entry.getValue().getConnection().commit();
}
}
}
@Override
public void rollback() throws SQLException {
/*
前面提到,MyBatis 整合 Spring 的事務(wù)過程中是通過 AbstractPlatformTransactionManager 的鉤子方法實(shí)現(xiàn)的,
在回滾時(shí)如果能夠檢測到事務(wù)存活,那么說明此時(shí)事務(wù)依舊被 Spring 管理,因此此時(shí)這部分的處理不應(yīng)當(dāng)被回滾
*/
if (TransactionSynchronizationManager.isActualTransactionActive()) return;
for (Map.Entry<DataSourceType, Transaction> entry : txMap.entrySet()) {
entry.getValue().getConnection().rollback();
}
}
@Override
public void close() throws SQLException {
if (TransactionSynchronizationManager.isActualTransactionActive()) return;
for (Map.Entry<DataSourceType, Transaction> entry : txMap.entrySet()) {
DataSourceUtils.releaseConnection(entry.getValue().getConnection(), curDataSource(entry.getKey()));
}
}
private Connection getConnection(DataSourceType type) throws SQLException {
if (txMap.containsKey(type)) {
return txMap.get(type).getConnection();
}
txMap.put(type, new SpringManagedTransaction(curDataSource(type)));
return txMap.get(type).getConnection();
}
private DataSource curDataSource(DataSourceType type) {
DataSource curDS = dataSource;
/*
由于有可能存在代理,因此需要不斷剝離現(xiàn)有數(shù)據(jù)源對象,直到獲取到實(shí)際的數(shù)據(jù)源對象
*/
while (curDS instanceof DelegatingDataSource) {
curDS = ((DelegatingDataSource) curDS).getTargetDataSource();
}
/*
對于動態(tài)數(shù)據(jù)源對象,需要通過對應(yīng) Key 獲取到對應(yīng)的實(shí)際 DataSource 對象
*/
if (curDS instanceof AbstractRoutingDataSource) {
Map<Object, DataSource> dss = ((AbstractRoutingDataSource) curDS).getResolvedDataSources();
return dss.getOrDefault(type, ((AbstractRoutingDataSource) curDS).getResolvedDefaultDataSource());
}
return curDS; // 其它一般情況的數(shù)據(jù)源。。。。
}
}
為了使得 MyBatis能夠使用我們自定義的 Transaction,我們需要重新配置 MyBatis 的 TransactionFactory,因此我們需要重新定義自己的 TransactionFactory:
import org.apache.ibatis.session.TransactionIsolationLevel;
import org.apache.ibatis.transaction.Transaction;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import javax.sql.DataSource;
/**
* 重新定義 MyBatis 中的事務(wù)工廠,使得自定義的動態(tài)數(shù)據(jù)源事務(wù)能夠被 MyBatis 加載
*
* @author lxh
*/
public class DynamicTransactionFactory
extends SpringManagedTransactionFactory {
@Override
public Transaction newTransaction(DataSource dataSource,
TransactionIsolationLevel level,
boolean autoCommit) {
return new DynamicTransaction(dataSource);
}
}
現(xiàn)在,我們要做的是替換現(xiàn)有 MyBatis 中的 TransactionFactory,這個(gè)配置是在 MybatisAutoConfiguration (如果是第三方的擴(kuò)展的 MyBatis,則是在其對應(yīng)的 **AutoConfiguration 中)中完成的配置:
@Bean
@ConditionalOnMissingBean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
factory.setDataSource(dataSource);
factory.setVfs(SpringBootVFS.class);
if (StringUtils.hasText(this.properties.getConfigLocation())) {
factory.setConfigLocation(this.resourceLoader.getResource(this.properties.getConfigLocation()));
}
/*
應(yīng)用相關(guān)的 Configuration,包括 Mapper 的路徑,日志等配置信息
*/
applyConfiguration(factory);
// 省略部分代碼。。。。
/*
這里是 MyBatis 提供的一個(gè)擴(kuò)展點(diǎn),用于修改 SqlSessionFactoryBean 的相關(guān)配置屬性,如 TransactionFactory 等相關(guān)信息,具體詳情可以查看 org.mybatis.spring.boot.autoconfigure.SqlSessionFactoryBeanCustomizer
*/
applySqlSessionFactoryBeanCustomizers(factory);
return factory.getObject();
}
由于 MyBatis 已經(jīng)提供了相關(guān)的擴(kuò)展點(diǎn),因此我們可以由此將我們自定義的 TransactionFactory 替換掉 MyBatis 中默認(rèn)的 TransactionFactory:
import org.apache.ibatis.transaction.TransactionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.boot.autoconfigure.SqlSessionFactoryBeanCustomizer;
public class TransactionSqlSessionFactoryBeanCustomizer
implements SqlSessionFactoryBeanCustomizer {
private final TransactionFactory txFactory;
public TransactionSqlSessionFactoryBeanCustomizer(TransactionFactory txFactory) {
this.txFactory = txFactory;
}
@Override
public void customize(SqlSessionFactoryBean factoryBean) {
factoryBean.setTransactionFactory(txFactory);
}
}
我們需要將這個(gè)類添加到 Spring 上下文中,使得 Spring 能夠發(fā)現(xiàn)并實(shí)例化它(這里我們使用注解的形式):
import org.mybatis.spring.SqlSessionFactoryBean;
import org.apache.ibatis.transaction.TransactionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xhliu.springtransaction.transaction.DynamicTransactionFactory;
/**
* @author lxh
*/
@Configuration
public class MyBatisConfig {
@Bean(name = "dynamicTransactionFactory")
public TransactionFactory dynamicTransactionFactory() {
return new DynamicTransactionFactory();
}
@Bean(name = "dynamicDataSourceCustomizer")
public SqlSessionFactoryBeanCustomizer
dynamicDataSourceCustomizer(
@Qualifier("dynamicTransactionFactory") TransactionFactory dynamicTransactionFactory
) {
return new TransactionSqlSessionFactoryBeanCustomizer(dynamicTransactionFactory);
}
}
現(xiàn)在每個(gè)組件的關(guān)系如下:
一些可能出現(xiàn)的問題
TransactionFactory 無法注冊
在一些低版本的 MyBatis 或者第三方 MyBatis 組件中,可能使用 SqlSessionFactoryBeanCustomizer 來配置 SqlSessionFactoryBean,在這種情況下,最佳的解決方式是提高 MyBatis 的版本,但是在一些三方組件中,這部分可能很難發(fā)生變化(不再維護(hù)或者其它原因無法修改),這種情況下,需要我們手動替換 SqlSessionFactory 的定義,比如我們創(chuàng)建自己的 MineMyBatisAutoConfiguration:
/**
* @author lxh
*/
@Configuration
public class MineMyBatisAutoConfiguration {
private final static Logger log = LoggerFactory.getLogger(MineMyBatisAutoConfiguration.class);
private final MybatisProperties properties;
private final Interceptor[] interceptors;
private final TypeHandler[] typeHandlers;
private final LanguageDriver[] languageDrivers;
private final ResourceLoader resourceLoader;
private final DatabaseIdProvider databaseIdProvider;
private final List<ConfigurationCustomizer> configurationCustomizers;
private final List<SqlSessionFactoryBeanCustomizer> sqlSessionFactoryBeanCustomizers;
public MineMyBatisAutoConfiguration(
MybatisProperties properties,
ObjectProvider<Interceptor[]> interceptorsProvider,
ObjectProvider<TypeHandler[]> typeHandlersProvider,
ObjectProvider<LanguageDriver[]> languageDriversProvider,
ResourceLoader resourceLoader,
ObjectProvider<DatabaseIdProvider> databaseIdProvider,
ObjectProvider<List<ConfigurationCustomizer>> configurationCustomizersProvider,
ObjectProvider<List<SqlSessionFactoryBeanCustomizer>> sqlSessionFactoryBeanCustomizers
) {
this.properties = properties;
this.interceptors = interceptorsProvider.getIfAvailable();
this.typeHandlers = typeHandlersProvider.getIfAvailable();
this.languageDrivers = languageDriversProvider.getIfAvailable();
this.resourceLoader = resourceLoader;
this.databaseIdProvider = databaseIdProvider.getIfAvailable();
this.configurationCustomizers = configurationCustomizersProvider.getIfAvailable();
this.sqlSessionFactoryBeanCustomizers = sqlSessionFactoryBeanCustomizers.getIfAvailable();
}
@Bean
@ConditionalOnMissingBean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
// 三方組件相關(guān)的源碼。。。。
/*
將 SqlSessionFactoryBeanCustomizer 配置到當(dāng)前的 SqlSessionFactoryBean,使得我們現(xiàn)有的
TransactionFactory 的配置能夠生效
*/
applySqlSessionFactoryBeanCustomizers(factory);
return factory.getObject();
}
private void applySqlSessionFactoryBeanCustomizers(SqlSessionFactoryBean factory) {
if (!CollectionUtils.isEmpty(this.sqlSessionFactoryBeanCustomizers)) {
for (SqlSessionFactoryBeanCustomizer customizer : this.sqlSessionFactoryBeanCustomizers) {
customizer.customize(factory);
}
}
}
private void applyConfiguration(SqlSessionFactoryBean factory) {
org.apache.ibatis.session.Configuration configuration = this.properties.getConfiguration();
if (configuration == null && !StringUtils.hasText(this.properties.getConfigLocation())) {
configuration = new org.apache.ibatis.session.Configuration();
}
if (configuration != null && !CollectionUtils.isEmpty(this.configurationCustomizers)) {
for (ConfigurationCustomizer customizer : this.configurationCustomizers) {
customizer.customize(configuration);
}
}
factory.setConfiguration(configuration);
}
}
由于配置類的加載順序問題,可能需要手動地修改配置類定義的順序,由于 Spring 會首先加載被 @ComponentScan 注解修飾的配置類,因此在啟動類中需要將這個(gè)類作為最開始掃描的基類,從而不會被其它 MyBatis 組件替換:
/*
強(qiáng)制將 MineMyBatisAutoConfiguration 的配置類定義放到最前
*/
@ComponentScan(basePackageClasses = {MineMyBatisAutoConfiguration.class, DemoApplication.class})
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
死鎖
實(shí)際上,由于 Spring 事務(wù)會綁定 DataSource 作為事務(wù)的關(guān)鍵信息對象,同時(shí)會通過 DataSource 的 getConnection() 方法作為此 DataSource 對應(yīng)事務(wù)的唯一連接,這在原有的事務(wù)處理中是沒有問題的。然而,由于我們修改了 MyBatis 獲取數(shù)據(jù)庫連接的方式,使得它不再是直接當(dāng)前線程綁定的事務(wù)信息中的連接了,也就是說,MyBatis 獲取到的 Connection 和 Spring 事務(wù)中的存活的 Connection 不再是同一個(gè)。在這種情況下,Spring 事務(wù)在等待 MyBatis 處理的結(jié)束去釋放連接,而 MyBatis 獲取數(shù)據(jù)又需要重新從 DataSource 中再獲取一次(一般是通過數(shù)據(jù)庫連接池,如果此時(shí)連接池中的連接數(shù)已經(jīng)被耗盡了,那么此時(shí) MyBatis 的處理會被阻塞),而 MyBatis 的阻塞又會導(dǎo)致 Spring 事務(wù)中的數(shù)據(jù)庫連接無法被釋放,這可能導(dǎo)致 MyBatis 永遠(yuǎn)無法再獲取到新的連接!
具體情況如下圖所示:
回想一下死鎖出現(xiàn)的幾個(gè)條件:持有互斥鎖、持有并等待、非搶占式以及構(gòu)成循環(huán)回路。盡管在這個(gè)問題中并不存在實(shí)際意義上的互斥鎖,但是對于連接池的請求也間接地相當(dāng)于希望獲取互斥鎖,同時(shí)內(nèi)部的兩個(gè)獲取連接的操作也在形式上滿足了其余的幾個(gè)條件。
為了解決死鎖,只需要去掉其中的一個(gè)條件即可,最佳的條件去除就是互斥鎖。經(jīng)過上文的分析,出現(xiàn)死鎖的原因是因?yàn)橐粋€(gè)事務(wù)中多次獲取了連接,我們只需要保證在一個(gè)事務(wù)中不會出現(xiàn)對同一個(gè)數(shù)據(jù)源多次獲取連接即可
首先,我們需要確保在一個(gè)事務(wù)中綁定的 DataSource 為我們實(shí)際需要獲取連接的數(shù)據(jù)源,而不是 AbstractRoutingDataSource(綁定該數(shù)據(jù)源就會使得后續(xù) MyBatis 在獲取連接時(shí)重新獲取一次),因此,我們需要修改現(xiàn)在的 DataSourceTransactionManager,使得它能夠綁定到正確的實(shí)際數(shù)據(jù)源:
import org.jetbrains.annotations.NotNull;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.DelegatingDataSource;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import javax.sql.DataSource;
import java.util.Map;
public class DynamicDataSourceTransactionManager
extends DataSourceTransactionManager {
public DynamicDataSourceTransactionManager(DataSource dataSource) {
super(dataSource);
}
@NotNull
@Override
protected DataSource obtainDataSource() {
/*
剝離 AbstractRoutingDataSource,使得事務(wù)能夠綁定實(shí)際的 DataSource,后續(xù)的 MyBatis 獲取連接時(shí)即可通過 DataSource 獲取到當(dāng)前事務(wù)上下文中關(guān)聯(lián)的數(shù)據(jù)庫連接
*/
DataSource curDataSource = super.obtainDataSource();
while (curDataSource instanceof DelegatingDataSource) {
curDataSource = ((DelegatingDataSource) curDataSource).getTargetDataSource();
}
if (curDataSource instanceof AbstractRoutingDataSource) {
Map<Object, DataSource> dss = ((AbstractRoutingDataSource) curDataSource).getResolvedDataSources();
return dss.getOrDefault(DynamicDataSourceUtils.determineDataSourceType(),
((AbstractRoutingDataSource) curDataSource).getResolvedDefaultDataSource());
}
assert curDataSource != null;
return curDataSource;
}
}
為了使得這個(gè)事務(wù)管理能夠生效,我們需要替換現(xiàn)有的 DataSourceTransactionManager Bean 定義:
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.transaction.TransactionManagerCustomizers;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
@Configuration
public class TransactionConfiguration {
/*
由于 spring-jdbc 定義的 DataSourceTransactionManager 是被 @ConditionalOnMissingBean 修飾的,因此我們
在這里直接定義 Bean 就可以重新覆蓋原有的 DataSourceTransactionManager 定義
*/
@Bean(name = "dynamicDataSourceTransactionManager")
public DataSourceTransactionManager dynamicDataSourceTransactionManager(
@Qualifier("dynamicDataSource") DataSource dataSource,
ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers
) {
DynamicDataSourceTransactionManager transactionManager = new DynamicDataSourceTransactionManager(dataSource);
transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(transactionManager));
return transactionManager;
}
}
現(xiàn)在,死鎖的問題便得到了順利地解決
多線程中的事務(wù)
由于 Spring 的事務(wù)信息是通過 ThreadLocal 控制的,因此在不同的線程中,Spring 事務(wù)便不能很好地工作,為了解決這個(gè)問題,我們可以在線程執(zhí)行任務(wù)前將現(xiàn)有線程關(guān)聯(lián)的事務(wù)信息綁定到當(dāng)前工作線程,當(dāng)出現(xiàn)異常時(shí),我們可以將這個(gè)事務(wù)信息標(biāo)記為 “只能回滾”,從而達(dá)到整體的一致性的目標(biāo)
以下面的例子為例:
public TransactionStatus run() {
/*
txManager 為創(chuàng)建任務(wù)時(shí)必須的 DataSourceTransactionManager 事務(wù)管理對象
resource 為之前事務(wù)所在線程綁定的資源對象,我們知道就是 DataSourceTransactionObject,持有數(shù)據(jù)庫連接的信息對象,
這樣,當(dāng)前線程中后續(xù)的 MyBatis 組件在獲取連接時(shí)也能夠復(fù)用現(xiàn)有的數(shù)據(jù)庫連接
*/
Object key = txManager.getResourceFactory();
TransactionSynchronizationManager.bindResource(key, resource);
TransactionStatus status = txManager.getTransaction(definition);
try {
runnable.run();
} catch (Throwable t) {
log.debug("任務(wù)執(zhí)行出現(xiàn)異常", t);
status.setRollbackOnly(); // 出現(xiàn)異常時(shí)將整個(gè)事務(wù)設(shè)置為只能回滾的狀態(tài)
} finally {
// 移除與當(dāng)前線程執(zhí)行的關(guān)聯(lián)關(guān)系,避免任務(wù)執(zhí)行過程中的資源混亂
TransactionSynchronizationManager.unbindResource(key);
}
return status;
}
具體 demo 地址:https://github.com/LiuXianghai-coder/Spring-Study/tree/master/spring-transaction
總結(jié)
以上是生活随笔為你收集整理的MyBatis—Spring 动态数据源事务的处理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 调试分析Linux 0.00引导程序
- 下一篇: TypeChat、JSONSchemaC