日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

Spring中异步注解@Async的使用、原理及使用时可能导致的问题

發布時間:2025/3/16 javascript 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring中异步注解@Async的使用、原理及使用时可能导致的问题 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

最近,很多同學碰到了下面這個問題,添加了Spring提供的一個異步注解@Async循環依賴無法被解決了,下面是一些讀者的留言跟群里同學碰到的問題:

本著講一個知識點就要講明白、講透徹的原則,我決定單獨寫一篇這樣的文章對@Async這個注解做一下詳細的介紹,這個注解帶來的問題遠遠不止循環依賴這么簡單,如果對它不夠熟悉的話建議慎用。

??

文章要點

??

@Async的基本使用

這個注解的作用在于可以讓被標注的方法異步執行,但是有兩個前提條件

1. 配置類上添加@EnableAsync注解

2. 需要異步執行的方法的所在類由Spring管理

3. 需要異步執行的方法上添加了@Async注解

我們通過一個Demo體會下這個注解的作用吧

第一步,配置類上開啟異步:

@EnableAsync @Configuration @ComponentScan("com.dmz.spring.async") public?class?Config?{}

第二步,

@Component??//?這個類本身要被Spring管理 public?class?DmzAsyncService?{@Async??//?添加注解表示這個方法要異步執行public?void?testAsync(){try?{TimeUnit.SECONDS.sleep(1);}?catch?(InterruptedException?e)?{e.printStackTrace();}System.out.println("testAsync?invoked");} }

第三步,測試異步執行

public?class?Main?{public?static?void?main(String[]?args)?{AnnotationConfigApplicationContext?ac?=?new?AnnotationConfigApplicationContext(Config.class);DmzAsyncService?bean?=?ac.getBean(DmzAsyncService.class);bean.testAsync();System.out.println("main函數執行完成");} } //?程序執行結果如下: //?main函數執行完成 //?testAsync?invoked

通過上面的例子我們可以發現,DmzAsyncService中的testAsync方法是異步執行的,那么這背后的原理是什么呢?我們接著分析

?

原理分析

我們在分析某一個技術的時候,最重要的事情是,一定一定要找到代碼的入口,像Spring這種都很明顯,入口必定是在@EnableAsync這個注解上面,我們來看看這個注解干了啥事(本文基于5.2.x版本)

@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented //?這里是重點,導入了一個ImportSelector @Import(AsyncConfigurationSelector.class) public?@interface?EnableAsync?{//?這個配置可以讓程序員配置需要被檢查的注解,默認情況下檢查的就是@Async注解Class<??extends?Annotation>?annotation()?default?Annotation.class;//?默認使用jdk代理boolean?proxyTargetClass()?default?false;//?默認使用Spring?AOPAdviceMode?mode()?default?AdviceMode.PROXY;//?在后續分析我們會發現,這個注解實際往容器中添加了一個//?AsyncAnnotationBeanPostProcessor,這個后置處理器實現了Ordered接口//?這個配置主要代表了AsyncAnnotationBeanPostProcessor執行的順序int?order()?default?Ordered.LOWEST_PRECEDENCE; }

上面這個注解做的最重要的事情就是導入了一個AsyncConfigurationSelector,這個類的源碼如下:

public?class?AsyncConfigurationSelector?extends?AdviceModeImportSelector<EnableAsync>?{private?static?final?String?ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME?="org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";@Override@Nullablepublic?String[]?selectImports(AdviceMode?adviceMode)?{switch?(adviceMode)?{//?默認會使用SpringAOP進行代理case?PROXY:return?new?String[]?{ProxyAsyncConfiguration.class.getName()};case?ASPECTJ:return?new?String[]?{ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};default:return?null;}}}

這個類的作用是像容器中注冊了一個ProxyAsyncConfiguration,這個類的繼承關系如下:


我們先看下它的父類AbstractAsyncConfiguration,其源碼如下:

@Configuration public?abstract?class?AbstractAsyncConfiguration?implements?ImportAware?{@Nullableprotected?AnnotationAttributes?enableAsync;@Nullableprotected?Supplier<Executor>?executor;@Nullableprotected?Supplier<AsyncUncaughtExceptionHandler>?exceptionHandler;//?這里主要就是檢查將其導入的類上是否有EnableAsync注解//?如果沒有的話就報錯@Overridepublic?void?setImportMetadata(AnnotationMetadata?importMetadata)?{this.enableAsync?=?AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(EnableAsync.class.getName(),?false));if?(this.enableAsync?==?null)?{throw?new?IllegalArgumentException("@EnableAsync?is?not?present?on?importing?class?"?+?importMetadata.getClassName());}}//?將容器中配置的AsyncConfigurer注入//?異步執行嘛,所以我們可以配置使用的線程池//?另外也可以配置異常處理器@Autowired(required?=?false)void?setConfigurers(Collection<AsyncConfigurer>?configurers)?{if?(CollectionUtils.isEmpty(configurers))?{return;}if?(configurers.size()?>?1)?{throw?new?IllegalStateException("Only?one?AsyncConfigurer?may?exist");}AsyncConfigurer?configurer?=?configurers.iterator().next();this.executor?=?configurer::getAsyncExecutor;this.exceptionHandler?=?configurer::getAsyncUncaughtExceptionHandler;}}

再來看看ProxyAsyncConfiguration這個類的源碼

@Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public?class?ProxyAsyncConfiguration?extends?AbstractAsyncConfiguration?{@Bean(name?=?TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public?AsyncAnnotationBeanPostProcessor?asyncAdvisor()?{AsyncAnnotationBeanPostProcessor?bpp?=?new?AsyncAnnotationBeanPostProcessor();//?將通過AsyncConfigurer配置好的線程池跟異常處理器設置到這個后置處理器中bpp.configure(this.executor,?this.exceptionHandler);Class<??extends?Annotation>?customAsyncAnnotation?=?this.enableAsync.getClass("annotation");if?(customAsyncAnnotation?!=?AnnotationUtils.getDefaultValue(EnableAsync.class,?"annotation"))?{bpp.setAsyncAnnotationType(customAsyncAnnotation);}bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));return?bpp;}}

這個類本身是一個配置類,它的作用是向容器中添加一個AsyncAnnotationBeanPostProcessor。到這一步我們基本上就可以明白了,@Async注解的就是通過AsyncAnnotationBeanPostProcessor這個后置處理器生成一個代理對象來實現異步的,接下來我們就具體看看AsyncAnnotationBeanPostProcessor是如何生成代理對象的,我們主要關注一下幾點即可:

  • 是在生命周期的哪一步完成的代理?

  • 切點的邏輯是怎么樣的?它會對什么樣的類進行攔截?

  • 通知的邏輯是怎么樣的?是如何實現異步的?

  • 基于上面幾個問題,我們進行逐一分析

    是在生命周期的哪一步完成的代理?

    我們抓住重點,AsyncAnnotationBeanPostProcessor是一個后置處理器器,按照我們對Spring的了解,大概率是在這個后置處理器的postProcessAfterInitialization方法中完成了代理,直接定位到這個方法,這個方法位于父類AbstractAdvisingBeanPostProcessor中,具體代碼如下:

    public?Object?postProcessAfterInitialization(Object?bean,?String?beanName)?{//?沒有通知,或者是AOP的基礎設施類,那么不進行代理if?(this.advisor?==?null?||?bean?instanceof?AopInfrastructureBean)?{return?bean;}//?對已經被代理的類,不再生成代理,只是將通知添加到代理類的邏輯中//?這里通過beforeExistingAdvisors決定是將通知添加到所有通知之前還是添加到所有通知之后//?在使用@Async注解的時候,beforeExistingAdvisors被設置成了true//?意味著整個方法及其攔截邏輯都會異步執行if?(bean?instanceof?Advised)?{Advised?advised?=?(Advised)?bean;if?(!advised.isFrozen()?&&?isEligible(AopUtils.getTargetClass(bean)))?{if?(this.beforeExistingAdvisors)?{advised.addAdvisor(0,?this.advisor);}else?{advised.addAdvisor(this.advisor);}return?bean;}}//?判斷需要對哪些Bean進行來代理if?(isEligible(bean,?beanName))?{ProxyFactory?proxyFactory?=?prepareProxyFactory(bean,?beanName);if?(!proxyFactory.isProxyTargetClass())?{evaluateProxyInterfaces(bean.getClass(),?proxyFactory);}proxyFactory.addAdvisor(this.advisor);customizeProxyFactory(proxyFactory);return?proxyFactory.getProxy(getProxyClassLoader());}return?bean; }

    果不其然,確實是在這個方法中完成的代理。接著我們就要思考,切點的過濾規則是什么呢?

    切點的邏輯是怎么樣的?

    其實也不難猜到肯定就是類上添加了@Async注解或者類中含有被@Async注解修飾的方法。基于此,我們看看這個isEligible這個方法的實現邏輯,這個方位位于AbstractBeanFactoryAwareAdvisingPostProcessor中,也是AsyncAnnotationBeanPostProcessor的父類,對應代碼如下:

    //?AbstractBeanFactoryAwareAdvisingPostProcessor的isEligible方法 //?調用了父類 protected?boolean?isEligible(Object?bean,?String?beanName)?{return?(!AutoProxyUtils.isOriginalInstance(beanName,?bean.getClass())?&&super.isEligible(bean,?beanName)); }protected?boolean?isEligible(Object?bean,?String?beanName)?{return?isEligible(bean.getClass()); }protected?boolean?isEligible(Class<?>?targetClass)?{Boolean?eligible?=?this.eligibleBeans.get(targetClass);if?(eligible?!=?null)?{return?eligible;}if?(this.advisor?==?null)?{return?false;}//?這里完成的判斷eligible?=?AopUtils.canApply(this.advisor,?targetClass);this.eligibleBeans.put(targetClass,?eligible);return?eligible; }

    實際上最后就是根據advisor來確定是否要進行代理,advisor實際就是一個綁定了切點的通知,那么AsyncAnnotationBeanPostProcessor這個advisor是什么時候被初始化的呢?我們直接定位到AsyncAnnotationBeanPostProcessor的setBeanFactory方法,其源碼如下:

    public?void?setBeanFactory(BeanFactory?beanFactory)?{super.setBeanFactory(beanFactory);//?在這里new了一個AsyncAnnotationAdvisorAsyncAnnotationAdvisor?advisor?=?new?AsyncAnnotationAdvisor(this.executor,?this.exceptionHandler);if?(this.asyncAnnotationType?!=?null)?{advisor.setAsyncAnnotationType(this.asyncAnnotationType);}advisor.setBeanFactory(beanFactory);//?完成了初始化this.advisor?=?advisor; }

    我們來看看AsyncAnnotationAdvisor中的切點匹配規程是怎么樣的,直接定位到這個類的buildPointcut方法中,其源碼如下:

    protected?Pointcut?buildPointcut(Set<Class<??extends?Annotation>>?asyncAnnotationTypes)?{ComposablePointcut?result?=?null;for?(Class<??extends?Annotation>?asyncAnnotationType?:?asyncAnnotationTypes)?{//?就是根據這兩個匹配器進行匹配的Pointcut?cpc?=?new?AnnotationMatchingPointcut(asyncAnnotationType,?true);Pointcut?mpc?=?new?AnnotationMatchingPointcut(null,?asyncAnnotationType,?true);if?(result?==?null)?{result?=?new?ComposablePointcut(cpc);}else?{result.union(cpc);}result?=?result.union(mpc);}return?(result?!=?null???result?:?Pointcut.TRUE); }

    代碼很簡單,就是根據cpc跟mpc兩個匹配器來進行匹配的,第一個是檢查類上是否有@Async注解,第二個是檢查方法是是否有@Async注解。

    那么,到現在為止,我們已經知道了它在何時創建代理,會為什么對象創建代理,最后我們還需要解決一個問題,代理的邏輯是怎么樣的,異步到底是如何實現的?

    通知的邏輯是怎么樣的?是如何實現異步的?

    前面也提到了advisor是一個綁定了切點的通知,前面分析了它的切點,那么現在我們就來看看它的通知邏輯,直接定位到AsyncAnnotationAdvisor中的buildAdvice方法,源碼如下:

    protected?Advice?buildAdvice(@Nullable?Supplier<Executor>?executor,?@Nullable?Supplier<AsyncUncaughtExceptionHandler>?exceptionHandler)?{AnnotationAsyncExecutionInterceptor?interceptor?=?new?AnnotationAsyncExecutionInterceptor(null);interceptor.configure(executor,?exceptionHandler);return?interceptor; }

    簡單吧,加了一個攔截器而已,對于interceptor類型的對象,我們關注它的核心方法invoke就行了,代碼如下:

    public?Object?invoke(final?MethodInvocation?invocation)?throws?Throwable?{Class<?>?targetClass?=?(invocation.getThis()?!=?null???AopUtils.getTargetClass(invocation.getThis())?:?null);Method?specificMethod?=?ClassUtils.getMostSpecificMethod(invocation.getMethod(),?targetClass);final?Method?userDeclaredMethod?=?BridgeMethodResolver.findBridgedMethod(specificMethod);//?異步執行嘛,先獲取到一個線程池AsyncTaskExecutor?executor?=?determineAsyncExecutor(userDeclaredMethod);if?(executor?==?null)?{throw?new?IllegalStateException("No?executor?specified?and?no?default?executor?set?on?AsyncExecutionInterceptor?either");}//?然后將這個方法封裝成一個?Callable對象傳入到線程池中執行Callable<Object>?task?=?()?->?{try?{Object?result?=?invocation.proceed();if?(result?instanceof?Future)?{return?((Future<?>)?result).get();}}catch?(ExecutionException?ex)?{handleError(ex.getCause(),?userDeclaredMethod,?invocation.getArguments());}catch?(Throwable?ex)?{handleError(ex,?userDeclaredMethod,?invocation.getArguments());}return?null;};//?將任務提交到線程池return?doSubmit(task,?executor,?invocation.getMethod().getReturnType()); }


    ??

    導致的問題及解決方案

    問題1:循環依賴報錯

    就像在這張圖里這個讀者問的問題,

    分為兩點回答:

    第一:循環依賴為什么不能被解決?

    這個問題其實很簡單,我從兩個方面分析了循環依賴的處理流程

  • 簡單對象間的循環依賴處理

  • AOP對象間的循環依賴處理

  • 按照這種思路,@Async注解導致的循環依賴應該屬于AOP對象間的循環依賴,也應該能被處理。但是,重點來了,解決AOP對象間循環依賴的核心方法是三級緩存,如下:


    在三級緩存緩存了一個工廠對象,這個工廠對象會調用getEarlyBeanReference方法來獲取一個早期的代理對象的引用,其源碼如下:

    protected?Object?getEarlyBeanReference(String?beanName,?RootBeanDefinition?mbd,?Object?bean)?{Object?exposedObject?=?bean;if?(!mbd.isSynthetic()?&&?hasInstantiationAwareBeanPostProcessors())?{for?(BeanPostProcessor?bp?:?getBeanPostProcessors())?{//?看到這個判斷了嗎,通過@EnableAsync導入的后置處理器//?AsyncAnnotationBeanPostProcessor根本就不是一個SmartInstantiationAwareBeanPostProcessor//?這就意味著即使我們通過AsyncAnnotationBeanPostProcessor創建了一個代理對象//?但是早期暴露出去的用于給別的Bean進行注入的那個對象還是原始對象if?(bp?instanceof?SmartInstantiationAwareBeanPostProcessor)?{SmartInstantiationAwareBeanPostProcessor?ibp?=?(SmartInstantiationAwareBeanPostProcessor)?bp;exposedObject?=?ibp.getEarlyBeanReference(exposedObject,?beanName);}}}return?exposedObject; }

    看完上面的代碼循環依賴的問題就很明顯了,因為早期暴露的對象跟最終放入容器中的對象不是同一個,所以報錯了。

    解決方案

    就以上面讀者給出的Demo為例,只需要在為B注入A時添加一個@Lazy注解即可

    @Component public?class?B?implements?BService?{@Autowired@Lazyprivate?A?a;public?void?doSomething()?{} }

    這個注解的作用在于,當為B注入A時,會為A生成一個代理對象注入到B中,當真正調用代理對象的方法時,底層會調用getBean(a)去創建A對象,然后調用方法,這個注解的處理時機是在org.springframework.beans.factory.support.DefaultListableBeanFactory#resolveDependency方法中,處理這個注解的代碼位于org.springframework.context.annotation.ContextAnnotationAutowireCandidateResolver#buildLazyResolutionProxy

    問題2:默認線程池不會復用線程

    我覺得這是這個注解最坑的地方,沒有之一!我們來看看它默認使用的線程池是哪個,在前文的源碼分析中,我們可以看到決定要使用線程池的方法是org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor。其源碼如下:

    protected?AsyncTaskExecutor?determineAsyncExecutor(Method?method)?{AsyncTaskExecutor?executor?=?this.executors.get(method);if?(executor?==?null)?{Executor?targetExecutor;//?可以在@Async注解中配置線程池的名字String?qualifier?=?getExecutorQualifier(method);if?(StringUtils.hasLength(qualifier))?{targetExecutor?=?findQualifiedExecutor(this.beanFactory,?qualifier);}else?{//?獲取默認的線程池targetExecutor?=?this.defaultExecutor.get();}if?(targetExecutor?==?null)?{return?null;}executor?=?(targetExecutor?instanceof?AsyncListenableTaskExecutor??(AsyncListenableTaskExecutor)?targetExecutor?:?new?TaskExecutorAdapter(targetExecutor));this.executors.put(method,?executor);}return?executor; }

    最終會調用到org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor這個方法中

    protected?Executor?getDefaultExecutor(@Nullable?BeanFactory?beanFactory)?{Executor?defaultExecutor?=?super.getDefaultExecutor(beanFactory);return?(defaultExecutor?!=?null???defaultExecutor?:?new?SimpleAsyncTaskExecutor()); }

    可以看到,它默認使用的線程池是SimpleAsyncTaskExecutor。我們不看這個類的源碼,只看它上面的文檔注釋,如下:


    主要說了三點

  • 為每個任務新起一個線程

  • 默認線程數不做限制

  • 不復用線程

  • 就這三點,你還敢用嗎?只要你的任務耗時長一點,說不定服務器就給你來個OOM。

    解決方案

    最好的辦法就是使用自定義的線程池,主要有這么幾種配置方法

  • 在之前的源碼分析中,我們可以知道,可以通過AsyncConfigurer來配置使用的線程池

  • 如下:

    public?class?DmzAsyncConfigurer?implements?AsyncConfigurer?{@Overridepublic?Executor?getAsyncExecutor()?{//?創建自定義的線程池} }
  • 直接在@Async注解中配置要使用的線程池的名稱

  • 如下:

    public?class?A?implements?AService?{private?B?b;@Autowiredpublic?void?setB(B?b)?{System.out.println(b);this.b?=?b;}@Async("dmzExecutor")public?void?doSomething()?{} }@EnableAsync @Configuration @ComponentScan("com.dmz.spring.async") @Aspect public?class?Config?{@Bean("dmzExecutor")public?Executor?executor(){//?創建自定義的線程池return?executor;} }


    ?

    總結

    本文主要介紹了Spring中異步注解的使用、原理及可能碰到的問題,針對每個問題文中也給出了方案。希望通過這篇文章能幫助你徹底掌握@Async注解的使用,知其然并知其所以然!

    有道無術,術可成;有術無道,止于術

    歡迎大家關注Java之道公眾號

    好文章,我在看??

    總結

    以上是生活随笔為你收集整理的Spring中异步注解@Async的使用、原理及使用时可能导致的问题的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。