Spring Boot Transaction Source Code Analysis (Part 1)

Published: 2024/4/13
Contents

PlatformTransactionManager

TransactionStatus

DefaultTransactionStatus

AbstractPlatformTransactionManager

Properties

getTransaction

handleExistingTransaction

Methods to be implemented by subclasses

commit

processCommit

Methods to be implemented by subclasses

rollback

processRollback

Methods to be implemented by subclasses

SuspendedResourcesHolder

Transaction state control

TransactionSynchronization

TransactionSynchronizationManager

TransactionSynchronizationUtils


Spring transactions are usually declared with the @Transactional annotation. Its transactionManager attribute (alias: value) names the PlatformTransactionManager to use.

public @interface Transactional {

    @AliasFor("transactionManager")
    String value() default "";

    /* Qualifier of the PlatformTransactionManager */
    @AliasFor("value")
    String transactionManager() default "";

    Propagation propagation() default Propagation.REQUIRED;

    /* TransactionAttribute settings */
    Isolation isolation() default Isolation.DEFAULT;

    int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;

    boolean readOnly() default false;

    Class<? extends Throwable>[] rollbackFor() default {};

    String[] rollbackForClassName() default {};

    Class<? extends Throwable>[] noRollbackFor() default {};

    String[] noRollbackForClassName() default {};
}

PlatformTransactionManager

The PlatformTransactionManager interface defines the three core transaction operations: obtaining a transaction, committing, and rolling back.

public interface PlatformTransactionManager extends TransactionManager {

    TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException;

    void commit(TransactionStatus status) throws TransactionException;

    void rollback(TransactionStatus status) throws TransactionException;
}

Figure: implementation classes of the PlatformTransactionManager interface. Among them, DataSourceTransactionManager is the most commonly used.

TransactionStatus

TransactionStatus is used to inspect and control the state of a transaction.

public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {

    boolean hasSavepoint();

    @Override
    void flush();
}

public interface TransactionExecution {

    /** Whether this is a new transaction. */
    boolean isNewTransaction();

    // Marks the transaction so that its only possible outcome is a rollback;
    // an alternative to throwing an exception.
    void setRollbackOnly();

    boolean isRollbackOnly();

    boolean isCompleted();
}

public interface SavepointManager {

    Object createSavepoint() throws TransactionException;

    void rollbackToSavepoint(Object savepoint) throws TransactionException;

    void releaseSavepoint(Object savepoint) throws TransactionException;
}

DefaultTransactionStatus

@Nullable
private final Object transaction;

private final boolean newTransaction;

private final boolean newSynchronization;

private final boolean readOnly;

private final boolean debug;

@Nullable
private final Object suspendedResources;

AbstractPlatformTransactionManager

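AbstractPlatformTransactionManager is the abstract base implementation of PlatformTransactionManager. It is responsible for the following:

  • Handling transaction propagation.
  • Suspending and resuming transactions, keeping the thread-bound state in TransactionSynchronizationManager in sync while doing so.
  • Invoking registered callbacks: callbacks are kept in a collection and, at specific points in the transaction lifecycle, iterated over to call the corresponding method on each.
  • Defining the overall skeleton for beginning and completing a transaction.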
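The "skeleton" in the last bullet is the template-method pattern: the public workflow is fixed in final methods, and resource-specific steps are delegated to protected hooks. The sketch below only illustrates that shape. SkeletonManager and RecordingManager are hypothetical names, and the hooks are greatly reduced compared with the real class.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical skeleton: mirrors the template-method shape only, not Spring's real class. */
abstract class SkeletonManager {

    // Fixed workflow; final so subclasses cannot reorder the steps.
    public final Object begin() {
        Object tx = doGetTransaction();
        doBegin(tx);
        return tx;
    }

    public final void commit(Object tx) {
        doCommit(tx);
    }

    // Resource-specific hooks supplied by subclasses.
    protected abstract Object doGetTransaction();

    protected abstract void doBegin(Object tx);

    protected abstract void doCommit(Object tx);
}

/** Hypothetical subclass that just records which hooks were called, in order. */
final class RecordingManager extends SkeletonManager {

    final List<String> calls = new ArrayList<>();

    @Override
    protected Object doGetTransaction() {
        calls.add("doGetTransaction");
        return "tx-1";
    }

    @Override
    protected void doBegin(Object tx) {
        calls.add("doBegin");
    }

    @Override
    protected void doCommit(Object tx) {
        calls.add("doCommit");
    }
}
```

A subclass such as DataSourceTransactionManager plays the role of RecordingManager here: it supplies only the hooks, while the base class decides when they run.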

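Properties

private int transactionSynchronization = SYNCHRONIZATION_ALWAYS;

private int defaultTimeout = TransactionDefinition.TIMEOUT_DEFAULT;

// Whether nested transactions are allowed.
private boolean nestedTransactionAllowed = false;

// Whether an existing transaction is validated before participating in it.
private boolean validateExistingTransaction = false;

// Whether a failure in a participating transaction marks the whole transaction rollback-only.
private boolean globalRollbackOnParticipationFailure = true;

private boolean failEarlyOnGlobalRollbackOnly = false;

// Whether to roll back when the commit itself fails.
private boolean rollbackOnCommitFailure = false;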

getTransaction

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {

    // Fall back to the default definition if none was supplied.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    // Obtain the transaction object; implemented by subclasses.
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // Is there already a transaction?
    if (isExistingTransaction(transaction)) {
        // Yes: handle it according to the propagation behavior.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // Validate the timeout requested for the new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // No existing transaction and PROPAGATION_MANDATORY: throw.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // There is no transaction to suspend here; only active synchronizations (if any) are suspended.
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }
        try {
            // Whether a new synchronization should be activated.
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // Status object for the new transaction.
            DefaultTransactionStatus status = newTransactionStatus(
                    def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // Begin the transaction.
            doBegin(transaction, def);
            prepareSynchronization(status, def);
            return status;
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + def);
        }
        // Run non-transactionally.
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

handleExistingTransaction

Handles the case where a transaction already exists.

// Handles the case where a transaction already exists.
private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {

    // PROPAGATION_NEVER: throw if a transaction exists.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }

    // PROPAGATION_NOT_SUPPORTED: run non-transactionally; suspend the current transaction.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        // Suspend the current transaction.
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    // PROPAGATION_REQUIRES_NEW: always create a new transaction; suspend the current one.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                    definition.getName() + "]");
        }
        // Suspend the current transaction.
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // Begin the inner (new) transaction.
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    // PROPAGATION_NESTED: run within a transaction nested inside the existing one.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
    // Optionally validate the existing transaction against the new definition.
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                        definition.getIsolationLevel() : null);
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
        TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
        TransactionSynchronizationManager.initSynchronization();
    }
}
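Taken together, getTransaction and handleExistingTransaction amount to a decision table over two inputs: the propagation behavior and whether a transaction already exists. The sketch below condenses that table into one pure function. The Propagation and Action enums and the PropagationSketch class are hypothetical and exist only for this illustration. The sketch also assumes nested transactions are allowed, so it ignores the nestedTransactionAllowed check.

```java
/** Hypothetical mirror of the seven propagation behaviors. */
enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

/** Hypothetical outcome labels used only by this sketch. */
enum Action {
    BEGIN_NEW, JOIN_EXISTING, RUN_WITHOUT_TX, SUSPEND_AND_RUN_WITHOUT_TX,
    SUSPEND_AND_BEGIN_NEW, NEST_IN_EXISTING, FAIL
}

final class PropagationSketch {

    /** Condensed decision table; assumes nested transactions are allowed. */
    static Action decide(Propagation propagation, boolean transactionExists) {
        if (transactionExists) {
            // Corresponds to handleExistingTransaction.
            switch (propagation) {
                case NEVER:         return Action.FAIL;
                case NOT_SUPPORTED: return Action.SUSPEND_AND_RUN_WITHOUT_TX;
                case REQUIRES_NEW:  return Action.SUSPEND_AND_BEGIN_NEW;
                case NESTED:        return Action.NEST_IN_EXISTING;
                default:            return Action.JOIN_EXISTING; // REQUIRED, SUPPORTS, MANDATORY
            }
        }
        // Corresponds to the remainder of getTransaction.
        switch (propagation) {
            case MANDATORY:
                return Action.FAIL;
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return Action.BEGIN_NEW;
            default:
                return Action.RUN_WITHOUT_TX; // SUPPORTS, NOT_SUPPORTED, NEVER
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + ": none -> " + decide(p, false) + ", existing -> " + decide(p, true));
        }
    }
}
```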

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    // Is synchronization active for the current thread?
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                suspendedResources = doSuspend(transaction);
            }
            // Copy the thread-bound state into the holder and reset it.
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        }
        catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    }
    else if (transaction != null) {
        // Transaction active but no synchronization active.
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    }
    else {
        // Neither transaction nor synchronization active.
        return null;
    }
}

Methods to be implemented by subclasses

protected abstract Object doGetTransaction() throws TransactionException;

protected abstract void doBegin(Object transaction, TransactionDefinition definition)
        throws TransactionException;

protected Object doSuspend(Object transaction) throws TransactionException {
    throw new TransactionSuspensionNotSupportedException(
            "Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
}

commit

@Override
public final void commit(TransactionStatus status) throws TransactionException {
    // A transaction cannot be completed twice.
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    // If this status was marked rollback-only locally, do not attempt to commit; roll back instead.
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        // Roll back without raising an exception (unexpected = false).
        processRollback(defStatus, false);
        return;
    }

    // The global transaction is marked rollback-only.
    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        // Roll back; with unexpected = true an UnexpectedRollbackException may be thrown.
        processRollback(defStatus, true);
        return;
    }

    // Perform the commit.
    processCommit(defStatus);
}

processCommit

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;

        try {
            boolean unexpectedRollback = false;
            // Prepare for commit.
            prepareForCommit(status);
            // Callbacks before commit and before completion.
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;

            if (status.hasSavepoint()) {
                // A savepoint exists, so this is a NESTED transaction inside an outer one:
                // do not commit here, just release the savepoint.
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                status.releaseHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                // A new transaction: this status owns the physical transaction.
                // An inner transaction reaches this branch with PROPAGATION_REQUIRES_NEW
                // (or with PROPAGATION_NESTED when the manager does not use savepoints);
                // NESTED with a savepoint is handled by the branch above.
                // Such an inner transaction commits independently of the outer one.
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                // This is where the actual commit happens.
                doCommit(status);
            }
            else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }

            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        catch (UnexpectedRollbackException ex) {
            // The transaction was rolled back: report STATUS_ROLLED_BACK to the callbacks.
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            }
            else {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        catch (RuntimeException | Error ex) {
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }

        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            // Callbacks after a successful commit.
            triggerAfterCommit(status);
        }
        finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }
    }
    finally {
        cleanupAfterCompletion(status);
    }
}

Methods to be implemented by subclasses

protected abstract void doCommit(DefaultTransactionStatus status) throws TransactionException;

rollback

@Override
public final void rollback(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    processRollback(defStatus, false);
}

processRollback

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;

        try {
            // Callbacks before completion.
            triggerBeforeCompletion(status);

            // If a savepoint is held, roll back only to that savepoint.
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                status.rollbackToHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                // This status owns the transaction: perform the actual rollback.
                doRollback(status);
            }
            else {
                // Participating in larger transaction
                if (status.hasTransaction()) {
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                        }
                        doSetRollbackOnly(status);
                    }
                    else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                        }
                    }
                }
                else {
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }
                // Unexpected rollback only matters here if we're asked to fail early
                if (!isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = false;
                }
            }
        }
        catch (RuntimeException | Error ex) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        // Callbacks after completion.
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

        // Raise UnexpectedRollbackException if we had a global rollback-only marker
        if (unexpectedRollback) {
            throw new UnexpectedRollbackException(
                    "Transaction rolled back because it has been marked as rollback-only");
        }
    }
    finally {
        // Clean up after completion.
        cleanupAfterCompletion(status);
    }
}
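The interplay between commit and processRollback is easiest to see with a participating (non-new) transaction. A participant that fails cannot roll back the physical transaction, so it only marks it rollback-only. When the owner later calls commit, the marker turns the commit into a rollback plus an exception. The following sketch models just that path under the default property values shown earlier. SharedTx, RollbackOnlySketch and UnexpectedRollback are hypothetical stand-ins, not Spring classes.

```java
/** Hypothetical, simplified model of one physical transaction shared by several participants. */
final class SharedTx {
    boolean rollbackOnly; // plays the role of the "global rollback-only" marker
    boolean committed;
    boolean rolledBack;
}

final class RollbackOnlySketch {

    /** Hypothetical stand-in for Spring's UnexpectedRollbackException. */
    static final class UnexpectedRollback extends RuntimeException {
        UnexpectedRollback(String message) {
            super(message);
        }
    }

    /** Rollback requested by the owner (newTransaction = true) or by a participant. */
    static void rollback(SharedTx tx, boolean newTransaction) {
        if (newTransaction) {
            tx.rolledBack = true;   // owner: real rollback (the doRollback step)
        } else {
            tx.rollbackOnly = true; // participant: can only mark (the doSetRollbackOnly step)
        }
    }

    /** Commit requested by the owner or by a participant. */
    static void commit(SharedTx tx, boolean newTransaction) {
        if (!newTransaction) {
            return; // participants never commit the physical transaction
        }
        if (tx.rollbackOnly) {
            // Owner asked for commit, but a participant already failed.
            tx.rolledBack = true;
            throw new UnexpectedRollback("Transaction rolled back because it has been marked as rollback-only");
        }
        tx.committed = true;
    }
}
```

This is why catching an exception from an inner @Transactional method that joined the outer transaction does not save the outer transaction: the marker has already been set by the time the exception reaches the caller.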

Methods to be implemented by subclasses

protected abstract void doRollback(DefaultTransactionStatus status) throws TransactionException;

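SuspendedResourcesHolder

Holder for the state of a suspended transaction. It stores what is needed to resume the transaction later.

// Suspended resources.
private final Object suspendedResources;

// Suspended transaction synchronizations.
private List<TransactionSynchronization> suspendedSynchronizations;

// Transaction name.
private String name;

// Whether the transaction is read-only.
private boolean readOnly;

// Isolation level.
private Integer isolationLevel;

// Whether an actual transaction was active.
private boolean wasActive;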
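Suspending boils down to moving thread-bound state into a holder object and clearing the thread-bound slots; resuming writes the saved values back. The sketch below shows this with two plain ThreadLocal fields. SuspendSketch and its Holder are hypothetical, and only two of the holder's fields are modeled.

```java
final class SuspendSketch {

    // Hypothetical per-thread state; Spring keeps comparable values in TransactionSynchronizationManager.
    static final ThreadLocal<String> currentName = new ThreadLocal<>();
    static final ThreadLocal<Boolean> currentReadOnly = ThreadLocal.withInitial(() -> false);

    /** Hypothetical holder, analogous in spirit to SuspendedResourcesHolder. */
    static final class Holder {
        final String name;
        final boolean readOnly;

        Holder(String name, boolean readOnly) {
            this.name = name;
            this.readOnly = readOnly;
        }
    }

    /** Copies the thread-bound state into a holder and resets the thread-bound slots. */
    static Holder suspend() {
        Holder holder = new Holder(currentName.get(), currentReadOnly.get());
        currentName.remove();
        currentReadOnly.set(false);
        return holder;
    }

    /** Restores the state captured by suspend(), overwriting whatever the inner scope set. */
    static void resume(Holder holder) {
        currentName.set(holder.name);
        currentReadOnly.set(holder.readOnly);
    }
}
```

Between suspend() and resume(), an inner transaction is free to bind its own name and flags without seeing or disturbing those of the outer one.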

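Transaction state control

TransactionSynchronization

The TransactionSynchronization interface defines a set of callbacks that correspond to the phases of a transaction: suspend, resume, flush, commit (before and after), and completion (whether the transaction succeeded or failed). When a transaction reaches one of these phases, the transaction manager takes all callbacks from the synchronizations maintained by TransactionSynchronizationManager and invokes the corresponding method on each of them in turn.

public interface TransactionSynchronization extends Flushable {

    /** Completion status after a proper commit. */
    int STATUS_COMMITTED = 0;

    /** Completion status after a rollback. */
    int STATUS_ROLLED_BACK = 1;

    /** Unknown status: heuristic mixed completion or system errors. */
    int STATUS_UNKNOWN = 2;

    /** Suspend this synchronization. Unbind resources from TransactionSynchronizationManager. */
    default void suspend() {
    }

    /** Resume this synchronization. Rebind resources to TransactionSynchronizationManager. */
    default void resume() {
    }

    /** Flush the underlying session to the datastore. */
    @Override
    default void flush() {
    }

    /** Invoked before transaction commit (before "beforeCompletion"). */
    default void beforeCommit(boolean readOnly) {
    }

    /**
     * Invoked before transaction commit/rollback completes. Called after beforeCommit,
     * even if beforeCommit threw an exception, and regardless of the outcome.
     * Implementations should not throw TransactionException.
     */
    default void beforeCompletion() {
    }

    /**
     * Invoked after transaction commit. The transaction has been committed,
     * but transactional resources may still be active and accessible.
     * Implementations should not throw TransactionException.
     */
    default void afterCommit() {
    }

    /**
     * Invoked after transaction commit/rollback has completed; resources can be cleaned up here.
     * Implementations should not throw TransactionException.
     */
    default void afterCompletion(int status) {
    }
}

public abstract class TransactionSynchronizationAdapter implements TransactionSynchronization, Ordered {

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public void suspend() {
    }

    @Override
    public void resume() {
    }

    @Override
    public void flush() {
    }

    @Override
    public void beforeCommit(boolean readOnly) {
    }

    @Override
    public void beforeCompletion() {
    }

    @Override
    public void afterCommit() {
    }

    @Override
    public void afterCompletion(int status) {
    }
}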
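The order in which these callbacks fire follows from processCommit and processRollback: a commit runs beforeCommit, beforeCompletion, the actual commit, afterCommit, afterCompletion, while a rollback runs only beforeCompletion, the actual rollback, afterCompletion. The sketch below reproduces that ordering with a reduced callback interface. Sync and CallbackOrderSketch are hypothetical, and exception handling is left out on purpose.

```java
import java.util.List;

final class CallbackOrderSketch {

    /** Hypothetical reduced callback interface (a subset of TransactionSynchronization). */
    interface Sync {
        default void beforeCommit() {}
        default void beforeCompletion() {}
        default void afterCommit() {}
        default void afterCompletion(boolean committed) {}
    }

    /** Commit path: all four callback phases around the actual commit. */
    static void commit(List<Sync> syncs, Runnable doCommit) {
        for (Sync s : syncs) s.beforeCommit();
        for (Sync s : syncs) s.beforeCompletion();
        doCommit.run();
        for (Sync s : syncs) s.afterCommit();
        for (Sync s : syncs) s.afterCompletion(true);
    }

    /** Rollback path: no beforeCommit/afterCommit, only the completion callbacks. */
    static void rollback(List<Sync> syncs, Runnable doRollback) {
        for (Sync s : syncs) s.beforeCompletion();
        doRollback.run();
        for (Sync s : syncs) s.afterCompletion(false);
    }

    /** Returns a callback that appends the name of every invoked phase to the given log. */
    static Sync recording(List<String> log) {
        return new Sync() {
            @Override public void beforeCommit() { log.add("beforeCommit"); }
            @Override public void beforeCompletion() { log.add("beforeCompletion"); }
            @Override public void afterCommit() { log.add("afterCommit"); }
            @Override public void afterCompletion(boolean committed) {
                log.add("afterCompletion(" + committed + ")");
            }
        };
    }
}
```

A practical consequence is that work which must only happen once the data is durable (sending a message, evicting a cache entry) belongs in afterCommit, while cleanup that must always run belongs in afterCompletion.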

TransactionSynchronizationManager

TransactionSynchronizationManager manages the TransactionSynchronization instances and consists of a set of ThreadLocal fields. These fields largely correspond to the fields of SuspendedResourcesHolder; both hold transaction state.

public abstract class TransactionSynchronizationManager {

    // Per-thread Map of [connection pool object (e.g. a DataSource) : ConnectionHolder].
    // Through it, code running on the same thread obtains the same Connection.
    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<>("Transactional resources");

    // Transaction synchronizations: the extension point Spring offers to application code.
    // Each thread can register any number of them.
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<>("Transaction synchronizations");

    // Name of the current transaction.
    private static final ThreadLocal<String> currentTransactionName =
            new NamedThreadLocal<>("Current transaction name");

    // Whether the current transaction is read-only.
    private static final ThreadLocal<Boolean> currentTransactionReadOnly =
            new NamedThreadLocal<>("Current transaction read-only status");

    // Isolation level of the current transaction.
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
            new NamedThreadLocal<>("Current transaction isolation level");

    // Whether an actual transaction is active.
    private static final ThreadLocal<Boolean> actualTransactionActive =
            new NamedThreadLocal<>("Actual transaction active");
}
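The resources field is what lets every data-access call on a thread see the same connection: a resource is bound under a key once, and later lookups with that key on the same thread return it, while other threads see nothing. A minimal sketch of this binding scheme follows. ResourceBindingSketch and its method names are hypothetical and not the real API of TransactionSynchronizationManager.

```java
import java.util.HashMap;
import java.util.Map;

final class ResourceBindingSketch {

    // One map per thread, created lazily on first access.
    private static final ThreadLocal<Map<Object, Object>> resources =
            ThreadLocal.withInitial(HashMap::new);

    /** Binds a value under the key for the current thread; rejects double binding. */
    static void bind(Object key, Object value) {
        if (resources.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already bound for this thread: " + key);
        }
    }

    /** Returns the value bound under the key on the current thread, or null. */
    static Object get(Object key) {
        return resources.get().get(key);
    }

    /** Removes and returns the value bound under the key on the current thread, or null. */
    static Object unbind(Object key) {
        return resources.get().remove(key);
    }

    /** Helper for demonstration: looks the key up from a freshly started thread. */
    static Object lookupOnNewThread(Object key) {
        Object[] seen = new Object[1];
        Thread worker = new Thread(() -> seen[0] = get(key));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen[0];
    }
}
```

Because the binding is per thread, work handed off to another thread (for example through an executor) does not see the bound resource and therefore does not take part in the caller's transaction.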

TransactionSynchronizationUtils

TransactionSynchronizationUtils carries out the synchronization callbacks. Internally it calls TransactionSynchronizationManager.getSynchronizations() to obtain all registered synchronizations and then invokes the corresponding method on each of them in a loop.

public static void triggerBeforeCompletion() {
    for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
        try {
            synchronization.beforeCompletion();
        }
        catch (Throwable tsex) {
            logger.error("TransactionSynchronization.beforeCompletion threw exception", tsex);
        }
    }
}
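Note the try/catch inside the loop: an exception thrown by one beforeCompletion callback is logged and swallowed, so the remaining callbacks still run. The sketch below isolates that behavior using plain Runnable callbacks. TriggerSketch is a hypothetical name, and collecting the errors in a list stands in for logging.

```java
import java.util.List;

final class TriggerSketch {

    /**
     * Runs every callback; a failing callback is recorded in errors and does not
     * prevent the remaining callbacks from running. Returns the number of callbacks attempted.
     */
    static int triggerAll(List<Runnable> callbacks, List<Throwable> errors) {
        int attempted = 0;
        for (Runnable callback : callbacks) {
            attempted++;
            try {
                callback.run();
            } catch (Throwable ex) {
                errors.add(ex); // stands in for logger.error(...)
            }
        }
        return attempted;
    }
}
```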
