Guava是Google開源的一個Java基礎類庫,它在Google內部被廣泛使用。Guava提供了很多功能模塊比如:集合、并發庫、緩存等,EventBus是其中的一個module,本篇結合EventBus源碼來談談它的設計與實現。
概要
首先,我們先來預覽一下EventBus模塊的全部類圖:
類并不是多而且幾乎沒有太多繼承關系。
下面,我們來看一下各個類的職責:
- EventBus:核心類,代表了一個事件總線。Publish事件也由它發起。
- AsyncEventBus:在分發事件的時候,將其壓入一個全局隊列的異步分發模式。
- Subscriber:對某個事件的處理器抽象,封裝了事件的訂閱者以及處理器,并負責事件處理(該類的類名及其語義有些不明確,后續會談到)。
- SubscriberRegistry:訂閱注冊表,它用于存儲Subscriber跟Event的對應關系,以便于EventBus在publish一個事件時,可以找到它對應的Subscriber。
- Dispatcher:事件分發器,它定義了事件的分發策略。
- @Subscribe:用于標識事件處理器的注解,當EventBus publish一個事件后,相應的Subscriber將會得到通知并執行事件處理器。
- @AllowConcurrentEvents:該注解跟@Subscribe一同使用,標識該訂閱者的處理方法為線程安全的,該注解還用于標識該方法將可能會被EventBus在多線程環境下執行。
- DeadEvent:死信(沒有訂閱者關注的事件)對象。
- SubscribeExceptionHandler:訂閱者拋出異常的處理器。
- SubscribeExceptionContext:訂閱者拋出異常的上下文對象。
在對每個類進行分解之前,我們再來看一下各個類之間的關聯關系:
分“類”解讀
EventBus
它有這么幾個字段:
- identifier:事件總線的標識,這說明在一個應用里是可以有多個EventBus的。如果不指明它的值,它將以“default”作為其默認名稱。
- executor:它是Executor接口的實例,用于對訂閱者處理事件方法的執行。這里需要注意的是,該字段的實例化是在EventBus內部構造器中,并不是從外部注入進來的,另外真正的執行訂閱者方法的時機也不由EventBus負責,而是由Subscriber負責,因此該字段會被公開給外部訪問。
- exceptionHandler:它是SubscribeExceptionHandler的實例,用于處理訂閱者在執行事件處理方法時拋出的異常。EventBus可以接收一個外部定義的異常處理器,也可以采用內部缺省的日志記錄處理器。
- subscribers:訂閱者注冊表,用于存儲所有的事件以及事件處理器、訂閱對象的對應關系。
- dispatcher:事件分發器,用于分發事件給訂閱對象的事件處理器,該對象在EventBus構造方法內部初始化,默認的實現是PerThreadQueuedDispatcher,該分發器將事件存入隊列,并保證在同一個線程上發送的事件能夠按照他們發布的順序被分發給所有的訂閱者。
EventBus提供了幾個核心方法:
- register:注冊subscriber;
- unregister:移除注冊過的subscriber;
- post:發布事件;
你可以將EventBus看做是一個代理,這些方法真正的實現者都是上面的這些對象。
AsyncEventBus
一個支持異步發布模式的EventBus,它覆蓋了EventBus的默認構造方法,指定了一個異步的分發器:LegacyAsyncDispatcher,這個分發器基于一個全局的隊列來暫存未發布的事件。
Subscriber
之前也提到Subscriber的名稱是比較容易混淆的。這個類的名稱看似表示一個訂閱者對象,但其實是用來封裝“一個訂閱者的一個事件處理器”對象。因為當一個訂閱者存在多個處理方法被標注為@Subscribe的時候,那么每個處理方法都對應于一個獨立的Subscriber對象的實例。我個人覺得這個名稱與其具體的實現語義有些混淆。當然也許實現者認為:一個對象以及一個事件處理器就是一個Subscriber的話,那是沒有問題的。因此這里為了理解方便,你可以將其看做是一個封裝了訂閱者對象以及一個訂閱者處理器方法的實體類。
Subscriber的訪問級別是package的,它還承擔了執行事件處理的責任。通過一個create靜態工廠方法創建它:
static Subscriber create(EventBus bus, Object listener, Method method) {return isDeclaredThreadSafe(method)? new Subscriber(bus, listener, method): new SynchronizedSubscriber(bus, listener, method);}
它接收三個參數:
- bus:EventBus的實例,通過它來獲取事件的執行器(executor)
- listener:真實的訂閱者對象
- method:訂閱對象的事件處理方法的Method實例
在實現中,它會先判斷該處理器方法上是否被標注有@AllowConcurrentEvents注解,如果有,則實例化Subscriber類的一個實例;如果沒有,則不允許eventbus在多線程的環境中調用處理器方法,所以這里專門為此提供了一個同步的訂閱者對象:SynchronizedSubscriber來保證線程安全。
該類的兩個關鍵方法之一:
dispatchEvent:
final void dispatchEvent(final Object event) {executor.execute(new Runnable() {@Overridepublic void run() {try {invokeSubscriberMethod(event);} catch (InvocationTargetException e) {bus.handleSubscriberException(e.getCause(), context(event));}}});}
它調用一個多線程執行器來執行事件處理器方法。
另一個方法:invokeSubscriberMethod以反射的方式調用事件處理器方法。
另外,該類對Object的equals方法進行了override并標識為final。主要是為了避免同一個對象對某個事件進行重復訂閱,在SubscriberRegistry中會有相應的判等操作。當然這里Subscriber也override并final了hashCode方法。這是最佳實踐,不必多談,如果不了解的可以去看看《Effective Java》。
該類還有個內部類,就是我們上面談到的SynchronizedSubscriber,它繼承了Subscriber,與Subscriber唯一的不同就是在invokeSubscriberMethod的執行上做了同步。
SubscriberRegistry
針對單個EventBus的訂閱與事件的關系維護。在內部用來存儲訂閱者關系的對象是java并發包下的并發Map:ConcurrentMap,該map以Class對象為鍵,值的類型是CopyOnWriteArraySet<Subscriber>集合類型。
SubscriberRegistry直接依賴EventBus對象,所以在構造器中需要注入EventBus的實例。
SubscriberRegistry里有兩個關鍵的實例方法:register/unregister。
register
接收訂閱者對象作為參數并建立Event跟Subscriber的關聯關系。
我們來看看它的實現:
void register(Object listener) {Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {Class<?> eventType = entry.getKey();Collection<Subscriber> eventMethodsInListener = entry.getValue();CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);if (eventSubscribers == null) {CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);}eventSubscribers.addAll(eventMethodsInListener);}}
它首先獲得一個Multimap實例(它是Google Guava集合框架提供的一個多值Map類型,也就是說一個key可以對應多個value),該Multimap用于存儲事件類型對應的該訂閱者內所有關于該事件的處理器方法集合,其key為事件的Class類型。這里在for循環的中通過asMap獲取其map視圖,即可將Multimap對應的多個值存儲到一個Collection中。
也就是說這里for循環的每個entry,表示的是一個事件的Class實例對應的一組Subscriber的集合,即eventMethodsInListener。
然后根據該事件的Class對象從注冊表中獲取對應的存儲Subscriber實例的集合,如果不存在則創建該集合,然后將該訂閱者內所有的事件處理器方法都加入到注冊表中去。
unregister
unregister的實現跟register有些類似,先查找該訂閱者所有的事件類型與處理器的對應關系。然后,遍歷所有的事件類型,移除針對當前訂閱者的所有Subscriber實例。
findAllSubscribers
register/unregister方法都調用了findAllSubscribers方法,它有一些特別之處,這里需要單獨拎出來提一下。
findAllSubscribers用于查找事件類型以及事件處理器的對應關系。查找注解需要涉及到反射,通過反射來獲取標注在方法上的注解。因為Guava針對EventBus的注冊采取的是“隱式契約”而非接口這種“顯式契約”。而類與接口是存在繼承關系的,所有很有可能某個訂閱者其父類(或者父類實現的某個接口)也訂閱了某個事件。因此這里的查找需要順著繼承鏈向上查找父類的方法是否也被注解標注,代碼實現:
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();Class<?> clazz = listener.getClass();for (Method method : getAnnotatedMethods(clazz)) {Class<?>[] parameterTypes = method.getParameterTypes();Class<?> eventType = parameterTypes[0];methodsInListener.put(eventType, Subscriber.create(bus, listener, method));}return methodsInListener;}
同樣涉及這個問題的,還有根據事件類型獲取Subscriber實例的方法:getSubscribers。
getSubscribers
Iterator<Subscriber> getSubscribers(Object event) {ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());List<Iterator<Subscriber>> subscriberIterators =Lists.newArrayListWithCapacity(eventTypes.size());for (Class<?> eventType : eventTypes) {CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);if (eventSubscribers != null) {// eager no-copy snapshotsubscriberIterators.add(eventSubscribers.iterator());}}return Iterators.concat(subscriberIterators.iterator());}
Dispatcher
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
該方法用于將一個指定的事件分發給所有的訂閱者。
另外在Dispatcher提供了三個不同的分發器實現:
PerThreadQueuedDispatcher
它比較常用,針對每個線程構建一個隊列用于暫存事件對象。保證所有的事件都按照他們publish的順序從單一的線程上發出。保證從單一線程上發出,沒什么特別的地方,主要是在內部定義了一個隊列,將其放在ThreadLocal中,用以跟特定的線程關聯。
LegacyAsyncDispatcher
另一個異步分發器的實現:LegacyAsyncDispatcher,之前在介紹AsyncEventBus的時候提到,它就是用這種實現來分發事件。
它在內部通過一個ConcurrentLinkedQueue<EventWithSubscriber>的全局隊列來存儲事件。從關鍵方法:dispatch的實現來看,它跟PerThreadQueuedDispatcher的區別主要是兩個循環上的差異(這里基于隊列的緩存事件的方式,肯定會存在兩個循環:循環取隊列里的事件以及循環發送給Subscriber)。
PerThreadQueuedDispatcher:是兩層嵌套循環,外層是遍歷隊列取事件,內存是遍歷事件的訂閱處理器。
LegacyAsyncDispatcher:是一前一后兩個循環。前面一個是遍歷事件訂閱處理器,并構建一個事件實體對象存入隊列。后一個循環是遍歷該事件實體對象隊列,取出事件實體對象中的事件進行分發。
ImmediateDispatcher
其實以上兩個基于中間隊列的分發實現都可以看做是異步模式,而ImmediateDispatcher則是同步模式:只要有事件發生就會立即分發并被立即得到處理。ImmediateDispatcher從感官上看類似于線性并順序執行,而采用隊列的方式有多線程匯聚到一個公共隊列的由發散到聚合的模型。因此,ImmediateDispatcher的分發方式是一種深度優先的方式,而使用隊列是一種廣度優先的方式。
DeadEvent
它是一個實體對象,封裝了沒有訂閱者的事件。DeadEvent由兩個屬性組成:
- source:事件源(通常指發布事件的EventBus對象)
- event:事件對象
DeadEvent對象的產生:當通過某個EventBus的實例發布一個事件的時候,沒有找到事件訂閱者并且它本身又不是一個DeadEvent的實例時,將由EventBus構建一個DeadEvent類的實例。
總結
Guava的EventBus源碼還是比較簡單、清晰的。從源碼來看,它一番常用的Observer的設計方式,放棄采用統一的接口、統一的事件對象類型。轉而采用基于注解掃描的綁定方式。
其實無論是強制實現統一的接口,還是基于注解的實現方式都是在構建一種關聯關系(或者說滿足某種契約)。很明顯接口的方式是編譯層面上強制的顯式契約,而注解的方式則是運行時動態綁定的隱式契約關系。接口的方式是傳統的方式,編譯時確定觀察者關系,清晰明了,但通常要求有一致的事件類型、方法簽名。而基于注解實現的機制,剛好相反,編譯時因為沒有接口的語法層面上的依賴關系,顯得不那么清晰,至少靜態分析工具很難展示觀察者關系,但無需一致的方法簽名、事件參數,至于多個訂閱者類之間的繼承關系,可以繼承接收事件的通知,可以看作既是其優點也是其缺點。
原文發布時間為:2015-06-01
本文作者:vinoYang
本文來自云棲社區合作伙伴CSDN博客,了解相關信息可以關注CSDN博客。