EventPoller 是什么?
EventPoller
?是 Disruptor 框架中一種 基于輪詢(poll-based) 的事件消費機制。它與我們更常見的?BatchEventProcessor
(基于獨立的消費者線程)形成了對比。核心區別在于:
BatchEventProcessor
?(推模式): Disruptor 會為你創建一個專門的線程。一旦有事件發布,BatchEventProcessor
?會在一個無限循環中自動、持續地處理事件,并推送給你的事件處理器(EventHandler
)。你只需要提供處理邏輯,不用關心線程管理。EventPoller
?(拉模式):?EventPoller
?不會自己創建線程。它提供一個?poll()
?方法,讓你可以在任何你選擇的線程中,主動地去“拉取”和處理事件。控制權完全在你手中。
這種設計使得?EventPoller
?非常適合與那些生命周期不受 Disruptor 控制的現有線程進行集成。
我們來看一下?EventPoller.java
?中的關鍵部分:
?內部組件(核心字段)
EventPoller
?的實現依賴于四個核心的 final 字段,它們在構造時被注入,定義了 Poller 的行為。
// ... existing code ...
public class EventPoller<T>
{private final DataProvider<T> dataProvider;private final Sequencer sequencer;private final Sequence sequence;private final Sequence gatingSequence;
// ... existing code ...
dataProvider
: 事件的提供者,通常就是?RingBuffer
。EventPoller
?通過它來獲取指定序號(sequence)的事件對象。sequencer
: 序列號管理器。這是 Disruptor 的核心組件,負責協調生產者和消費者之間的進度。EventPoller
?用它來查詢當前可消費的事件序列范圍。sequence
:?EventPoller
?自身的消費進度序列。它記錄了當前 Poller 已經成功處理到的事件的 sequence。每次?poll
?調用成功后,這個?sequence
?會被更新。gatingSequence
: 門控序列。這是一個非常重要的概念。EventPoller
?在消費事件前,必須確保其依賴的前置消費者已經處理完這些事件。gatingSequence
?就代表了這些前置依賴的進度。EventPoller
?能消費到的最大序列號,不能超過?gatingSequence
?的當前值。如果沒有其他消費者依賴,它通常會直接依賴生產者的游標(cursor)。
Handler<T>
?接口
這是?EventPoller
?的核心回調接口,你需要實現它來定義事件處理邏輯。
// ... existing code ...public interface Handler<T>{/*** Called for each event to consume it** @param event the event* @param sequence the sequence of the event* @param endOfBatch whether this event is the last in the batch* @return whether to continue consuming events. If {@code false}, the poller will not feed any more events* to the handler until {@link EventPoller#poll(Handler)} is called again* @throws Exception any exceptions thrown by the handler will be propagated to the caller of {@code poll}*/boolean onEvent(T event, long sequence, boolean endOfBatch) throws Exception;}
// ... existing code ...
onEvent(T event, long sequence, boolean endOfBatch)
:event
: 當前需要處理的事件對象。sequence
: 事件在 Ring Buffer 中的序號。endOfBatch
: 標志這是否是當前?poll()
?調用所能獲取到的一批事件中的最后一個。- 返回值 (boolean): 這是關鍵!
- 返回?
true
:?EventPoller
?會繼續嘗試處理下一個可用的事件(如果存在的話)。 - 返回?
false
:?poll()
?方法會立即停止處理并返回,即使后面還有可用的事件。
- 返回?
?在?PullWithPoller.java
?示例中,handler 總是返回?false
,實現了每次?poll
?只處理一個事件的效果。
// ... existing code ...private static Object getNextValue(final EventPoller<DataEvent<Object>> poller) throws Exception{final Object[] out = new Object[1];poller.poll((event, sequence, endOfBatch) ->{out[0] = event.copyOfData();// Return false so that only one event is processed at a time.return false;});return out[0];}
// ... existing code ...
poll(Handler<T> eventHandler)
?方法
這是?EventPoller
?的“引擎”,你會在你的線程循環中反復調用它。
// ... existing code ...public PollState poll(final Handler<T> eventHandler) throws Exception{final long currentSequence = sequence.get();long nextSequence = currentSequence + 1;final long availableSequence = sequencer.getHighestPublishedSequence(nextSequence, gatingSequence.get());if (nextSequence <= availableSequence){
// ... existing code ...try{do{final T event = dataProvider.get(nextSequence);processNextEvent = eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);processedSequence = nextSequence;nextSequence++;}while (nextSequence <= availableSequence && processNextEvent);}
// ... existing code ...return PollState.PROCESSING;}else if (sequencer.getCursor() >= nextSequence){return PollState.GATING;}else{return PollState.IDLE;}}
// ... existing code ...
它返回一個?PollState
?枚舉,告訴你輪詢的結果:
PollState.PROCESSING
: 本次?poll()
?調用成功處理了一個或多個事件。PollState.GATING
: 有新的事件已經被發布了,但是被前置的消費者(gatingSequence
)阻塞了。你需要等待前置消費者處理完,才能繼續。PollState.IDLE
: Ring Buffer 中沒有任何新的、可供處理的事件。
PollState
?枚舉
poll
?方法的返回值,用于告訴調用者當前輪詢的結果。
// ... existing code ...public enum PollState{/*** The poller processed one or more events*/PROCESSING,/*** The poller is waiting for gated sequences to advance before events become available*/GATING,/*** No events need to be processed*/IDLE}
// ... existing code ...
PROCESSING
: 本次?poll
?成功處理了至少一個事件。GATING
: 當前沒有可處理的事件,因為被?gatingSequence
?阻塞了。意思是生產者已經發布了新的事件,但是前置依賴的消費者還沒跟上。調用者看到這個狀態,通常會選擇?Thread.yield()
?或短暫休眠,等待依賴方前進。IDLE
: 當前沒有可處理的_任何_事件。意思是連生產者都還沒有發布新的事件。調用者看到這個狀態,可以認為工作隊列是空的。
EventPoller
?的用法
想象一個場景:你正在開發一個游戲,你有一個主游戲循環(Game Loop)線程。你希望在這個主循環中處理來自網絡模塊的事件(例如玩家移動、聊天消息等),而這些事件是通過 Disruptor 傳遞的。你不能為了處理事件而阻塞游戲循環,也不想再創建一個新線程。這時?EventPoller
?就是完美的解決方案。
步驟 1: 創建?EventPoller
通常,我們不直接調用?new EventPoller(...)
,而是使用?RingBuffer
?的工廠方法?newPoller()
。
// 假設你已經設置好了 Disruptor 和 RingBuffer
RingBuffer<MyEvent> ringBuffer = disruptor.getRingBuffer();// 如果你的 Poller 消費需要依賴其他消費者,你需要提供它們的 Sequence
// 如果沒有依賴,可以不傳參數
// 例如,依賴 consumer1 和 consumer2
Sequence[] gatingSequences = new Sequence[]{consumer1.getSequence(), consumer2.getSequence()};
EventPoller<MyEvent> poller = ringBuffer.newPoller(gatingSequences);// 重要: 創建 Poller 后,需要將它的 Sequence 添加回 RingBuffer 的 Gating 列表中
// 這樣,生產者(Publisher)才會等待你的 Poller,避免覆蓋還未處理的事件
ringBuffer.addGatingSequences(poller.getSequence());
步驟 2: 實現?Handler
定義你的事件處理邏輯。
public class MyGameEventHandler implements EventPoller.Handler<MyEvent> {@Overridepublic boolean onEvent(MyEvent event, long sequence, boolean endOfBatch) {// 在這里處理游戲事件,例如更新玩家位置System.out.printf("處理事件: %s, 序號: %d, 是否是批次末尾: %b%n",event.toString(), sequence, endOfBatch);// 通常返回 true,讓 poll() 處理完所有可用事件return true;}
}
步驟 3: 在你的線程中輪詢
在你的主線程(例如游戲循環)中,調用?poll()
。
// 在你的游戲循環線程中
MyGameEventHandler handler = new MyGameEventHandler();
boolean running = true;while (running) {// --- 游戲邏輯的其他部分:渲染、物理計算等 ---updateGamePhysics();renderGraphics();// --- 從 Disruptor 中拉取并處理事件 ---try {// 調用 poll(),它會處理當前所有可用的事件,然后立即返回EventPoller.PollState state = poller.poll(handler);// 如果沒有事件,可以做一些其他工作或短暫休眠,避免CPU空轉if (state == EventPoller.PollState.IDLE || state == EventPoller.PollState.GATING) {// yield or sleepThread.yield();}} catch (Exception e) {// 處理異常e.printStackTrace();}
}
?
poll詳解
這是?EventPoller
?最核心的方法,我們來逐行分析它的邏輯。
// ... existing code ...public PollState poll(final Handler<T> eventHandler) throws Exception{final long currentSequence = sequence.get();long nextSequence = currentSequence + 1;final long availableSequence = sequencer.getHighestPublishedSequence(nextSequence, gatingSequence.get());if (nextSequence <= availableSequence){boolean processNextEvent;long processedSequence = currentSequence;try{do{final T event = dataProvider.get(nextSequence);processNextEvent = eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);processedSequence = nextSequence;nextSequence++;}while (nextSequence <= availableSequence && processNextEvent);}finally{sequence.set(processedSequence);}return PollState.PROCESSING;}else if (sequencer.getCursor() >= nextSequence){return PollState.GATING;}else{return PollState.IDLE;}}
// ... existing code ...
獲取序列:
currentSequence = sequence.get()
: 獲取當前 Poller 的消費進度。nextSequence = currentSequence + 1
: 確定我們想要消費的下一個事件的序列號。availableSequence = sequencer.getHighestPublishedSequence(...)
: 這是關鍵一步。它向?sequencer
?查詢,在?nextSequence
?和?gatingSequence.get()
?之間,生產者已經發布的最大可用序列號是多少。這個返回值?availableSequence
?就是本次?poll
?調用可以處理的事件序列號的上限。
處理可用事件 (
if
?分支):if (nextSequence <= availableSequence)
: 如果為?true
,說明至少有一個事件是可用的。do-while
?循環:- 循環處理從?
nextSequence
?到?availableSequence
?的所有事件。 dataProvider.get(nextSequence)
: 從 RingBuffer 中獲取事件。eventHandler.onEvent(...)
: 調用用戶提供的?Handler
?來處理事件。processNextEvent = ...
:?Handler
?的返回值決定是否繼續循環。processedSequence = nextSequence
: 在事件被成功傳遞給?handler
?后,更新?processedSequence
?變量。
- 循環處理從?
finally { sequence.set(processedSequence); }
:?至關重要。無論?do-while
?循環是正常結束還是因為?handler
?拋出異常而中斷,finally
?塊都會執行。它將 Poller 自身的?sequence
?更新為最后一個已成功處理的事件的序列號。這保證了消費進度的正確性和持久化,下次調用?poll
?時能從正確的位置開始。return PollState.PROCESSING
: 返回“處理中”狀態。
等待 (
else if
?和?else
?分支):else if (sequencer.getCursor() >= nextSequence)
: 如果沒有可用事件(即?nextSequence > availableSequence
),但生產者的游標?sequencer.getCursor()
?已經超過了我們想消費的?nextSequence
,這說明我們是被?gatingSequence
?卡住了。返回?GATING
。else
: 如果連生產者的游標都還沒到?nextSequence
,說明根本沒有新事件。返回?IDLE
。
?
總結
EventPoller
?是 Disruptor 提供的一個強大而靈活的工具。它犧牲了?BatchEventProcessor
?的易用性和自動化的線程管理,換來了對消費流程的完全控制。
適用場景:
- 當你的消費邏輯需要在某個現有線程(例如,游戲主循環、網絡IO線程)中執行時。
- 當你需要實現比 Disruptor 內置等待策略更復雜的消費調度邏輯時。
- 當你需要以非阻塞的方式檢查是否有新事件,并根據結果執行不同邏輯分支時。
注意事項:
- 你需要自己管理輪詢循環。
- 你需要自己處理當沒有事件時(
IDLE
?或?GATING
?狀態)的等待策略,以防止 CPU 100% 忙等。可以使用?Thread.yield()
、Thread.sleep()
?或更高級的等待策略。 - 別忘了將?
poller.getSequence()
?添加回?ringBuffer
?的?gatingSequences
?中。
使用模式: 典型的使用方式是在你自己的循環中調用?poll()
,并根據返回的?PollState
?決定下一步行動:
// 偽代碼
while (isRunning) {PollState state = poller.poll(myHandler);switch (state) {case IDLE:case GATING:// 沒有事件或被阻塞,可以出讓CPU或做點別的事Thread.yield(); break;case PROCESSING:// 事件被處理了,可能還有更多,可以立即再次嘗試break;}
}