聊透Spring事件機制

語言: CN / TW / HK

 事件機制是Spring為企業級開發提供的神兵利器之一,它提供了一種低耦合、無侵入的解決方式,是我們行走江湖必備保命技能。但其實Spring事件的設計其實並不複雜,它由三部分組成:事件、釋出器、監聽器。事件是主體,釋出器負責釋出事件,監聽器負責處理事件。
image.png

 在簡單瞭解Spring事件的機制之後,本文將從原始碼的角度出發,和大家一起探討:Spring事件的核心工作機制,並看一下作為企業級開發工具,Spring事件是如何支援全域性異常處理和非同步執行的。最後會和大家討論目前Spring事件機制的一些缺陷和問題,話不多說,我們開始吧。

image.png

1. Spring事件如何使用

 所謂千里之行始於足下,在研究Spring的事件的機制之前,我們先來看一下Spring事件是如何使用的。通常情況下,我們使用自定義事件和內建事件,自定義事件主要是配合業務使用,自定義事件則多是做系統啟動時的初始化工作或者收尾工作。

1.1 自定義事件的使用

  • 定義自定義事件
     自定義一個事件在使用上很簡單,繼承ApplicationEvent即可: ```java // 事件需要繼承ApplicationEvent public class MyApplicationEvent extends ApplicationEvent { private Long id; public MyApplicationEvent(Long id) { super(id); this.id = id; }

    public Long getId() { return id; } } - **釋出自定義事件**<br> &emsp;現在自定義事件已經有了,該如何進行釋出呢?Spring提供了`ApplicationEventPublisher`進行事件的釋出,我們平常使用最多的`ApplicationContext`也繼承了該釋出器,所以我們可以直接使用applicationContext進行事件的釋出。java // 釋出MyApplicationEvent型別事件 applicationContext.publishEvent(new MyApplicationEvent(1L)); - **處理自定義事件**<br> &emsp;現在事件已經發布了,誰負責處理事件呢?當然是監聽器了,Spring要求監聽器需要實現`ApplicationListener`介面,同時需要`通過泛型引數指定處理的事件型別`。有了監聽器需要處理的事件型別資訊,Spring在進行事件廣播的時候,就能找到需要廣播的監聽器了,從而準確傳遞事件了。java // 需要繼承ApplicationListener,並指定事件型別 public class MyEventListener implements ApplicationListener { // 處理指定型別的事件 @Override public void onApplicationEvent(MyApplicationEvent event) { System.out.println(Thread.currentThread().getName() + "接受到事件:"+event.getSource()); } } ```

1.2 Spring內建事件

1.2.1 ContextRefreshedEvent

 在ConfigurableApplicationContextrefresh()執行完成時,會發出ContextRefreshedEvent事件。refresh()是Spring最核心的方法,該方法內部完成的Spring容器的啟動,是研究Spring的重中之重。在該方法內部,當Spring容器啟動完成,會在finishRefresh()發出ContextRefreshedEvent事件,通知容器重新整理完成。我們一起來看一下原始碼:

```java // ConfigurableApplicationContext.java public void refresh() throws BeansException, IllegalStateException { try { // ...省略部分非關鍵程式碼 //完成普通單例Bean的例項化(非延遲的) this.finishBeanFactoryInitialization(beanFactory);

    // 初始化宣告週期處理器,併發出對應的時間通知
    this.finishRefresh();
}

}

protected void finishRefresh() { // ...省略部分非核心程式碼 // 釋出上下文已經重新整理完成的事件 this.publishEvent(new ContextRefreshedEvent(this)); } ```

 其實這是Spring提供給我們的拓展點,此時容器已經啟動完成,容器中的bean也已經建立完成,對應的屬性、init()、Aware回撥等,也全部執行。很適合我們做一些系統啟動後的準備工作,此時我們就可以監聽該事件,作為系統啟動後初始預熱的契機。其實Spring內部也是這樣使用ContextRefreshedEvent的, 比如我們常用的Spring內建的排程器,就是在接收到該事件後,才進行排程器的執行的。

java public class ScheduledAnnotationBeanPostProcessor implements ApplicationListener<ContextRefreshedEvent> { @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event.getApplicationContext() == this.applicationContext) { finishRegistration(); } } }

1.2.2 ContextStartedEvent

 在ConfigurableApplicationContextstart()執行完成時,會發出ContextStartedEvent事件。

java @Override public void start() { this.getLifecycleProcessor().start(); this.publishEvent(new ContextStartedEvent(this)); }ContextRefreshedEvent事件的觸發是所有的單例bean建立完成後釋出,此時實現了Lifecycle介面的bean還沒有回撥start(),當這些start()被呼叫後,才會釋出ContextStartedEvent事件。

1.2.3 ContextClosedEvent

 在ConfigurableApplicationContextclose()執行完成時,會發出ContextStartedEvent事件。此時IOC容器已經關閉,但尚未銷燬所有的bean。 ```java @Override public void close() { synchronized (this.startupShutdownMonitor) { this.doClose(); } }

protected void doClose() { // 釋出ContextClosedEvent事件 this.publishEvent(new ContextClosedEvent(this)); } ```

1.2.4 ContextStoppedEvent

 在ConfigurableApplicationContextstop()執行完成時,會發出ContextStartedEvent事件。 java @Override public void stop() { this.getLifecycleProcessor().stop(); this.publishEvent(new ContextStoppedEvent(this)); }  該事件在ContextClosedEvent事件觸發之後才會觸發,此時單例bean還沒有被銷燬,要先把他們都停掉才可以釋放資源,銷燬bean。

2. Spring事件是如何運轉的

 經過第一章節的探討,我們已經清楚Spring事件是如何使用的,然而這只是皮毛而已,我們的目標是把Spring事件機制脫光扒淨的展示給大家看。所以這一章節我們深入探討一下,Spring事件的執行機制,重點我們看一下: - 事件是怎麼廣播給監聽器的?會不會發送阻塞? - 系統中bean那麼多,ApplicationListener是被如何識別為監聽器的? - 監聽器處理事件的時候,是同步處理還是非同步處理的? - 處理的時候發生異常怎麼辦,後面的監聽器還能執行嗎?

 乍一看是不是問題還挺多,沒事,不要著急,讓我們一起來開啟愉快的探索路程,看看Spring是怎麼玩轉事件的吧。

2.1 事件釋出

 在第一章節,我們直接通過applicationContext釋出了事件,同時也提到了,它之所以能釋出事件,是因為它是ApplicationEventPublisher的子類,因此是具備事件釋出能力的。但按照介面隔離原則,如果我們只需要進行事件釋出,applicationContext提供的能力太多,還是推薦直接使用ApplicationEventPublisher進行操作。

2.1.1 獲取事件釋出器的方式

 我們先來ApplicationEventPublisher的提供的能力,它是一個介面,結構如下: ```java @FunctionalInterface public interface ApplicationEventPublisher { //釋出ApplicationEvent事件 default void publishEvent(ApplicationEvent event) { publishEvent((Object) event); }

//釋出PayloadApplicationEvent事件
void publishEvent(Object event);

} `` &emsp;通過原始碼我們發現ApplicationEventPublisher僅僅提供了事件釋出的能力,支援自定義型別和PayloadApplicationEvent型別(如果沒有定義事件型別,預設包裝為該型別)。那我們如何獲取該釋出器呢,我們最常使用的@Autowired`注入是否可以呢,試一下唄。

  • 通過@Autowired 注入 ApplicationEventPublisher image-20220901164129936.png
     通過debug,我們可以直觀的看到:是可以的,而且注入的就是ApplicationContext例項。也就是說注入ApplicationContext和注入ApplicationEventPublisher是等價的,都是一個ApplicationContext例項。

  • 通過ApplicationEventPublisherAware獲取 ApplicationEventPublisher
     除了@Autowired注入,Spring還提供了使用ApplicationEventPublisherAware獲取 ApplicationEventPublisher的方式,如果實現了這個感知介面,Spring會在合適的時機,回撥setApplicationEventPublisher(),將applicationEventPublisher傳遞給我們。使用起來也很方便。程式碼所示: ```java public class UserService implements ApplicationEventPublisherAware { private ApplicationEventPublisher applicationEventPublisher;

    public void login(String username, String password){ // 1: 進行登入處理 ... // 2: 傳送登入事件,用於記錄操作 applicationEventPublisher.publishEvent(new UserLoginEvent(userId)); }

    // Aware介面回撥注入applicationEventPublisher @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.applicationEventPublisher = applicationEventPublisher; } } `` &emsp;現在我們已經知道通過@AutowiredApplicationEventPublisherAware`回撥都能獲取到事件釋出器,兩種有什麼區別嗎? 其實區別不大,主要是呼叫時機的細小差別,另外就是默寫特殊場景下,@Autowired注入可能無法正常注入,實際開發中完成可以忽略不計。所以優先推薦小夥伴們使用ApplicationEventPublisherAware,如果覺得麻煩,使用@Autowired也未嘗不可。

image.png

如果使是自動注入模型,是無法通過setter()注入ApplicationEventPublisher的,因為在prepareBeanFactory時已經指定忽略此介面的注入了(beanFactory.ignoreDependencyInterface(ApplicationEventPublisherAware.class))。順便說一句,@Autowired不算自動注入哦。

2.1.2 事件的廣播方式

 現在我們已經知道,可以通過ApplicationEventPublisher傳送事件了,那麼這個事件傳送後肯定是要分發給對應的監聽器處理啊,誰處理這個分發邏輯呢?又是怎麼匹配對應的監聽器的呢?我們帶著這兩個問題來看ApplicationEventMulticaster

  • 事件是如何廣播的
     要探查事件是如何廣播的,需要跟隨事件釋出後的邏輯一起看一下:

```java @Override public void publishEvent(ApplicationEvent event) { this.publishEvent(event, null); }

protected void publishEvent(Object event, @Nullable ResolvableType eventType) { // ...省略部分程式碼 if (this.earlyApplicationEvents != null) { this.earlyApplicationEvents.add(applicationEvent); } else { // 將事件廣播給Listener this.getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); } }

// 獲取事件廣播器 ApplicationEventMulticaster getApplicationEventMulticaster() throws IllegalStateException { if (this.applicationEventMulticaster == null) { throw new IllegalStateException("ApplicationEventMulticaster not initialized - " + "call 'refresh' before multicasting events via the context: " + this); } return this.applicationEventMulticaster; } ```  通過上面原始碼,我們發現釋出器直接把事件轉交給applicationEventMulticaster了,我們再去裡面看一下廣播器裡面做了什麼。

```java // SimpleApplicationEventMulticaster.java public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { // ...省略部分程式碼 // getApplicationListeners 獲取符合的監聽器 for (ApplicationListener<?> listener : getApplicationListeners(event, type)) { // 執行每個監聽器的邏輯 invokeListener(listener, event); } }

private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) { try { // 呼叫監聽器的onApplicationEvent方法進行處理 listener.onApplicationEvent(event); } } ```  看到這裡,我們發現事件的分發邏輯:先找到匹配的監聽器,然後逐個呼叫onApplicationEvent()進行事件處理。

image.png

  • 事件和監聽器是如何匹配的
     通過上述原始碼,我們發現通過getApplicationListeners(event, type)找到了所有匹配的監聽器,我們繼續跟蹤看一下是如何匹配的。

```java

protected Collection<ApplicationListener<?>> getApplicationListeners( ApplicationEvent event, ResolvableType eventType) { // 省略快取相關程式碼 return retrieveApplicationListeners(eventType, sourceType, newRetriever); }

private Collection<ApplicationListener<?>> retrieveApplicationListeners( ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable CachedListenerRetriever retriever) { // 1: 獲取所有的ApplicationListener Set<ApplicationListener<?>> listeners; Set listenerBeans; synchronized (this.defaultRetriever) { listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners); listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans); }

for (ApplicationListener<?> listener : listeners) {
    // 2: 遍歷判斷是否匹配
    if (supportsEvent(listener, eventType, sourceType)) {
        if (retriever != null) {
            filteredListeners.add(listener);
        }
        allListeners.add(listener);
    }
}

}

protected boolean supportsEvent( ApplicationListener<?> listener, ResolvableType eventType, @Nullable Class<?> sourceType) { GenericApplicationListener smartListener = (listener instanceof GenericApplicationListener ? (GenericApplicationListener) listener : new GenericApplicationListenerAdapter(listener)); // supportsEventType 根據ApplicationListener的泛型, 和事件型別,看是否匹配 // supportsSourceType 根據事件源型別,判斷是否匹配 return (smartListener.supportsEventType(eventType) && smartListener.supportsSourceType(sourceType)); } ```  通過原始碼跟蹤,我們發現監聽器匹配是根據事件型別匹配的,先獲取容器中所有的監聽器,在用supportsEvent()去判斷對應的監聽器是否匹配事件。這裡匹配主要看兩點: 1. 判斷事件型別和監聽器上的泛型型別,是否匹配(子類也能匹配)。 2. 監聽器是否支援事件源型別,預設情況下,都是支援的。
如果兩者都匹配,就轉發給處理器處理。

image.png

  • ApplicationEventMulticaster是如何獲取的(選讀)
     在事件廣播時,Spring直接呼叫getApplicationEventMulticaster()去獲取屬性applicationEventMulticaster,並且當applicationEventMulticaster為空時,直接異常終止了。那麼就要求該成員變數提早初始化,那麼它是何時初始化的呢。

```java public void refresh() throws BeansException, IllegalStateException { // ...省略無關程式碼 // 初始化事件廣播器(轉發ApplicationEvent給對應的ApplicationListener處理) this.initApplicationEventMulticaster(); }

protected void initApplicationEventMulticaster() { ConfigurableListableBeanFactory beanFactory = this.getBeanFactory(); // spring容器中存在,直接返回 if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) { this.applicationEventMulticaster = beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class); if (this.logger.isTraceEnabled()) { this.logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]"); } } else { // 容器中不存在,建立SimpleApplicationEventMulticaster,放入容器 this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster); if (this.logger.isTraceEnabled()) { this.logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " + "[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]"); } } } `` &emsp;看到這裡,是不是豁然開朗,原來在容器啟動的時候,專門呼叫了initApplicationEventMulticaster()applicationEventMulticaster`進行了初始化,並放到了spring容器中。

其實這裡還有個問題,就是事件整體的初始化流程在BeanFactoryPostProcessor之後,如果在自定義的BeanFactoryPostProcessor釋出事件,此時applicationEventMulticaster還沒有初始化,監聽器也沒有註冊,是無法進行事件的廣播的。該問題在Spring3之前普遍存在,在最近的版本已經解決,其思路是:先將早期事件放入集合中,待廣播器、監聽器註冊後,再從集合中取出進行廣播。

2.2 事件監聽器

 監聽器是負責處理事件的,在廣播器將對應的事件廣播給它之後,它正式上崗開始處理事件。Spring預設的監聽器是同步執行的,並且支援一個事件由多個監聽器處理,並可通過@Order指定監聽器處理順序。

2.2.1 定義監聽器的方式

image.png - 實現ApplicationListener定義監聽器
 第一種方式定義的方式當然是通過直接繼承ApplicationListener,同時不要忘記通過泛型指定事件型別,它可是將事件廣播給監聽器的核心匹配標誌。 java public class MyEventListener implements ApplicationListener<MyApplicationEvent> { @Override public void onApplicationEvent(MyApplicationEvent event) { System.out.println(Thread.currentThread().getName() + "接受到事件:"+event.getSource()); } }

通過ApplicationListener定義的監聽器,本質上是一個單事件監聽器,也就是隻能處理一種型別的事件。

  • 使用@EventListener定義監聽器
     第二種方式我們還可以使用@EventListener標註方法為監聽器,該註解標註的方法上,方法引數為事件型別,標註該監聽器要處理的事件型別。 ```java public class AnnotationEventListener { // 使用@EventListener標註方法為監聽器,引數型別為事件型別 @EventListener public void onApplicationEvent(MyApplicationEvent event) { System.out.println(Thread.currentThread().getName() + "接受到事件:"+event.getSource()); }
@EventListener
public void onApplicationEvent(PayloadApplicationEvent payloadApplicationEvent) {
    System.out.println(Thread.currentThread().getName() + "接受到事件:"+payloadApplicationEvent.getPayload());
}

} ```

通過廣播器分發事件的邏輯,我們知道事件只能分發給ApplicationListener型別的監聽器例項處理,這裡僅僅是標註了@EventListener的方法,也能被是識別成ApplicationListener型別的監聽器嗎?答案是肯定的,只是Spring在底層進行了包裝,偷偷把@EventListener標註的方法包裝成了ApplicationListenerMethodAdapter,它也是ApplicationListener的子類,這樣就成功的把方法轉換成ApplicationListener例項了,後續章節我們會詳細揭露Spring偷樑換柱的小把戲,小夥伴們稍安勿躁。

2.2.2 ApplicationListener監聽器是如何被識別的

 本小節我們一起看一下監聽器是如何被是識別的,畢竟大多數情況下,我們只是直接加了@Component註解,然後實現了一下ApplicationListener介面,並沒有特殊指定為監聽器。那有沒有可能就是基於這個繼承關係,Spring自己在容器中進行型別查詢呢? ```java public void refresh() throws BeansException, IllegalStateException { try { // ...省略部分程式碼 // 初始化各種監聽器 this.registerListeners(); } }

// 註冊監聽器 protected void registerListeners() { // 1: 處理context.addApplicationListener(new MyEventListener()) 方式註冊的監聽器,並將監聽器註冊到廣播器中, for (ApplicationListener<?> listener : this.getApplicationListeners()) { this.getApplicationEventMulticaster().addApplicationListener(listener); }

// 2: 去Spring容器中獲取監聽器(處理掃描的或者register方式註冊的),同樣也是新增到廣播器中 String[] listenerBeanNames = this.getBeanNamesForType(ApplicationListener.class, true, false); for (String listenerBeanName : listenerBeanNames) { this.getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName); } } `` &emsp;通過上述原始碼跟蹤,我們發現原來在容器refresh()的時候,專門有個步驟是用來初始化各種監聽器的。它的具體實現是:先把通過addApplicationListener()直接指定的註冊為監聽器 -> 再通過型別查詢,把當做普通bean註冊到容器中,類似是ApplicationListener`的找了出來 -> 快取到ApplicationEventMulticaster中的監聽器集合中了。一路跟蹤下來,確實是根據型別查詢的,和我們的猜想完全一致。

2.2.3 @EventListener標註的處理器是如何識別註冊的

 本小節我們探究一下,標註了@EventListener的方法是如何被包裝成ApplicationListener例項的。我們直接從原始碼入手,Spring在例項化bean後,呼叫了afterSingletonsInstantiated()@EventListener的方法進行了保證,我們一起看一下。

```java public void refresh() throws BeansException, IllegalStateException { try { // ...省略部分程式碼 //完成普通單例Bean的例項化(非延遲的) this.finishBeanFactoryInitialization(beanFactory); // ...省略部分程式碼 } }

protected void finishBeanFactoryInitialization(ConfigurableListableBeanFactory beanFactory) { // ...省略部分程式碼 // 初始化非延遲載入的單例bean beanFactory.preInstantiateSingletons(); }

@Override public void preInstantiateSingletons() throws BeansException { List beanNames = new ArrayList<>(this.beanDefinitionNames); // 1: 完成bean的例項化 for (String beanName : beanNames) { //通過beanName獲取bean,bean不存在會建立bean getBean(beanName); }

// 2: 呼叫bean的後置處理方法
for (String beanName : beanNames) {
    // ...省略部分程式碼
    // 呼叫到EventListenerMethodProcessor的afterSingletonsInstantiated(),完成@EventListener的方法的轉換註冊
    smartSingleton.afterSingletonsInstantiated();
}

}

// EventListenerMethodProcessor.java public void afterSingletonsInstantiated() { // ...省略部分程式碼 for (String beanName : beanNames) { // ...省略部分程式碼 processBean(beanName, type); } }

private void processBean(final String beanName, final Class<?> targetType) { // 1: 解析bean上加了@EventListener的方法 Map annotatedMethods = null; try { annotatedMethods = MethodIntrospector.selectMethods(targetType, (MethodIntrospector.MetadataLookup) method -> AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class)); }

// ...省略部分程式碼
// 2: 遍歷加了@EventListener的方法,註冊為事件監聽器
for (Method method : annotatedMethods.keySet()) {
    for (EventListenerFactory factory : factories) {
        if (factory.supportsMethod(method)) {
            Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
            // 2.1 通過EventListenerFactory,將方法建立為監聽器例項(ApplicationListenerMethodAdapter)
            ApplicationListener<?> applicationListener = factory.createApplicationListener(beanName, targetType, methodToUse);
            if (applicationListener instanceof ApplicationListenerMethodAdapter) {
                    ((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
            }
            // 2.2 註冊為ApplicationListener
            context.addApplicationListener(applicationListener);
            break;
        }
    }
}
// ...省略部分程式碼

} `` &emsp;我們整理一下呼叫關係:refresh()->finishBeanFactoryInitialization(beanFactory) ->beanFactory.preInstantiateSingletons() ->eventListenerMethodProcessor.afterSingletonsInstantiated()->eventListenerMethodProcessor.processBean()`;在容器中所有的bean例項化後,會再次遍歷遍歷所有bean,呼叫SmartInitializingSingleton型別的bean的afterSingletonsInstantiated()的方法,此時符合條件的EventListenerMethodProcessor就會被呼叫,進而通過processBean(),先找出標註了@EventListener的方法,然後遍歷這些方法,通過EventListenerFactory工廠,包裝方法為EventListener例項,最後在註冊到容器中。至此,完成了查詢,轉換的過程。
image.png

關於@EventListener標註方法的解析時機,筆者首先想到的應該和@Bean的處理時機一致,在掃描類的時候,就解析出來加了@EventListener的方法,抽象為BeanDefinition放到容器中,後面例項化時候,和正常掃描出來的bean是一樣的例項化流程。但是查詢下來發現Spring並沒有這樣處理,而是在bean初始化後回撥階段處理的。究其原因,大概是@Bean真的是需要託付給Spring管理,而@EventListener只是一個標識,無需放入放入容器,防止對完暴露所致吧。

  • EventListenerMethodProcessors是如何註冊的
     通過上述的原始碼分析,我們清楚對於@EventListener的方法的處理,EventListenerMethodProcessor可謂是至關重要,那麼他是怎麼註冊到Spring中的。而且我們也沒有通過@EnableXXX進行開啟啊。其實Spring除了管理我們定義的bean,還會有一些內建的bean,來承接一些Spring核心工作,這些內建的bean一般在application容器建立的時候,就放入到Spring容器中了。下面我們來看一下是不是這樣: ```java // 構造方法 public AnnotationConfigApplicationContext() { // 1: 初始化BeanDefinition渲染器,註冊一下Spring內建的BeanDefinition this.reader = new AnnotatedBeanDefinitionReader(this); this.scanner = new ClassPathBeanDefinitionScanner(this); }

// AnnotatedBeanDefinitionReader.java public class AnnotatedBeanDefinitionReader { public AnnotatedBeanDefinitionReader(BeanDefinitionRegistry registry, Environment environment) { // ...省略部分程式碼 // 註冊一些內建後置處理器的BeanDefinition,是spring這兩個最核心的功能類 AnnotationConfigUtils.registerAnnotationConfigProcessors(this.registry); } }

// AnnotationConfigUtils.java public static Set registerAnnotationConfigProcessors( BeanDefinitionRegistry registry, @Nullable Object source) { // ...省略部分程式碼 // 註冊EventListenerMethodProcessor if (!registry.containsBeanDefinition(EVENT_LISTENER_PROCESSOR_BEAN_NAME)) { RootBeanDefinition def = new RootBeanDefinition(EventListenerMethodProcessor.class); def.setSource(source); beanDefs.add(registerPostProcessor(registry, def, EVENT_LISTENER_PROCESSOR_BEAN_NAME)); } } ```

2.3 非同步處理事件

 通過上面的分析,我們知道事件在廣播時是同步執行的,廣播流程為:先找到匹配的監聽器 -> 逐個呼叫onApplicationEvent()進行事件處理,整個過程是同步處理的。下面我們做一個測試驗證一下: ```java public void applicationListenerTest(){ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); context.register(AnnotationEventListener.class); context.refresh(); System.out.printf("執行緒:[%s],時間:[%s],開始釋出事件\n", new Date(), Thread.currentThread().getName()); context.publishEvent(new MyApplicationEvent(1L)); System.out.printf("執行緒:[%s],時間:[%s],釋出事件完成\n", new Date(), Thread.currentThread().getName()); context.stop(); }

public class AnnotationEventListener { @EventListener @Order(1) public void onApplicationEvent(MyApplicationEvent event) { Date start = new Date(); Thread.sleep(3000); System.out.printf("執行緒:[%s],監聽器1,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource()); }

@EventListener
@Order(2)
public void onApplicationEvent2(MyApplicationEvent event) {
    Date start = new Date();
    System.out.printf("執行緒:[%s],監聽器2,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource());
}

}

// 輸出資訊: // 執行緒:[main],時間[22:59:24],開始釋出事件 // 執行緒:[main],監聽器1,接收時間:[22:59:24],處理完成時間:[22:59:27],接收到事件:1 // 執行緒:[main],監聽器1,接收時間:[22:59:27],處理完成時間:[22:59:27],接收到事件:1 // 執行緒:[main],時間[22:59:27],,釋出事件完成 ```  通過上述示例程式碼,發現確實是同步呼叫的,處理執行緒都是main,監聽器1處理緩慢,監聽器2只能默默等待監聽器1處理後才能接收到事件。這能滿足我們的需求嗎,畢竟現在系統動輒就要求毫秒計返回,QPS沒有1000+你都不好意思出門,哪怕只有十個使用者😂。

 除了效能問題,我們基於真實業務場景出發,考慮一下什麼場景下,我們使用事件比較合適。個人使用最多的場景是:在執行某個業務時,需要通知別的業務方,該業務的執行情況時,會使用事件機制進行通知。就拿這個場景來說,我們考慮幾個問題: 1. 我們是否關心監聽者的執行時機? 2. 我們是否關心監聽者的執行結果?

 大多數情況下,其實我們並不關心的監聽者什麼時候執行,執行結果如何的。如果對執行結果有依賴,通常直接呼叫了,如果有可能,還能享受事務的便利,還藉助事件幹什麼呢。所以這裡其實有個需求,希望Spring事件的處理是非同步的,那如何實現呢?

2.3.1 通過注入taskExecutor,非同步處理事件

 通過前文的分析,我們知道事件的廣播是由ApplicationEventMulticaster進行處理的,那我們去看看,是否支援非同步處理呢。 ```java @Override public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { // 獲取執行執行緒池 Executor executor = getTaskExecutor(); for (ApplicationListener<?> listener : getApplicationListeners(event, type)) { // 如果存線上程池,使用執行緒池非同步執行 if (executor != null) { executor.execute(() -> invokeListener(listener, event)); } // 如果不存線上程池,同步執行 else { invokeListener(listener, event); } } }

// 獲取執行緒池 protected Executor getTaskExecutor() { return this.taskExecutor; }

// 設定執行緒池 public void setTaskExecutor(@Nullable Executor taskExecutor) { this.taskExecutor = taskExecutor; } ```

 通過原始碼我們發現,其實Spring提供了使用執行緒池非同步執行的邏輯,前提是需要先設定執行緒池,只是這裡設定執行緒池的方式稍微麻煩些,需要通過applicationEventMulticaster例項的setTaskExecutor()設定,下面我們試一下是否可行。

```java public void applicationListenerTest(){ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); context.register(AnnotationEventListener.class); context.refresh(); ApplicationEventMulticaster multicaster = context.getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class); if (multicaster instanceof SimpleApplicationEventMulticaster) { ((SimpleApplicationEventMulticaster) multicaster).setTaskExecutor(Executors.newFixedThreadPool(10)); } System.out.printf("執行緒:[%s],時間:[%s],開始釋出事件\n", new Date(), Thread.currentThread().getName()); context.publishEvent(new MyApplicationEvent(1L)); System.out.printf("執行緒:[%s],時間:[%s],釋出事件完成\n", new Date(), Thread.currentThread().getName()); context.stop(); }

public class AnnotationEventListener { @EventListener @Order(1) public void onApplicationEvent(MyApplicationEvent event) { Date start = new Date(); Thread.sleep(3000); System.out.printf("執行緒:[%s],監聽器1,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource()); }

@EventListener
@Order(2)
public void onApplicationEvent2(MyApplicationEvent event) {
    Date start = new Date();
    System.out.printf("執行緒:[%s],監聽器2,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource());
}

}

// 輸出資訊: // 執行緒:[main],時間[22:57:13],開始釋出事件 // 執行緒:[main],時間[22:57:13],,釋出事件完成 // 執行緒:[pool-2-thread-1],監聽器2,接收時間:[22:57:13],處理完成時間:[22:57:13],接收到事件:1 // 執行緒:[pool-2-thread-2],監聽器1,接收時間:[22:57:13],處理完成時間:[22:57:16],接收到事件:1 ```  經過測試發現:設定了執行緒池之後,監聽器確實是非同步執行的,並且是全域性生效,對所有型別的監聽器都適用。只是這裡的設定稍顯不便,需要先獲取到applicationEventMulticaster這個bean之後,再使用內建方法設定。

2.3.2 使用@Async,非同步處理事件

 通過注入執行緒池,是全域性生效的。如果我們專案中有些事件需要非同步處理,又有些事件需要同步執行的,怎麼辦,總不能告訴你的leader做不了吧。NO,那不是顯得我很沒有用。面對這種情況,我們可以藉助@Async註解,使單個監聽器非同步執行。我們測試一下:

```java // 使用@EnableAsync開啟非同步 @EnableAsync public class AnnotationEventListener {

@EventListener
@Order(1)
public void onApplicationEvent(MyApplicationEvent event) {
    Date start = new Date();
    Thread.sleep(3000);
    System.out.printf("執行緒:[%s],監聽器1,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource());
}

@EventListener
@Order(2)
public void onApplicationEvent2(MyApplicationEvent event) {
    Date start = new Date();
    Thread.sleep(1000);
    System.out.printf("執行緒:[%s],監聽器2,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource());
}

}

// 輸出資訊: // 執行緒:[main],時間[23:18:32],開始釋出事件 // 執行緒:[main],監聽器1,接收時間:[23:18:32],處理完成時間:[23:18:35],接收到事件:1 // 執行緒:[main],時間[23:18:35],,釋出事件完成 // 執行緒:[SimpleAsyncTaskExecutor-1],監聽器2,接收時間:[23:18:35],處理完成時間:[23:18:36],接收到事件:1 ```  經過測試發現:在@Async的加持下,確實可以控制某個監聽器非同步執行。其實@Async也是使用了執行緒池執行的,對@Async感興趣的同學可以自行查閱資料,這裡我們不做展開了。

image.png

2.4 全域性異常處理

 通過我們長時間的囉嗦,聰明的你肯定清楚:Spring事件的處理,預設是同步依次執行。那如果前面的監聽器出現了異常,並且沒有處理異常,會對後續的監聽器還能順利接收該事件嗎?其實不能的,因為異常中斷了事件的傳送了,這裡我們不做演示了,有興趣的同學們可以自行驗證一下。

那如果我們設定了非同步執行呢,還會有影響嗎,對執行緒池有所瞭解的同學肯定可以給出答案:不會,因為不是一個執行緒執行,是不會互相影響的。

 難道同步執行我們就要在每個監聽器都try catch一下,避免相互影響嗎,不能全域性處理嗎?當前可以了,貼心的Spring為了簡化我們的開發邏輯,特意提供了ErrorHandler來統一處理,話不多說,我們趕緊來試一下吧。

```java public class AnnotationEventListener {

@EventListener
@Order(1)
public void onApplicationEvent(MyApplicationEvent event) {
    Date start = new Date();
    // 製造異常
    int i = 1/0;
    System.out.printf("執行緒:[%s],監聽器1,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource());
}

@EventListener
@Order(2)
public void onApplicationEvent2(MyApplicationEvent event) {
    Date start = new Date();
    System.out.printf("執行緒:[%s],監聽器2,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource());
}

}

// 測試方法 public void applicationListenerTest() throws InterruptedException { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); context.register(AnnotationEventListener.class); context.refresh(); ApplicationEventMulticaster multicaster = context.getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class); if (multicaster instanceof SimpleApplicationEventMulticaster) { // 簡單列印異常資訊 ((SimpleApplicationEventMulticaster) multicaster).setErrorHandler(t -> System.out.println(t)); } System.out.printf("執行緒:[%s],時間:[%s],開始釋出事件\n", new Date(), Thread.currentThread().getName()); context.publishEvent(new MyApplicationEvent(1L)); System.out.printf("執行緒:[%s],時間:[%s],釋出事件完成\n", new Date(), Thread.currentThread().getName()); context.stop(); }

// 輸出資訊: // 執行緒:[main],時間[23:35:15],開始釋出事件 // java.lang.ArithmeticException: / by zero // 執行緒:[main],監聽器2,接收時間:[23:35:15],處理完成時間:[23:35:15],接收到事件:1 // 執行緒:[main],時間[23:35:15],,釋出事件完成 ```  經過測試發現:設定了ErrorHandler之後,確實可以對異常進行統一的管理了,再也不用繁瑣的try catch了,今天又多了快樂划水五分鐘的理由呢。老規矩,我們不光要做到知其然,還要做到知其所以然,我們探究一下為什麼加了ErrorHandler之後,就可以全域性處理呢?

java protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) { // 獲取ErrorHandler ErrorHandler errorHandler = getErrorHandler(); // 如果ErrorHandler存在,監聽器執行出現異常,交給errorHandler處理,不會傳遞向上丟擲異常。 if (errorHandler != null) { try { doInvokeListener(listener, event); } catch (Throwable err) { errorHandler.handleError(err); } } else { // 呼叫監聽器處理 doInvokeListener(listener, event); } }  經過閱讀原始碼,我們發現:Sring先查詢是否配置了ErrorHandler,如果配置了,在發生異常的時候,把異常資訊轉交給errorHandler處理,並且不會在向上傳遞異常了。這樣可以達到異常全域性處理的效果了。

3. Spring事件機制存在什麼問題

3.1 釋出阻塞

 Spring釋出事件的時候,由applicationEventMulticaster來處理分發邏輯,這是單執行緒處理,處理邏輯我們分析過,就是:找到事件對應的監聽器(有快取) -> 逐個分發給監聽器處理(默認同步,可非同步)。我們考慮一下這種設計會不會有效能問題了?同步執行的情況我們就不討論了,對應的場景一定是事件發生頻率較低,這種場景討論效能沒有意義。

 我們主要討論非同步模式,無論是@Async還是注入執行緒池,本質都是:通過執行緒池執行,並且執行緒池的執行緒是所有監聽器共同使用的(@Async對應的執行緒池供所有加了@Async的方法使用)。我們都清楚執行緒池的執行流程:先建立執行緒執行任務,之後會放到緩衝佇列,最後可能直接拒絕。

 基於共享執行緒池執行的監聽器的模式,有什麼問題呢?最嚴重的問題就是:監聽器的執行速度會互相影響、甚至會發生阻塞。假如某一個監聽器執行的很慢,把執行緒池中執行緒都佔用了,此時其他的事件雖然釋出但沒有資源執行,只能在快取佇列等待執行緒釋放,哪怕該事件的處理很快、很重要,也不行。

 其實這裡可以參考Netty的boss-work工作模型,廣播器只負責分發事件,排程執行監聽器的邏輯交給由具體的work執行緒負責會更合適。

3.2 無法訂製監聽器執行執行緒數

 正是由於每種事件產生的數量、處理邏輯、處理速度差異化可能很大,所以每個監聽器都有適合自己場景的執行緒數,所以為每個監聽器配置執行緒池就顯得尤為重要。Spring事件機制,無法單獨為事件(或者監聽器)設定執行緒池,只能共用執行緒池,無法做到精準控制,執行緒擁堵或者執行緒浪費出現的機率極大。當然,我們也可以在監聽器內部,接收到事件後使用自定義的執行緒池處理,但是我們更希望簡單化配置就能支援

 關於Spring事件機制存在的問題,筆者在專案中藉助記憶體佇列Disruptor儲存事件,採用雙匯流排的思想實現自研專案event-bus,解決了Spring事件機制不完美的部分。後續有機會再和大家分享該專案的詳細情況。