深入解析EventPoller:Disruptor的輪詢式事件處理機制

EventPoller 是什么?

EventPoller?是 Disruptor 框架中一種 基于輪詢(poll-based) 的事件消費機制。它與我們更常見的?BatchEventProcessor(基于獨立的消費者線程)形成了對比。核心區別在于:

  • BatchEventProcessor?(推模式): Disruptor 會為你創建一個專門的線程。一旦有事件發布,BatchEventProcessor?會在一個無限循環中自動、持續地處理事件,并推送給你的事件處理器(EventHandler)。你只需要提供處理邏輯,不用關心線程管理。
  • EventPoller?(拉模式):?EventPoller?不會自己創建線程。它提供一個?poll()?方法,讓你可以在任何你選擇的線程中,主動地去“拉取”和處理事件。控制權完全在你手中。

這種設計使得?EventPoller?非常適合與那些生命周期不受 Disruptor 控制的現有線程進行集成。

我們來看一下?EventPoller.java?中的關鍵部分:

?內部組件(核心字段)

EventPoller?的實現依賴于四個核心的 final 字段,它們在構造時被注入,定義了 Poller 的行為。

// ... existing code ...
public class EventPoller<T>
{private final DataProvider<T> dataProvider;private final Sequencer sequencer;private final Sequence sequence;private final Sequence gatingSequence;
// ... existing code ...
  • dataProvider: 事件的提供者,通常就是?RingBufferEventPoller?通過它來獲取指定序號(sequence)的事件對象。
  • sequencer: 序列號管理器。這是 Disruptor 的核心組件,負責協調生產者和消費者之間的進度。EventPoller?用它來查詢當前可消費的事件序列范圍。
  • sequence:?EventPoller?自身的消費進度序列。它記錄了當前 Poller 已經成功處理到的事件的 sequence。每次?poll?調用成功后,這個?sequence?會被更新。
  • gatingSequence: 門控序列。這是一個非常重要的概念。EventPoller?在消費事件前,必須確保其依賴的前置消費者已經處理完這些事件。gatingSequence?就代表了這些前置依賴的進度。EventPoller?能消費到的最大序列號,不能超過?gatingSequence?的當前值。如果沒有其他消費者依賴,它通常會直接依賴生產者的游標(cursor)。

Handler<T>?接口

這是?EventPoller?的核心回調接口,你需要實現它來定義事件處理邏輯。

// ... existing code ...public interface Handler<T>{/*** Called for each event to consume it** @param event the event* @param sequence the sequence of the event* @param endOfBatch whether this event is the last in the batch* @return whether to continue consuming events. If {@code false}, the poller will not feed any more events*         to the handler until {@link EventPoller#poll(Handler)} is called again* @throws Exception any exceptions thrown by the handler will be propagated to the caller of {@code poll}*/boolean onEvent(T event, long sequence, boolean endOfBatch) throws Exception;}
// ... existing code ...
  • onEvent(T event, long sequence, boolean endOfBatch):
    • event: 當前需要處理的事件對象。
    • sequence: 事件在 Ring Buffer 中的序號。
    • endOfBatch: 標志這是否是當前?poll()?調用所能獲取到的一批事件中的最后一個。
    • 返回值 (boolean): 這是關鍵!
      • 返回?true:?EventPoller?會繼續嘗試處理下一個可用的事件(如果存在的話)。
      • 返回?false:?poll()?方法會立即停止處理并返回,即使后面還有可用的事件。

?在?PullWithPoller.java?示例中,handler 總是返回?false,實現了每次?poll?只處理一個事件的效果。

// ... existing code ...private static Object getNextValue(final EventPoller<DataEvent<Object>> poller) throws Exception{final Object[] out = new Object[1];poller.poll((event, sequence, endOfBatch) ->{out[0] = event.copyOfData();// Return false so that only one event is processed at a time.return false;});return out[0];}
// ... existing code ...

poll(Handler<T> eventHandler)?方法

這是?EventPoller?的“引擎”,你會在你的線程循環中反復調用它。

// ... existing code ...public PollState poll(final Handler<T> eventHandler) throws Exception{final long currentSequence = sequence.get();long nextSequence = currentSequence + 1;final long availableSequence = sequencer.getHighestPublishedSequence(nextSequence, gatingSequence.get());if (nextSequence <= availableSequence){
// ... existing code ...try{do{final T event = dataProvider.get(nextSequence);processNextEvent = eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);processedSequence = nextSequence;nextSequence++;}while (nextSequence <= availableSequence && processNextEvent);}
// ... existing code ...return PollState.PROCESSING;}else if (sequencer.getCursor() >= nextSequence){return PollState.GATING;}else{return PollState.IDLE;}}
// ... existing code ...

它返回一個?PollState?枚舉,告訴你輪詢的結果:

  • PollState.PROCESSING: 本次?poll()?調用成功處理了一個或多個事件。
  • PollState.GATING: 有新的事件已經被發布了,但是被前置的消費者(gatingSequence)阻塞了。你需要等待前置消費者處理完,才能繼續。
  • PollState.IDLE: Ring Buffer 中沒有任何新的、可供處理的事件。

PollState?枚舉

poll?方法的返回值,用于告訴調用者當前輪詢的結果。

// ... existing code ...public enum PollState{/*** The poller processed one or more events*/PROCESSING,/*** The poller is waiting for gated sequences to advance before events become available*/GATING,/*** No events need to be processed*/IDLE}
// ... existing code ...
  • PROCESSING: 本次?poll?成功處理了至少一個事件。
  • GATING: 當前沒有可處理的事件,因為被?gatingSequence?阻塞了。意思是生產者已經發布了新的事件,但是前置依賴的消費者還沒跟上。調用者看到這個狀態,通常會選擇?Thread.yield()?或短暫休眠,等待依賴方前進。
  • IDLE: 當前沒有可處理的_任何_事件。意思是連生產者都還沒有發布新的事件。調用者看到這個狀態,可以認為工作隊列是空的。

EventPoller?的用法

想象一個場景:你正在開發一個游戲,你有一個主游戲循環(Game Loop)線程。你希望在這個主循環中處理來自網絡模塊的事件(例如玩家移動、聊天消息等),而這些事件是通過 Disruptor 傳遞的。你不能為了處理事件而阻塞游戲循環,也不想再創建一個新線程。這時?EventPoller?就是完美的解決方案。

步驟 1: 創建?EventPoller

通常,我們不直接調用?new EventPoller(...),而是使用?RingBuffer?的工廠方法?newPoller()

// 假設你已經設置好了 Disruptor 和 RingBuffer
RingBuffer<MyEvent> ringBuffer = disruptor.getRingBuffer();// 如果你的 Poller 消費需要依賴其他消費者,你需要提供它們的 Sequence
// 如果沒有依賴,可以不傳參數
// 例如,依賴 consumer1 和 consumer2
Sequence[] gatingSequences = new Sequence[]{consumer1.getSequence(), consumer2.getSequence()};
EventPoller<MyEvent> poller = ringBuffer.newPoller(gatingSequences);// 重要: 創建 Poller 后,需要將它的 Sequence 添加回 RingBuffer 的 Gating 列表中
// 這樣,生產者(Publisher)才會等待你的 Poller,避免覆蓋還未處理的事件
ringBuffer.addGatingSequences(poller.getSequence());
步驟 2: 實現?Handler

定義你的事件處理邏輯。

public class MyGameEventHandler implements EventPoller.Handler<MyEvent> {@Overridepublic boolean onEvent(MyEvent event, long sequence, boolean endOfBatch) {// 在這里處理游戲事件,例如更新玩家位置System.out.printf("處理事件: %s, 序號: %d, 是否是批次末尾: %b%n",event.toString(), sequence, endOfBatch);// 通常返回 true,讓 poll() 處理完所有可用事件return true;}
}
步驟 3: 在你的線程中輪詢

在你的主線程(例如游戲循環)中,調用?poll()

// 在你的游戲循環線程中
MyGameEventHandler handler = new MyGameEventHandler();
boolean running = true;while (running) {// --- 游戲邏輯的其他部分:渲染、物理計算等 ---updateGamePhysics();renderGraphics();// --- 從 Disruptor 中拉取并處理事件 ---try {// 調用 poll(),它會處理當前所有可用的事件,然后立即返回EventPoller.PollState state = poller.poll(handler);// 如果沒有事件,可以做一些其他工作或短暫休眠,避免CPU空轉if (state == EventPoller.PollState.IDLE || state == EventPoller.PollState.GATING) {// yield or sleepThread.yield();}} catch (Exception e) {// 處理異常e.printStackTrace();}
}

?

    poll詳解

    這是?EventPoller?最核心的方法,我們來逐行分析它的邏輯。

    // ... existing code ...public PollState poll(final Handler<T> eventHandler) throws Exception{final long currentSequence = sequence.get();long nextSequence = currentSequence + 1;final long availableSequence = sequencer.getHighestPublishedSequence(nextSequence, gatingSequence.get());if (nextSequence <= availableSequence){boolean processNextEvent;long processedSequence = currentSequence;try{do{final T event = dataProvider.get(nextSequence);processNextEvent = eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);processedSequence = nextSequence;nextSequence++;}while (nextSequence <= availableSequence && processNextEvent);}finally{sequence.set(processedSequence);}return PollState.PROCESSING;}else if (sequencer.getCursor() >= nextSequence){return PollState.GATING;}else{return PollState.IDLE;}}
    // ... existing code ...
    
    1. 獲取序列

      • currentSequence = sequence.get(): 獲取當前 Poller 的消費進度。
      • nextSequence = currentSequence + 1: 確定我們想要消費的下一個事件的序列號。
      • availableSequence = sequencer.getHighestPublishedSequence(...): 這是關鍵一步。它向?sequencer?查詢,在?nextSequence?和?gatingSequence.get()?之間,生產者已經發布的最大可用序列號是多少。這個返回值?availableSequence?就是本次?poll?調用可以處理的事件序列號的上限。
    2. 處理可用事件 (if?分支)

      • if (nextSequence <= availableSequence): 如果為?true,說明至少有一個事件是可用的。
      • do-while?循環:
        • 循環處理從?nextSequence?到?availableSequence?的所有事件。
        • dataProvider.get(nextSequence): 從 RingBuffer 中獲取事件。
        • eventHandler.onEvent(...): 調用用戶提供的?Handler?來處理事件。
        • processNextEvent = ...:?Handler?的返回值決定是否繼續循環。
        • processedSequence = nextSequence: 在事件被成功傳遞給?handler?后,更新?processedSequence?變量。
      • finally { sequence.set(processedSequence); }:?至關重要。無論?do-while?循環是正常結束還是因為?handler?拋出異常而中斷,finally?塊都會執行。它將 Poller 自身的?sequence?更新為最后一個已成功處理的事件的序列號。這保證了消費進度的正確性和持久化,下次調用?poll?時能從正確的位置開始。
      • return PollState.PROCESSING: 返回“處理中”狀態。
    3. 等待 (else if?和?else?分支)

      • else if (sequencer.getCursor() >= nextSequence): 如果沒有可用事件(即?nextSequence > availableSequence),但生產者的游標?sequencer.getCursor()?已經超過了我們想消費的?nextSequence,這說明我們是被?gatingSequence?卡住了。返回?GATING
      • else: 如果連生產者的游標都還沒到?nextSequence,說明根本沒有新事件。返回?IDLE

    ?

    總結

      EventPoller?是 Disruptor 提供的一個強大而靈活的工具。它犧牲了?BatchEventProcessor?的易用性和自動化的線程管理,換來了對消費流程的完全控制。

      適用場景

      • 當你的消費邏輯需要在某個現有線程(例如,游戲主循環、網絡IO線程)中執行時。
      • 當你需要實現比 Disruptor 內置等待策略更復雜的消費調度邏輯時。
      • 當你需要以非阻塞的方式檢查是否有新事件,并根據結果執行不同邏輯分支時。

      注意事項

      • 你需要自己管理輪詢循環。
      • 你需要自己處理當沒有事件時(IDLE?或?GATING?狀態)的等待策略,以防止 CPU 100% 忙等。可以使用?Thread.yield()Thread.sleep()?或更高級的等待策略。
      • 別忘了將?poller.getSequence()?添加回?ringBuffer?的?gatingSequences?中。

      使用模式: 典型的使用方式是在你自己的循環中調用?poll(),并根據返回的?PollState?決定下一步行動:

      // 偽代碼
      while (isRunning) {PollState state = poller.poll(myHandler);switch (state) {case IDLE:case GATING:// 沒有事件或被阻塞,可以出讓CPU或做點別的事Thread.yield(); break;case PROCESSING:// 事件被處理了,可能還有更多,可以立即再次嘗試break;}
      }
      

      本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
      如若轉載,請注明出處:http://www.pswp.cn/pingmian/93718.shtml
      繁體地址,請注明出處:http://hk.pswp.cn/pingmian/93718.shtml
      英文地址,請注明出處:http://en.pswp.cn/pingmian/93718.shtml

      如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

      相關文章

      K8S-Secret資源對象

      目錄 一、Secret概述 二、Secret 類型 kubectl 創建類型 三、Secret 使用 Opaque 類型 Secret 的使用 創建 yaml 一、Secret概述 k8s secrets用于存儲和管理一些敏感數據&#xff0c;比如密碼&#xff0c;token&#xff0c;密鑰等敏感信息。它把 Pod 想要訪問的加密數據…

      lua入門以及在Redis中的應用

      1.基本語法1.1變量lua的變量有&#xff1a;無效值nil&#xff0c;布爾值boolean&#xff0c;數字number、字符串string、函數function、自定義類型userdata、線程thread、表table&#xff08;key-value結構&#xff09;1.2循環數值循環for i起始值, 結束值 ,間隔值 do---option…

      淘寶電商大數據采集【采集內容||采集方法|工具||合規性||應用】

      淘寶電商大數據采集是指通過技術手段、工具或平臺&#xff0c;系統性收集淘寶&#xff08;及旗下天貓等&#xff09;生態內的各類數據&#xff0c;用于分析市場趨勢、用戶行為、商品表現、競品動態等&#xff0c;為電商運營、決策提供數據支持。以下從采集內容、工具方法、合規…

      ROS2核心模塊

      1.創建工作空間先創建工作空間ws01_plumbing&#xff0c;終端下進入工作空間的src目錄&#xff0c;執行如下命令&#xff1a;ros2 pkg create --build-type ament_cmake base_interfaces_demo2.話題通信話題通信是ROS中使用頻率最高的一種通信模式&#xff0c;話題通信是基于發…

      Mac 上安裝并使用 frpc(FRP 內網穿透客戶端)指南

      一、先裝好 Homebrew&#xff08;macOS 的包管理器&#xff09;打開終端&#xff08;Terminal&#xff09;&#xff0c;先裝命令行開發工具 xcode-select --install彈窗點“安裝”&#xff0c;等待 3~5 分鐘。一鍵安裝 Homebrew /bin/bash -c "$(curl -fsSL https://raw.g…

      04_接口與包管理

      第4課:接口與包管理 課程目標 深入理解Go語言接口的概念和用法 掌握接口的組合和空接口 學會使用Go Modules進行包管理 理解包的導入和導出規則 1. 接口基礎 1.1 接口定義 // 基本接口定義 type Shape interface {Area() float64Perimeter()

      福昕PDF編輯軟件高級版下載與詳細圖文安裝教程!!

      軟件下載 【軟件名稱】&#xff1a; 福昕PDF編輯器高級版 【軟件大小】&#xff1a;668.9MBa a【系統要求】&#xff1a;awin10/win11或更高 福昕&#xff0c;軟件下載&#xff08;夸克網盤需手機打開&#xff09;&#xff1a;&#xff1a;福昕丨夸克網盤-資源免費下載 軟件介…

      利用無事務方式插入數據庫解決并發插入問題(最小主鍵id思路)

      一、背景 由于某業務需要回退某產品數據緩存列表Asset資源&#xff0c;主任務執行后&#xff0c;通過并行執行批量子任務進行數據回退&#xff0c;子任務中會記錄緩存列表Asset和緩存列表行AssetLine數據&#xff0c;并行執行過程會出現緩存列表行AssetLine重復插入問題&#…

      如何制作免費的比特幣冷錢包

      本文主要從技術上討論冷錢包的操作機制和原理&#xff0c;并不作為投資建議。對于國外的比特幣玩家&#xff0c;或者打算長期囤幣來對抗通貨膨脹的&#xff0c;或者是想短期持有的&#xff0c;那么將比特幣存儲在哪里是一個Common的問題。一般是兩類選擇。第一種選擇是存儲在交…

      新手向:Python制作簡易音樂播放器

      使用Python構建簡易音樂播放器音樂播放器是現代數字生活中不可或缺的工具&#xff0c;從智能手機到電腦系統&#xff0c;幾乎每個設備都內置了音樂播放功能。對于Python初學者來說&#xff0c;開發一個簡易的音樂播放器是一個很好的實踐項目&#xff0c;既能學習編程基礎&#…

      【StarRocks】TabletChecker邏輯

      TabletChecker是StarRocks FE里的一個組件,它的主要工作是檢查出所有的處于不健康狀態的tablets。 注意,它的職責就是check(檢查)。 至于tablet修復、均衡等調度工作不是TabletChecker的職責。 相關配置項 // 20秒執行一次check,代碼里是執行runAfterCatalogReady()publi…

      低空經濟展 | 優翼仿真攜eVTOL全動飛行模擬器亮相2025深圳eVTOL展

      2025深圳eVTOL展將于2025年9月23-25日在深圳坪山燕子湖國際會展中心舉行。展會以“低空經濟?eVTOL?航空應急救援?商載大型無人運輸機”為主題&#xff0c;以 “2天大會3天展覽項目考察飛行表演頒獎盛典項目路演”的多元模式&#xff0c;打造覆蓋 eVTOL全產業鏈的專業化合作平…

      AI驅動商業革新:開源大模型與零售精準營銷引領產業升級

      在當今數字化浪潮中&#xff0c;AI 正以迅猛之勢滲透至商業的每一處脈絡&#xff0c;掀起一場影響深遠的變革風暴&#xff0c;從根本上改寫著商業運轉的底層邏輯&#xff0c;創造出無數嶄新的商業契機。基礎模型領域&#xff0c;新的突破正在重塑行業格局。Meta 旗下的 LLaMA 3…

      【表的操作】

      文章目錄 一、查看所有表 1、語法 二、創建表 1、語法 2、?例 3、表在磁盤上對應的?件 4、創建數據加時使?校驗語句[if not exists] 三、查看表結構 1、語法 2、?例 四、修改表 1、語法 2、?例 (1)向表中添加?列 (2)修改某列的?度 (3)重命名某列 (4)刪除某個字段…

      【Java后端】Spring Boot 全局異常處理最佳實踐

      Spring Boot 全局異常處理最佳實踐 在日常開發中&#xff0c;異常處理幾乎是繞不過去的一個話題。尤其在 后端 API 項目 中&#xff0c;如果沒有統一的異常處理機制&#xff0c;很容易出現以下問題&#xff1a; Controller 層代碼里充斥著 try-catch&#xff0c;顯得冗余。前端…

      K8S-Configmap資源

      目錄 一、核心概念? ?定義? ?核心價值? ?與Secret的區別? ?二、核心特性? ?數據存儲? ?生命周期? ?作用域? 什么是 Configmap&#xff1f; Configmap 能解決哪些問題&#xff1f; ConfigMap 的主要作用 三、命令行直接創建 四、通過文件創建&#xf…

      MySQL InnoDB事務acid特性的原理和隔離級別的實現原理

      InnoDB存儲引擎 InnoDB存儲結構表空間 則每張表都會有一個表空間&#xff08;xxx.ibd&#xff09;&#xff0c;一個mysql實例可以對應多個表空間 系統表空間 存儲數據字典&#xff08;表結構定義、索引信息等&#xff09;、Change Buffer、Doublewrite Bufferundo log&#xff…

      Linux系統之部署nullboard任務管理工具

      Linux系統之部署nullboard任務管理工具一、nullboard介紹1.1 nullboard簡介1.2 任務看板工具介紹1.3 nullboard使用場景二、本次實踐介紹2.1 本地環境規劃2.2 本次實踐介紹三、安裝httpd軟件3.1 檢查yum倉庫3.2 安裝httpd軟件3.3 啟動httpd服務3.4 查看httpd服務狀態3.5 防火墻…

      Qt設置軟件使用期限【新版防修改系統時間】

      在工業軟件或其他領域中&#xff0c;經常會對軟件進行授權&#xff0c;軟件需要付費進行有期限的使用。以下是我用Qt設計的設置軟件使用期限的兩種方案。 主體思想&#xff1a; 1.軟件需要綁定機器&#xff0c;讓用戶無法通過復制在另一臺機器上運行。 2.由廠家提供激活碼供用戶…

      【JavaEE】多線程(線程安全問題)

      有些代碼在單個線程環境下執行正確&#xff0c;如果同樣的代碼在多個線程下同時執行可能就會出現問題&#xff0c;這個就是線程安全問題&#xff08;或者稱線程不安全問題&#xff09;&#xff0c;簡而言之就是&#xff1a;線程安全問題是由于多線程出現的問題&#xff0c;原因…