大綱
1.Disruptor簡介
2.Disruptor和BlockingQueue的壓測對比
3.Disruptor的編程模型
4.Disruptor的數據結構與生產消費模型
5.RingBuffer + Disruptor + Sequence相關類
6.Disruptor的WaitStrategy消費者等待策略
7.EventProcessor + EventHandler等類
8.Disruptor的運行原理圖
9.復雜業務需求下的編碼方案和框架
10.Disruptor的串行操作
11.Disruptor的并行操作
12.Disruptor的多邊形操作
13.Disruptor的多生產者和多消費者
1.Disruptor簡介
(1)Disruptor是什么
(2)Disruptor的特點
(3)Disruptor的核心
(1)Disruptor是什么
Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平臺,能夠以很低的延遲產生大量的交易。LMAX是建立在JVM平臺上,其核心是一個業務邏輯處理器,能夠在一個線程里每秒處理6百萬訂單。LMAX業務邏輯處理器完全是運行在內存中,使用事件驅動方式,其核心是Disruptor。
(2)Disruptor的特點
大大簡化了并發程序開發的難度,性能上比Java提供的一些并發包還好。
Disruptor是一個高性能異步處理框架,實現了觀察者模式。Disruptor是無鎖的、是CPU友好的。Disruptor不會清除緩存中的數據,只會覆蓋緩存中的數據,不需要進行垃圾回收。Disruptor業務邏輯是純內存操作,使用事件驅動方式。
(3)Disruptor的核心
Disruptor核心是一個RingBuffer,RingBuffer是一個數組,沒有首尾指針。RingBuffer是一個首尾相接的環,用于在不同線程之間傳遞數據。
如果RingBuffer滿了,是繼續覆蓋還是等待消費,由生產者和消費者決定。假設RingBuffer滿了,生產者有兩個選擇:選擇一是等待RingBuffer有空位再填充,選擇二是直接覆蓋。同時消費者也有兩種選擇:選擇一是等待RingBuffer滿了再消費,選擇二是RingBuffer填充一個就消費一個。
RingBuffer有一個序號Sequence,這個序號指向數組中下一個可用元素。隨著數據不斷地填充這個數組,這個序號會一直增長,直到繞過這個環。序號指向的元素,可以通過mod計算:序號 % 長度 = 索引。建議將長度設為2的n次方,有利于二進制計算:序號 & (長度 - 1) = 索引。
Sequence通過順序遞增的序號來進行編號,以此管理正在進行交換的數據(事件)。對數據處理的過程總是沿著需要逐個遞增處理,從而實現線程安全。一個Sequence用于跟蹤標識某個特定的事件處理者的處理進度。
2.Disruptor和BlockingQueue的壓測對比
Disruptor的性能是ArrayBlockingQueue的3倍+,這里的測試代碼都是基于單線程的單生產者單消費者模式運行的。但是Disruptor本身就支持多生產者多消費者模型,測試中使用單線程明顯降低了其性能。而ArrayBlockingQueue在多生產者多消費者場景下,其性能又會比單生產者單消費者場景下更低。因此,在實際應用中,Disruptor的性能會是ArrayBlockingQueue的3倍+。
public interface Constants {int EVENT_NUM_OHM = 100000000;int EVENT_NUM_FM = 50000000;int EVENT_NUM_OM = 10000000;
}public class ArrayBlockingQueue4Test {public static void main(String[] args) {//初始化一個大小為100000000的有界隊列ArrayBlockingQueue,為了避免在測試時由于擴容影響性能,所以一開始就初始化大小為1億final ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<Data>(100000000);//開始時間final long startTime = System.currentTimeMillis();//向容器中添加元素new Thread(new Runnable() {public void run() {long i = 0;//首先把數據投遞到有界隊列ArrayBlockingQueue,單線程的生產者while (i < Constants.EVENT_NUM_OHM) {Data data = new Data(i, "c" + i);try {queue.put(data);} catch (InterruptedException e) {e.printStackTrace();}i++;}}}).start();//從容器中取出元素new Thread(new Runnable() {public void run() {int k = 0;//然后才開始消費有界隊列中的數據,單線程的消費者while (k < Constants.EVENT_NUM_OHM) {try {queue.take();} catch (InterruptedException e) {e.printStackTrace();}k++;}//結束時間long endTime = System.currentTimeMillis();//整個main函數就是單線程運行,處理1千萬數據,大概耗時3.6秒System.out.println("ArrayBlockingQueue costTime = " + (endTime - startTime) + "ms");}}).start();}
}public class DisruptorSingle4Test {public static void main(String[] args) {int ringBufferSize = 65536;final Disruptor<Data> disruptor = new Disruptor<Data>(new EventFactory<Data>() {public Data newInstance() {return new Data();}},ringBufferSize,//設置為單線程運行Executors.newSingleThreadExecutor(),//單生產者模式ProducerType.SINGLE,//new BlockingWaitStrategy()new YieldingWaitStrategy());//創建一個消費者事件處理器DataConsumer consumer = new DataConsumer();//消費數據disruptor.handleEventsWith(consumer);disruptor.start();//單線程的消費者new Thread(new Runnable() {public void run() {RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();for (long i = 0; i < Constants.EVENT_NUM_OHM; i++) {long seq = ringBuffer.next();Data data = ringBuffer.get(seq);data.setId(i);data.setName("c" + i);//發布一個數據被消費的事件ringBuffer.publish(seq);}}}).start();}
}public class DataConsumer implements EventHandler<Data> {private long startTime;private int i;public DataConsumer() {this.startTime = System.currentTimeMillis();}public void onEvent(Data data, long seq, boolean bool) throws Exception {i++;if (i == Constants.EVENT_NUM_OHM) {long endTime = System.currentTimeMillis();//處理1千萬的數據,大概耗時1.1秒System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms");//可見Disruptor的性能是ArrayBlockingQueue的3倍+}}
}
3.Disruptor的編程模型
(1)Disruptor的使用步驟
(2)Disruptor的使用演示
(1)Disruptor的使用步驟
步驟一:建立一個Event工廠類,用于創建數據(Event類實例對象)
步驟二:建立一個監聽事件類(Event處理器),用于處理數據(Event類實例對象)
步驟三:創建Disruptor實例,配置一系列參數
步驟四:編寫生產者組件,向Disruptor容器投遞數據
(2)Disruptor的使用演示
一.引入pom依賴
<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.2</version>
</dependency>
二.建立Event工廠類用于創建數據
Event工廠類創建的數據就是Event類實例對象。
public class OrderEvent {//訂單的價格private long value;public long getValue() {return value;}public void setValue(long value) {this.value = value;}
}public class OrderEventFactory implements EventFactory<OrderEvent> {//返回一個空的數據對象(OrderEvent對象實例)public OrderEvent newInstance() {return new OrderEvent();}
}
三.建立監聽事件類用于處理數據
監聽事件類就是Event處理器,處理的數據就是Event類實例對象。
public class OrderEventHandler implements EventHandler<OrderEvent> {public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(1000);System.err.println("消費者: " + event.getValue());}
}
四.創建Disruptor對象實例
public class Main {public static void main(String[] args) {//參數準備OrderEventFactory orderEventFactory = new OrderEventFactory();int ringBufferSize = 4;ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());//參數一:eventFactory,消息(Event)工廠對象//參數二:ringBufferSize,容器的長度//參數三:executor,線程池(建議使用自定義線程池),RejectedExecutionHandler//參數四:ProducerType,單生產者還是多生產者//參數五:waitStrategy,等待策略//1.實例化Disruptor對象Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,executor,ProducerType.SINGLE,//單生產者new BlockingWaitStrategy());//2.添加Event處理器,用于處理事件//也就是構建Disruptor與消費者的一個關聯關系disruptor.handleEventsWith(new OrderEventHandler());//3.啟動disruptordisruptor.start();...}
}
五.編寫生產者組件向Disruptor容器投遞數據
public class Main {public static void main(String[] args) {//參數準備OrderEventFactory orderEventFactory = new OrderEventFactory();int ringBufferSize = 4;ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());//參數一:eventFactory,消息(Event)工廠對象//參數二:ringBufferSize,容器的長度//參數三:executor,線程池(建議使用自定義線程池),RejectedExecutionHandler//參數四:ProducerType,單生產者還是多生產者//參數五:waitStrategy,等待策略//1.實例化Disruptor對象Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,executor,ProducerType.SINGLE,new BlockingWaitStrategy());//2.添加Event處理器,用于處理事件//也就是構建Disruptor與消費者的一個關聯關系disruptor.handleEventsWith(new OrderEventHandler());//3.啟動disruptordisruptor.start();//4.獲取實際存儲數據的容器: RingBufferRingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();OrderEventProducer producer = new OrderEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for (long i = 0; i < 5; i++) {bb.putLong(0, i);//向容器中投遞數據producer.sendData(bb);}disruptor.shutdown();executor.shutdown();}
}public class OrderEventProducer {private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void sendData(ByteBuffer data) {//1.在生產者發送消息時, 首先需要從ringBuffer里獲取一個可用的序號long sequence = ringBuffer.next();try {//2.根據這個序號, 找到具體的"OrderEvent"元素//注意:此時獲取的OrderEvent對象是一個沒有被賦值的"空對象"OrderEvent event = ringBuffer.get(sequence);//3.進行實際的賦值處理event.setValue(data.getLong(0));} finally {//4.提交發布操作ringBuffer.publish(sequence);}}
}
4.Disruptor的數據結構與生產消費模型
(1)Disruptor的核心與原理
(2)Disruptor的RingBuffer數據結構
(3)Disruptor的生產消費模型
(1)Disruptor的核心與原理
Disruptor的核心是RingBuffer,生產者向RingBuffer中寫入元素,消費者從RingBuffer中消費元素。
(2)Disruptor的RingBuffer數據結構
RingBuffer是一個首尾相接的環(數組),用于在不同上下文(線程)之間傳遞數據。
RingBuffer擁有一個序號,這個序號指向數組中下一個可用的元素。隨著生產者不停地往RingBuffer寫入元素,這個序號也會一直增長,直到這個序號繞過這個環。
要找到RingBuffer數組中當前序號指向的元素,可以通過mod操作:序號 % 數組長度 = 數組索引。建議將長度設為2的n次方,有利于二進制計算:序號 & (長度 - 1) = 索引。
(3)Disruptor的生產消費模型
一.消費快生產慢
如果消費者從RingBuffer消費元素的速度大于生產者寫入元素的速度,那么當消費者發現RingBuffer沒有元素時,就要停下等待生產者寫入元素。
二.生產快消費慢
如果生產者向RingBuffer寫入元素的速度大于消費者消費元素的速度,那么當生產者發現RingBuffer已經滿了,就要停下等待消費者消費元素。
因為RingBuffer數組的長度是有限的,生產者寫入到RingBuffer的末尾時,會從RingBuffer的開始位置繼續寫入,這時候生產者就可能會追上消費者。
5.RingBuffer + Disruptor + Sequence相關類
(1)RingBuffer類
(2)Disruptor類
(3)Sequence類
(4)Sequencer接口
(5)SequenceBarrier類
(1)RingBuffer類
RingBuffer不僅是基于數組的緩存,也是創建Sequencer與定義WaitStrategy的入口。
(2)Disruptor類
Disruptor類可認為是一個持有RingBuffer、消費者線程池、消費者集合等引用的輔助類。
(3)Sequence類
通過順序遞增的序號來編號,管理正在進行交換的數據(事件)。對數據(事件)的處理總是沿著序號逐個遞增,所以能夠實現多線程下的并發安全與原子性。
一個Sequence用于跟蹤標識某個特定的事件處理者的處理進度,也就是事件處理者在RingBuffer中的處理進度。每一個Producer和Consumer都有一個自己的Sequence。
Sequence可以看成是一個AtomicLong類型字段,用于標識進度。Sequence還可以防止不同Sequence之間CPU緩存的偽共享問題。
Sequence的兩個作用:
作用一:用于遞增標識進度
作用二:用于消除偽共享
(4)Sequencer接口
一.Sequencer包含Sequence
二.Sequencer接口有兩個實現類
第一個實現類是SingleProducerSequencer
第二個實現類是MultiProducerSequencer
(5)SequenceBarrier類
作用一:用于保持對RingBuffer的生產者和消費者之間的平衡關系,比如讓生產者或消費者進行等待、喚醒生產者或消費者
作用二:決定消費者是否還有可處理的事件
6.Disruptor的WaitStrategy消費者等待策略
(1)WaitStrategy接口的作用
(2)消費者等待策略的種類
(3)BlockingWaitStrategy
(4)SleepingWaitStrategy
(5)YieldingWaitStrategy
(1)WaitStrategy接口的作用
決定一個消費者將會如何等待生產者將Event投遞到Disruptor。
(2)消費者等待策略的種類
BlockingWaitStrategy,通過阻塞的方式進行等待
SleepingWaitStrategy,通過休眠的方式進行等待
YieldingWaitStrategy,通過線程間的切換的方式進行等待
(3)BlockingWaitStrategy
BlockingWaitStrategy是最低效的等待策略,但是對CPU的消耗最小,并且在各種不同部署環境中能提供一致的性能表現。該策略需要使用到Java中的鎖,也就是會通過ReentrantLock來阻塞消費者線程。而Disruptor本身是一個無鎖并發框架,所以如果追求高性能,就不要選擇這種策略。
(4)SleepingWaitStrategy
SleepingWaitStrategy是性能一般的等待策略,其性能表現和BlockingWaitStrategy差不多。但由于SleepingWaitStrategy是無鎖的,所以對生產者線程的影響最小。該策略對CPU的消耗一般,通過在單個線程循環 + yield切換線程實現,所以這種策略特別適合于異步日志類似的場景。
(5)YieldingWaitStrategy
YieldingWaitStrategy的性能是最好的,適合于低延遲的系統。不過該策略對CPU的消耗最高,因為完全基于yield切換線程來實現。推薦用于要求高性能且事件處理線程數小于CPU邏輯核心數的場景中,尤其是當CPU開啟了超線程特性的時候。
7.EventProcessor + EventHandler等類
(1)Event對象
(2)EventProcessor接口
(3)EventHandler接口
(4)WorkProcessor類
(1)Event對象
Disruptor中的Event指的是從生產者到消費者過程中所處理的數據對象。Disruptor中沒有代碼表示Event,它用泛型表示,完全由用戶定義。比如創建一個RingBuffer對象時,其中的泛型就表示著這個Event對象。
(2)EventProcessor接口
EventProcessor用于處理Disruptor中的Event,擁有消費者的Sequence,它有一個實現類叫BatchEventProcessor。
由于EventProcessor接口繼承自Runnable接口,所以BatchEventProcessor類會實現Runnable接口的run()方法。
其實BatchEventProcessor類是Disruptor框架中最核心的類,因為它的run()方法會不斷輪詢并獲取數據對象,然后把數據對象(Event)交給消費者去處理,也就是即回調EventHandler接口的實現類對象的onEvent()方法。
(3)EventHandler接口
EventHandler是由用戶實現的并且代表了Disruptor中的一個消費者接口,也就是消費者邏輯需要在EventHandler接口的onEvent()方法實現。
(4)WorkProcessor類
WorkProcessor類可確保每個Sequence只被一個Processor消費。注意:在單消費者模式下,使用的是EventHandler,對應于EventProcessor。在多消費者模式下,使用的是WorkHandler,對應于WorkProcessor。
8.Disruptor的運行原理圖
9.復雜業務需求下的編碼方案和框架
(1)方案選擇
(2)框架選擇
(1)方案選擇
方案一:完全解耦的模式,比如一個子業務線也開一個項目,此時重復代碼會比較多。
方案二:模版方法模式,如果業務快速迭代,可能也會需要經常重構底層的模版方法。
(2)框架選擇
一.使用有限狀態機框架
二.使用Disruptor框架
10.Disruptor的串行操作
Disruptor的串行操作,可以通過鏈式調用handleEventsWith()方法來實現。
如果使用RingBuffer對象來發布事件,那么需要先從RingBuffer對象中獲取一個可用的序號,然后根據序號獲取Event對象并對Event對象賦值,最后調用RingBuffer的publish()方法發布事件。
如果使用Disruptor對象來發布事件,那么直接調用Disruptor的publishEvent()方法發布事件即可。
此外,實際應用中不建議通過Executors來創建線程池,而應通過ThreadPoolExecutor構造函數具體指定線程池的每一個參數。因為Executors創建的線程池還是可能有安全隱患,比如Executors的newFixedThreadPool()方法使用的是無界隊列,其使用的LinkedBlockingQueue是一個可選是否有界的阻塞隊列。
//Disruptor中的Event
public class Trade {private String id;private String name;private double price;private AtomicInteger count = new AtomicInteger(0);public Trade() {}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}public AtomicInteger getCount() {return count;}public void setCount(AtomicInteger count) {this.count = count;}
}public class Main {@SuppressWarnings("unchecked")public static void main(String[] args) throws Exception {//實際應用中不建議這樣創建線程池,而應通過ThreadPoolExecutor構造函數具體指定每個參數//因為這種創建的線程池還是有安全隱患,比如newFixedThreadPool()使用的是無界隊列//LinkedBlockingQueue是一個可選是否有界的阻塞隊列ExecutorService es1 = Executors.newFixedThreadPool(8);//構建一個線程池用于提交任務ExecutorService es2 = Executors.newFixedThreadPool(1);//1.構建DisruptorDisruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {public Trade newInstance() {return new Trade();}},1024 * 1024,es1,ProducerType.SINGLE,new BusySpinWaitStrategy());//2.把消費者設置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法//串行操作,通過鏈式編程實現disruptor.handleEventsWith(new Handler1()).handleEventsWith(new Handler2()).handleEventsWith(new Handler3());//3.啟動disruptor并獲取RingBufferRingBuffer<Trade> ringBuffer = disruptor.start();CountDownLatch latch = new CountDownLatch(1);long begin = System.currentTimeMillis();//通過線程池向Disruptor發布事件(生產數據)es2.submit(new TradePublisher(latch, disruptor));latch.await();disruptor.shutdown();es1.shutdown();es2.shutdown();System.err.println("總耗時: " + (System.currentTimeMillis() - begin));}
}public class TradePublisher implements Runnable {private static int PUBLISH_COUNT = 10;private Disruptor<Trade> disruptor;private CountDownLatch latch;public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {this.disruptor = disruptor;this.latch = latch;}public void run() {TradeEventTranslator eventTranslator = new TradeEventTranslator();for (int i = 0; i < PUBLISH_COUNT; i++) {//新的發布事件的方式,另一種方式就是通過傳入的RingBuffer的publish()方法發布事件disruptor.publishEvent(eventTranslator);}latch.countDown();}
}class TradeEventTranslator implements EventTranslator<Trade> {private Random random = new Random();public void translateTo(Trade event, long sequence) {this.generateTrade(event);}private void generateTrade(Trade event) {event.setPrice(random.nextDouble() * 9999);}
}public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {//實現EventHandler的onEvent()方法,可以監聽生產者發布的事件public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {this.onEvent(event);}//實現WorkHandler的onEvent()方法,也可以監聽生產者發布的事件public void onEvent(Trade event) throws Exception {System.err.println("handler 1 : SET NAME");Thread.sleep(1000);event.setName("H1");}
}public class Handler2 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 2 : SET ID");Thread.sleep(2000);event.setId(UUID.randomUUID().toString());}
}public class Handler3 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());}
}
11.Disruptor的并行操作
Disruptor的并行操作可以有兩種方式實現:方式一是調用handleEventsWith()方法時傳入多個handler對象,方式二是分別多次調用handleEventsWith()方法。
public class Main {@SuppressWarnings("unchecked")public static void main(String[] args) throws Exception {//實際應用中不建議這樣創建線程池,而應通過ThreadPoolExecutor構造函數具體指定每個參數//因為這種創建的線程池還是有安全隱患,比如newFixedThreadPool()使用的是無界隊列//LinkedBlockingQueue是一個可選是否有界的阻塞隊列ExecutorService es1 = Executors.newFixedThreadPool(8);//構建一個線程池用于提交任務ExecutorService es2 = Executors.newFixedThreadPool(1);//1.構建DisruptorDisruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {public Trade newInstance() {return new Trade();}},1024 * 1024,es1,ProducerType.SINGLE,new BusySpinWaitStrategy());//2.把消費者設置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法//Disruptor的并行操作可以有兩種方式實現//方式一:調用handleEventsWith方法時傳入多個handler對象disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());//方式二:分別多次調用handleEventsWith()方法//disruptor.handleEventsWith(new Handler1());//disruptor.handleEventsWith(new Handler2());//disruptor.handleEventsWith(new Handler3());//3.啟動disruptor并獲取RingBufferRingBuffer<Trade> ringBuffer = disruptor.start();CountDownLatch latch = new CountDownLatch(1);long begin = System.currentTimeMillis();//通過線程池向Disruptor發布事件(生產數據)es2.submit(new TradePublisher(latch, disruptor));latch.await();disruptor.shutdown();es1.shutdown();es2.shutdown();System.err.println("總耗時: " + (System.currentTimeMillis() - begin));}
}public class TradePublisher implements Runnable {private static int PUBLISH_COUNT = 10;private Disruptor<Trade> disruptor;private CountDownLatch latch;public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {this.disruptor = disruptor;this.latch = latch;}public void run() {TradeEventTranslator eventTranslator = new TradeEventTranslator();for (int i = 0; i < PUBLISH_COUNT; i++) {//新的發布事件的方式,另一種方式就是通過傳入的RingBuffer的publish()方法發布事件disruptor.publishEvent(eventTranslator);}latch.countDown();}
}class TradeEventTranslator implements EventTranslator<Trade> {private Random random = new Random();public void translateTo(Trade event, long sequence) {this.generateTrade(event);}private void generateTrade(Trade event) {event.setPrice(random.nextDouble() * 9999);}
}public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {//實現EventHandler的onEvent()方法,可以監聽生產者發布的事件public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {this.onEvent(event);}//實現WorkHandler的onEvent()方法,也可以監聽生產者發布的事件public void onEvent(Trade event) throws Exception {System.err.println("handler 1 : SET NAME");Thread.sleep(1000);event.setName("H1");}
}public class Handler2 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 2 : SET ID");Thread.sleep(2000);event.setId(UUID.randomUUID().toString());}
}public class Handler3 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());}
}
12.Disruptor的多邊形操作
(1)Disruptor的菱形操作
(2)Disruptor的六邊形操作
Disruptor可以實現串并行同時編碼。
(1)Disruptor的菱形操作
可以理解為先并行執行,然后再串行執行,類似于CyclicBarrier。
菱形操作方式一:調用handleEventsWith()方法時傳入多個參數 + 鏈式調用。
菱形操作方式二:調用handleEventsWith()方法時傳入多個參數 + 使用then()方法。
public class Main {@SuppressWarnings("unchecked")public static void main(String[] args) throws Exception {//實際應用中不建議這樣創建線程池,而應通過ThreadPoolExecutor構造函數具體指定每個參數//因為這種創建的線程池還是有安全隱患,比如newFixedThreadPool()使用的是無界隊列//LinkedBlockingQueue是一個可選是否有界的阻塞隊列ExecutorService es1 = Executors.newFixedThreadPool(8);//構建一個線程池用于提交任務ExecutorService es2 = Executors.newFixedThreadPool(1);//1.構建DisruptorDisruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {public Trade newInstance() {return new Trade();}},1024 * 1024,es1,ProducerType.SINGLE,new BusySpinWaitStrategy());//2.把消費者設置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法//菱形操作一disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());//菱形操作二//EventHandlerGroup<Trade> ehGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());//ehGroup.then(new Handler3());//3.啟動disruptor并獲取RingBufferRingBuffer<Trade> ringBuffer = disruptor.start();CountDownLatch latch = new CountDownLatch(1);long begin = System.currentTimeMillis();//通過線程池向Disruptor發布事件(生產數據)es2.submit(new TradePublisher(latch, disruptor));latch.await();disruptor.shutdown();es1.shutdown();es2.shutdown();System.err.println("總耗時: " + (System.currentTimeMillis() - begin));}
}public class TradePublisher implements Runnable {private static int PUBLISH_COUNT = 10;private Disruptor<Trade> disruptor;private CountDownLatch latch;public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {this.disruptor = disruptor;this.latch = latch;}public void run() {TradeEventTranslator eventTranslator = new TradeEventTranslator();for (int i = 0; i < PUBLISH_COUNT; i++) {//新的發布事件的方式,另一種方式就是通過傳入的RingBuffer的publish()方法發布事件disruptor.publishEvent(eventTranslator);}latch.countDown();}
}class TradeEventTranslator implements EventTranslator<Trade> {private Random random = new Random();public void translateTo(Trade event, long sequence) {this.generateTrade(event);}private void generateTrade(Trade event) {event.setPrice(random.nextDouble() * 9999);}
}public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {//實現EventHandler的onEvent()方法,可以監聽生產者發布的事件public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {this.onEvent(event);}//實現WorkHandler的onEvent()方法,也可以監聽生產者發布的事件public void onEvent(Trade event) throws Exception {System.err.println("handler 1 : SET NAME");Thread.sleep(1000);event.setName("H1");}
}public class Handler2 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 2 : SET ID");Thread.sleep(2000);event.setId(UUID.randomUUID().toString());}
}public class Handler3 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());}
}
(2)Disruptor的六邊形操作
通過Disruptor的after()方法 + 菱形操作,可實現六邊形操作。
注意在單消費者模式下:一個EventHandler會對應一個BatchEventProcessor,所以如果有n個EventHandler監聽Disruptor,那么初始化Disruptor時的線程池就要有n個線程,否則可能導致多邊形操作失效。
在單消費者模式下,如果有非常多EventHandler,就需要非常多線程。此時是不合理的,所以如果有很多EventHandler,可采用多消費者模式。
public class Main {@SuppressWarnings("unchecked")public static void main(String[] args) throws Exception {//實際應用中不建議這樣創建線程池,而應通過ThreadPoolExecutor構造函數具體指定每個參數//因為這種創建的線程池還是有安全隱患,比如newFixedThreadPool()使用的是無界隊列//LinkedBlockingQueue是一個可選是否有界的阻塞隊列ExecutorService es1 = Executors.newFixedThreadPool(8);//構建一個線程池用于提交任務ExecutorService es2 = Executors.newFixedThreadPool(1);//1.構建DisruptorDisruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {public Trade newInstance() {return new Trade();}},1024 * 1024,es1,ProducerType.SINGLE,new BusySpinWaitStrategy());//2.把消費者設置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法//六邊形操作Handler1 h1 = new Handler1();Handler2 h2 = new Handler2();Handler3 h3 = new Handler3();Handler4 h4 = new Handler4();Handler5 h5 = new Handler5();disruptor.handleEventsWith(h1, h4);disruptor.after(h1).handleEventsWith(h2);disruptor.after(h4).handleEventsWith(h5);disruptor.after(h2, h5).handleEventsWith(h3);//3.啟動disruptor并獲取RingBufferRingBuffer<Trade> ringBuffer = disruptor.start();CountDownLatch latch = new CountDownLatch(1);long begin = System.currentTimeMillis();//通過線程池向Disruptor發布事件(生產數據)es2.submit(new TradePublisher(latch, disruptor));latch.await();disruptor.shutdown();es1.shutdown();es2.shutdown();System.err.println("總耗時: " + (System.currentTimeMillis() - begin));}
}public class TradePublisher implements Runnable {private static int PUBLISH_COUNT = 10;private Disruptor<Trade> disruptor;private CountDownLatch latch;public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {this.disruptor = disruptor;this.latch = latch;}public void run() {TradeEventTranslator eventTranslator = new TradeEventTranslator();for (int i = 0; i < PUBLISH_COUNT; i++) {//新的發布事件的方式,另一種方式就是通過傳入的RingBuffer的publish()方法發布事件disruptor.publishEvent(eventTranslator);}latch.countDown();}
}class TradeEventTranslator implements EventTranslator<Trade> {private Random random = new Random();public void translateTo(Trade event, long sequence) {this.generateTrade(event);}private void generateTrade(Trade event) {event.setPrice(random.nextDouble() * 9999);}
}public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {//實現EventHandler的onEvent()方法,可以監聽生產者發布的事件public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {this.onEvent(event);}//實現WorkHandler的onEvent()方法,也可以監聽生產者發布的事件public void onEvent(Trade event) throws Exception {System.err.println("handler 1 : SET NAME");Thread.sleep(1000);event.setName("H1");}
}public class Handler2 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 2 : SET ID");Thread.sleep(2000);event.setId(UUID.randomUUID().toString());}
}public class Handler3 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());}
}public class Handler4 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 4 : SET PRICE");Thread.sleep(1000);event.setPrice(17.0);}
}public class Handler5 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 5 : GET PRICE: " + event.getPrice());Thread.sleep(1000);event.setPrice(event.getPrice() + 3.0);}
}
13.Disruptor的多生產者和多消費者
注意一:使用多消費者模式時,每個消費者都需要實現WorkHandler接口,而不是EventHandler接口。單消費者模式,使用的是EventHandler,對應于EventProcessor。多消費者模式,使用的是WorkHandler,對應于WorkProcessor。
注意二:使用多消費者模式時,需要構建消費者工作池WorkerPool。
注意三:使用多消費者模式時,每個消費者需要一個Sequence來標記當前消費的最小序號。這樣生產者投遞消息時才能遍歷消費者的Sequence找出最小的序號,然后寫到最小的序號位置進行阻塞等待。
比如下圖中,在某一時刻:消費者1消費了序號0和2,但序號1還沒有消費完畢。消費者2消費了序號3和4,消費者3消費了序號5。此時,在RingBuffer中,雖然序號0、2、3、4、5都可以覆蓋了,但由于序號1還沒被消費,所以生產者最多只能覆蓋到序號0的位置。然后等待序號1被消費者1消費完畢后,才能繼續往RingBuffer投遞消息。
//Disruptor中的 Event
public class Order {private String id;private String name;private double price;public Order() {}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}
}public class Main {public static void main(String[] args) throws InterruptedException {//1.創建RingBufferRingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI,//多生產者new EventFactory<Order>() {public Order newInstance() {return new Order();}},1024 * 1024,new YieldingWaitStrategy());//2.通過ringBuffer創建一個屏障SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();//3.創建消費者數組,每個消費者Consumer都需要實現WorkHandler接口Consumer[] consumers = new Consumer[10];for (int i = 0; i < consumers.length; i++) {consumers[i] = new Consumer("C" + i);}//4.構建多消費者工作池WorkerPool,因為多消費者模式下需要使用WorkerPoolWorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer,sequenceBarrier,new EventExceptionHandler(),consumers);//5.設置多個消費者的sequence序號,用于單獨統計每個消費者的消費進度, 并且設置到RingBuffer中ringBuffer.addGatingSequences(workerPool.getWorkerSequences());//6.啟動workerPoolworkerPool.start(Executors.newFixedThreadPool(5));final CountDownLatch latch = new CountDownLatch(1);for (int i = 0; i < 100; i++) {final Producer producer = new Producer(ringBuffer);new Thread(new Runnable() {public void run() {try {latch.await();} catch (Exception e) {e.printStackTrace();}for (int j = 0; j < 100; j++) {producer.sendData(UUID.randomUUID().toString());}}}).start();}Thread.sleep(2000);System.err.println("----------等待線程創建完畢,才開始生產數據----------");latch.countDown();Thread.sleep(10000);System.err.println("任務總數:" + consumers[2].getCount());}static class EventExceptionHandler implements ExceptionHandler<Order> {public void handleEventException(Throwable ex, long sequence, Order event) {}public void handleOnStartException(Throwable ex) {}public void handleOnShutdownException(Throwable ex) {}}
}public class Consumer implements WorkHandler<Order> {private static AtomicInteger count = new AtomicInteger(0);private String consumerId;private Random random = new Random();public Consumer(String consumerId) {this.consumerId = consumerId;}public void onEvent(Order event) throws Exception {Thread.sleep(1 * random.nextInt(5));System.err.println("當前消費者: " + this.consumerId + ", 消費信息ID: " + event.getId());count.incrementAndGet();}public int getCount() {return count.get();}
}public class Producer {private RingBuffer<Order> ringBuffer;public Producer(RingBuffer<Order> ringBuffer) {this.ringBuffer = ringBuffer;}public void sendData(String uuid) {long sequence = ringBuffer.next();try {Order order = ringBuffer.get(sequence);order.setId(uuid);} finally {ringBuffer.publish(sequence);}}
}