EventBus 類解析
當我們開發軟件時,各個對象之間的數據共享和合作是必須的。 但是這里比較難做的是 怎樣保證消息之間的傳輸高效并且減少各個模塊之間的耦合。 當組件的職責不清楚時,一個組件還要承擔另一個組件的職責,這樣的系統我們就認為是高耦合。 當我們的系統變得高耦合時,任何一個小的改動都會對系統造成影響。 為了解決設計上的問題,我們設計了基于事件的設計模型。 在事件驅動編程模型中,對象可以發布/訂閱 事件. 事件監聽者就是監聽事件的發生,我們在第六章中已經看到過RemovalListener, 在這一章中,我們將討論Guava的EventBus類,了解它的發布/訂閱事件是怎么使用的。
這一章,我們將覆蓋下面的知識點:
-- EventBus 和 AsyncEventBus類
-- 怎樣是用EventBus訂閱事件
-- 使用EventBus發布事件
-- 編寫事件處理器,并且根據我們的需求選擇合適的處理器
-- 與DI工具協作
EventBus
EventBus類是guava中關注消息的發布和訂閱的類,簡單的說訂閱者通過EventBus注冊并訂閱事件,發布者將事件發送到EventBus中,EventBus將事件順序的通知給時間訂閱者,所以 這里面有一個重要的注意點,事件處理器必須迅速的處理,否則可能會導致時間堆積。
創建EventBus實例
創建一個EventBus實例,只需要簡單的調用構造方法:
EventBus eventBus = new EventBus();
也提供了一個帶參數的構造類,目的只是為了加上一個標識:
EventBus eventBus = new EventBus(TradeAccountEvent.class.getName());
訂閱事件
為了接受到一個事件,我們需要做下面3個步驟:
- 這個類需要定義一個只接受一個參數的public方法, 參數的類型要和訂閱的事件類型一只。
- 需要在方法上加上@Subscribe注解
- 最后我們調用EventBus的register方法注冊對象
發布事件
我們可以調用EventBus.post方法發送事件,EventBus會輪流調用所有的接受類型是發送事件類型的訂閱者,但是這里面有一個比較強大的東西,就是。。。。。。。。。。。
定義事件處理方法
如前面提到了事件處理方法只能接受一個參數,EventBus會輪流順序調用訂閱的方法,因此事件處理方法必須很快的給予響應,如果說時間處理的方法中有需要進行長時間運算的過程,我們建議另起一個線程處理。
并發
EventBus不會起多個線程去調用時間處理方法,除非我們在事件的處理方法上加上注解@AllowCOncurrentEvent,加上這個注解后我們就會認為這個事件處理方法是線程安全的.Annotating a handler method with the @
AllowConcurrentEvent annotation by itself will not register a method with EventBus
現在我們來看看怎么使用EventBus,我們來看一些例子.
訂閱事件
我們假設我們已經像如下的方式定義了一個事件:
public class TradeAccountEvent {
private double amount;
private Date tradeExecutionTime;
private TradeType tradeType;
private TradeAccount tradeAccount;
public TradeAccountEvent(TradeAccount account, double amount,
Date tradeExecutionTime, TradeType tradeType) {
checkArgument(amount > 0.0, "Trade can't be less than
zero");
this.amount = amount;
this.tradeExecutionTime =
checkNotNull(tradeExecutionTime,"ExecutionTime can't be null");
this.tradeAccount = checkNotNull(account,"Account can't be
null");
this.tradeType = checkNotNull(tradeType,"TradeType can't
be null");
}
由上面可以看出,無論是買或者賣的事件發生,我們都會創建一個TradeAccountEvent對象,現在讓我們考慮一下我們當這個事件被執行時我們希望監聽者能夠接收到,我們定義SimpleTradeAuditor類:
public class SimpleTradeAuditor {
private List<TradeAccountEvent> tradeEvents =
Lists.newArrayList();
public SimpleTradeAuditor(EventBus eventBus){
eventBus.register(this);
}
@Subscribe
public void auditTrade(TradeAccountEvent tradeAccountEvent){
tradeEvents.add(tradeAccountEvent);
System.out.println("Received trade "+tradeAccountEvent);
}
}
我們可以快速的看一下代碼,在構造方法中,我們接受一個EventBus實例,接著我們注冊SimpleTradeAuditor類到EventBus中,接受事件TradeAccountEvents. 通過指定@Subscribe注解說明哪個方法是事件處理器. 上面例子中的處理方式:將event加入到list中,并且在控制臺中打印出來.
事件發布 例子
現在我們看一下怎樣發布一個事件,看下面的類:
public class SimpleTradeExecutor {
private EventBus eventBus;
public SimpleTradeExecutor(EventBus eventBus) {
this.eventBus = eventBus;
}
public void executeTrade(TradeAccount tradeAccount, double
amount, TradeType tradeType){
TradeAccountEvent tradeAccountEvent =
processTrade(tradeAccount, amount, tradeType);
eventBus.post(tradeAccountEvent);
}
private TradeAccountEvent processTrade(TradeAccount
tradeAccount, double amount, TradeType tradeType){
Date executionTime = new Date();
String message = String.format("Processed trade for %s of
amount %n type %s @
%s",tradeAccount,amount,tradeType,executionTime);
TradeAccountEvent tradeAccountEvent = new TradeAccountEvent(tr
adeAccount,amount,executionTime,tradeType);
System.out.println(message);
return tradeAccountEvent;
}
}
像上面的SimpleTradeAuditor類一樣,在SimpleTradeExecutor的構造方法中我們也接受一個EventBus作為構造參數. 和 SimpleTradeAuditor類,為了方便后面使用,我們使用了一個成員變量引用了eventbus類,盡管大多數情況下,在兩個類中使用同一個eventBus實例是不好的,我們將在后面的例子中去看怎樣使用多個EventBus實例。 但是在這個例子中,我們使用同一個實例. SimpleTradeExecutor類有一個公開的方法,executeTrade接受我們處理一個trade的所有信息。 在這個例子中我們調用processTrade方法,傳入了必要的信心并且在控制臺中打印了交易已經被執行,并且返回一個TradeAccountEvent實例。 當processTrade 方法執行完,我們調用EventBus的post方法將TradeAccountEvent作為參數, 這樣所有訂閱TradeAccountEvent事件的訂閱者都會收到這個消息。 這樣我們就可以看到,publish 類和 scribe類通過消息解耦了
精確訂閱
我們剛才了解了怎樣使用EventBus訂閱發布事件。 我們知道 EventBus事件的發布與訂閱是基于事件類型的, 這樣我們就可以通過事件類型將事件發送給不同的訂閱者。 比如: 如果我們我們想分別訂閱 買和賣事件。 首先我們創建兩種類型的事件:
public class SellEvent extends TradeAccountEvent {
public SellEvent(TradeAccount tradeAccount, double amount, Date
tradExecutionTime) {
super(tradeAccount, amount, tradExecutionTime, TradeType.
SELL);
}
}
public class BuyEvent extends TradeAccountEvent {
public BuyEvent(TradeAccount tradeAccount, double amount, Date
tradExecutionTime) {
super(tradeAccount, amount, tradExecutionTime, TradeType.BUY);
}
}
現在我們創建了兩種不同類型的事件實例,SellEvent和BuyEvent,兩個事件都繼承了TradeAccountEvent。 我們能夠實現分別的訂閱,我們先創建一個能夠訂閱SellEvent的實例:
public class TradeSellAuditor {
private List<SellEvent> sellEvents = Lists.newArrayList();
public TradeSellAuditor(EventBus eventBus) {
eventBus.register(this);
}
@Subscribe
public void auditSell(SellEvent sellEvent){
sellEvents.add(sellEvent);
System.out.println("Received SellEvent "+sellEvent);
}
public List<SellEvent> getSellEvents() {
return sellEvents;
}
}
從功能點上來看,這個實例和我們之前的SimpleTradeAuditor差不多,只不過上面的這個實例只接受SellEvent事件,下面我們再創建一個只接受BuyEvent事件的實例:
public class TradeBuyAuditor {
private List<BuyEvent> buyEvents = Lists.newArrayList();
public TradeBuyAuditor(EventBus eventBus) {
eventBus.register(this);
}
@Subscribe
public void auditBuy(BuyEvent buyEvent){
buyEvents.add(buyEvent);
System.out.println("Received TradeBuyEvent "+buyEvent);
}
public List<BuyEvent> getBuyEvents() {
return buyEvents;
}
}
現在我們只需要重構我們的SimpleTradeExecutor類去基于buy或者sell創建正確的TradeAccountEvent。
public class BuySellTradeExecutor {
… deatails left out for clarity same as SimpleTradeExecutor
//The executeTrade() method is unchanged from SimpleTradeExecutor
private TradeAccountEvent processTrade(TradeAccount tradeAccount,
double amount, TradeType tradeType) {
Date executionTime = new Date();
String message = String.format("Processed trade for %s of
amount %n type %s @ %s", tradeAccount, amount, tradeType,
executionTime);
TradeAccountEvent tradeAccountEvent;
if (tradeType.equals(TradeType.BUY)) {
tradeAccountEvent = new BuyEvent(tradeAccount, amount,
executionTime);
} else {
tradeAccountEvent = new SellEvent(tradeAccount,
amount, executionTime);
}
System.out.println(message);
return tradeAccountEvent;
}
}
這里我們創建了和SimpleTradeExecutor功能相似的類:BuySellTradeExecutor,只不過BuySellTradeExecutor根據交易類型創建了不同的事件,BuyEvent和SellEvent。 我們發布了不同的事件,注冊了不通的訂閱者,但是EventBus對這樣的改變沒有感知。 為了接受這兩個事件,我們不需要創建兩個類,我們只需要像如下這種方式就可以:
public class AllTradesAuditor {
private List<BuyEvent> buyEvents = Lists.newArrayList();
private List<SellEvent> sellEvents = Lists.newArrayList();public AllTradesAuditor(EventBus eventBus) {
eventBus.register(this);
}
@Subscribe
public void auditSell(SellEvent sellEvent){
sellEvents.add(sellEvent);
System.out.println("Received TradeSellEvent "+sellEvent);
}
@Subscribe
public void auditBuy(BuyEvent buyEvent){
buyEvents.add(buyEvent);
System.out.println("Received TradeBuyEvent "+buyEvent);
}
}
上面我們創建一個實例,有兩個事件處理方法,這樣AllTradeAuditor會接受所有的Trade事件。 哪個方法被調用取決于EventBus發送什么類型的事件。 為了驗證一下,我們可以寫一個方法接受Object類型的參數,這樣就可以收到所有的事件。
下面我們來考慮一下我們有多個EventBus實例。 如果我們把BuySellTradeExecutor類拆分成兩個類,這樣我們就可以注入兩個不同的EventBus實例,但是在訂閱類中就要注意注入的是哪個類了。 關于這個例子我們在這里不討論了,代碼可以見
bbejeck.guava.chapter7.config 包。
取消事件訂閱
我們訂閱事件的時候,肯定也會想到在某個時間點我們需要取消訂閱某個事件。 取消事件訂閱只需要調用eventBus.unregister方法。 如果我們知道我們在某個時刻想要停止對某個事件的處理,我們可以按照如下的方式處理:
public void unregister(){
this.eventBus.unregister(this);
}
一旦上面的方法被調用,就不在會收到任何事件,其他的沒有取消訂閱的會繼續收到事件.
異步事件總線
我們之前一直強調事件處理器的處理邏輯要簡單。 因為EventBus是順序的處理每一個事件的。 我們還有另外一種處理方式: AsyncEventBus. AsyncEventBus提供了和EventBus相同的功能。只是在處理事件的時候采用了Java.util.concurrent.Executor 調用事件處理器。
創建一個異步EventBus實例
創建AsyncEvent和創建一個EventBus差不多:
AsyncEventBus asyncEventBus = new AsyncEventBus(executorService);
我們創建一個傳入ExecutorService實例的AsyncEvent實例。 我們還有一個接受兩個參數的構造函數,接受另外一個string表明ExecutorService的身份。 AysncEventBus在事件處理器需要花費時間比較長的場景下比較適合。
DeadEvent
當一個事件沒有監聽者,我們就會將這樣的事件包裝成DeadEvent,這樣有一個方法監聽DeadEvent我們就可以知道哪些事件沒有監聽者。
public class DeadEventSubscriber {
private static final Logger logger =
Logger.getLogger(DeadEventSubscriber.class);
public DeadEventSubscriber(EventBus eventBus) {
eventBus.register(this);
}
@Subscribe
public void handleUnsubscribedEvent(DeadEvent deadEvent){
logger.warn("No subscribers for "+deadEvent.getEvent());
}
}
上面的例子中我們簡單的注冊了一個監聽DeadEvent的監聽者,簡單的記錄了沒有被監聽的事件。
依賴注入
為了確保我們注冊的監聽者和發布者是同一個EventBus實例,我們使用Spring來實現EventBus的注入。 在下面的例子中,我們將展示怎樣使用Spring框架去配置SimpleTradeAuditor 和 SimpleTradeExecutor類。首先我們需要對SimpleTradeAuditor和SimpleTradeExecutor類做如下的改變:
@Component
public class SimpleTradeExecutor {
private EventBus eventBus;
@Autowired
public SimpleTradeExecutor(EventBus eventBus) {
this.eventBus = checkNotNull(eventBus, "EventBus can't be
null");
}@Component
public class SimpleTradeAuditor {
private List<TradeAccountEvent> tradeEvents =
Lists.newArrayList();
@Autowired
public SimpleTradeAuditor(EventBus eventBus){
checkNotNull(eventBus,"EventBus can't be null");
eventBus.register(this);
}
我們首先在兩個類上加上了@Component注解。 這樣可以讓Spring把這兩個類當作可以注入的bean。這里我們使用的構造方法注入。所以我們加上了@Autowired注解。 加上了@Autowired注解spring就可以幫助我們注入EventBus類。
@Configuration
@ComponentScan(basePackages = {"bbejeck.guava.chapter7.publisher",
"bbejeck.guava.chapter7.subscriber"})
public class EventBusConfig {
@Bean
public EventBus eventBus() {
return new EventBus();
}
}
這里我們的類上加上了@Configuration注解,這樣spring就把這個類當作Context,在這個類中我們返回了EventBus實例。這樣spring就對上面的兩個類注入了同一個EventBus,這正是我們想要的,至于spring是怎么做到的,不在本書考慮的范圍。
總結
在這一章,我們講解了怎樣使用Guava進行事件驅動編程,來降低模塊之間的耦合,我們講解了怎么創建EventBus實例,并且怎樣注冊監聽者和發布者。 并且我們也剖析了怎樣更具事件類型去監聽事件。 最后我們還學習了使用AsyncEventBus類,可以讓我們異步的發送事件。我們也學習了怎樣使用DeadEvent類去確保我們監聽了所有的事件。 最后,我們還學習了使用依賴注入來使得我們可以更容易創建基于事件的系統。