Disruptor—1.原理和使用簡介

大綱

1.Disruptor簡介

2.Disruptor和BlockingQueue的壓測對比

3.Disruptor的編程模型

4.Disruptor的數據結構與生產消費模型

5.RingBuffer + Disruptor + Sequence相關類

6.Disruptor的WaitStrategy消費者等待策略

7.EventProcessor + EventHandler等類

8.Disruptor的運行原理圖

9.復雜業務需求下的編碼方案和框架

10.Disruptor的串行操作

11.Disruptor的并行操作

12.Disruptor的多邊形操作

13.Disruptor的多生產者和多消費者

1.Disruptor簡介

(1)Disruptor是什么

(2)Disruptor的特點

(3)Disruptor的核心

(1)Disruptor是什么

Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平臺,能夠以很低的延遲產生大量的交易。LMAX是建立在JVM平臺上,其核心是一個業務邏輯處理器,能夠在一個線程里每秒處理6百萬訂單。LMAX業務邏輯處理器完全是運行在內存中,使用事件驅動方式,其核心是Disruptor。

(2)Disruptor的特點

大大簡化了并發程序開發的難度,性能上比Java提供的一些并發包還好。

Disruptor是一個高性能異步處理框架,實現了觀察者模式。Disruptor是無鎖的、是CPU友好的。Disruptor不會清除緩存中的數據,只會覆蓋緩存中的數據,不需要進行垃圾回收。Disruptor業務邏輯是純內存操作,使用事件驅動方式。

(3)Disruptor的核心

Disruptor核心是一個RingBuffer,RingBuffer是一個數組,沒有首尾指針。RingBuffer是一個首尾相接的環,用于在不同線程之間傳遞數據。

如果RingBuffer滿了,是繼續覆蓋還是等待消費,由生產者和消費者決定。假設RingBuffer滿了,生產者有兩個選擇:選擇一是等待RingBuffer有空位再填充,選擇二是直接覆蓋。同時消費者也有兩種選擇:選擇一是等待RingBuffer滿了再消費,選擇二是RingBuffer填充一個就消費一個。

RingBuffer有一個序號Sequence,這個序號指向數組中下一個可用元素。隨著數據不斷地填充這個數組,這個序號會一直增長,直到繞過這個環。序號指向的元素,可以通過mod計算:序號 % 長度 = 索引。建議將長度設為2的n次方,有利于二進制計算:序號 & (長度 - 1) = 索引。

Sequence通過順序遞增的序號來進行編號,以此管理正在進行交換的數據(事件)。對數據處理的過程總是沿著需要逐個遞增處理,從而實現線程安全。一個Sequence用于跟蹤標識某個特定的事件處理者的處理進度。

2.Disruptor和BlockingQueue的壓測對比

Disruptor的性能是ArrayBlockingQueue的3倍+,這里的測試代碼都是基于單線程的單生產者單消費者模式運行的。但是Disruptor本身就支持多生產者多消費者模型,測試中使用單線程明顯降低了其性能。而ArrayBlockingQueue在多生產者多消費者場景下,其性能又會比單生產者單消費者場景下更低。因此,在實際應用中,Disruptor的性能會是ArrayBlockingQueue的3倍+。

public interface Constants {int EVENT_NUM_OHM = 100000000;int EVENT_NUM_FM = 50000000;int EVENT_NUM_OM = 10000000;
}public class ArrayBlockingQueue4Test {public static void main(String[] args) {//初始化一個大小為100000000的有界隊列ArrayBlockingQueue,為了避免在測試時由于擴容影響性能,所以一開始就初始化大小為1億final ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<Data>(100000000);//開始時間final long startTime = System.currentTimeMillis();//向容器中添加元素new Thread(new Runnable() {public void run() {long i = 0;//首先把數據投遞到有界隊列ArrayBlockingQueue,單線程的生產者while (i < Constants.EVENT_NUM_OHM) {Data data = new Data(i, "c" + i);try {queue.put(data);} catch (InterruptedException e) {e.printStackTrace();}i++;}}}).start();//從容器中取出元素new Thread(new Runnable() {public void run() {int k = 0;//然后才開始消費有界隊列中的數據,單線程的消費者while (k < Constants.EVENT_NUM_OHM) {try {queue.take();} catch (InterruptedException e) {e.printStackTrace();}k++;}//結束時間long endTime = System.currentTimeMillis();//整個main函數就是單線程運行,處理1千萬數據,大概耗時3.6秒System.out.println("ArrayBlockingQueue costTime = " + (endTime - startTime) + "ms");}}).start();}
}public class DisruptorSingle4Test {public static void main(String[] args) {int ringBufferSize = 65536;final Disruptor<Data> disruptor = new Disruptor<Data>(new EventFactory<Data>() {public Data newInstance() {return new Data();}},ringBufferSize,//設置為單線程運行Executors.newSingleThreadExecutor(),//單生產者模式ProducerType.SINGLE,//new BlockingWaitStrategy()new YieldingWaitStrategy());//創建一個消費者事件處理器DataConsumer consumer = new DataConsumer();//消費數據disruptor.handleEventsWith(consumer);disruptor.start();//單線程的消費者new Thread(new Runnable() {public void run() {RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();for (long i = 0; i < Constants.EVENT_NUM_OHM; i++) {long seq = ringBuffer.next();Data data = ringBuffer.get(seq);data.setId(i);data.setName("c" + i);//發布一個數據被消費的事件ringBuffer.publish(seq);}}}).start();}
}public class DataConsumer implements EventHandler<Data> {private long startTime;private int i;public DataConsumer() {this.startTime = System.currentTimeMillis();}public void onEvent(Data data, long seq, boolean bool) throws Exception {i++;if (i == Constants.EVENT_NUM_OHM) {long endTime = System.currentTimeMillis();//處理1千萬的數據,大概耗時1.1秒System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms");//可見Disruptor的性能是ArrayBlockingQueue的3倍+}}
}

3.Disruptor的編程模型

(1)Disruptor的使用步驟

(2)Disruptor的使用演示

(1)Disruptor的使用步驟

步驟一:建立一個Event工廠類,用于創建數據(Event類實例對象)
步驟二:建立一個監聽事件類(Event處理器),用于處理數據(Event類實例對象)
步驟三:創建Disruptor實例,配置一系列參數
步驟四:編寫生產者組件,向Disruptor容器投遞數據

(2)Disruptor的使用演示

一.引入pom依賴

<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.2</version>
</dependency>

二.建立Event工廠類用于創建數據

Event工廠類創建的數據就是Event類實例對象。

public class OrderEvent {//訂單的價格private long value;public long getValue() {return value;}public void setValue(long value) {this.value = value;}
}public class OrderEventFactory implements EventFactory<OrderEvent> {//返回一個空的數據對象(OrderEvent對象實例)public OrderEvent newInstance() {return new OrderEvent();}
}

三.建立監聽事件類用于處理數據

監聽事件類就是Event處理器,處理的數據就是Event類實例對象

public class OrderEventHandler implements EventHandler<OrderEvent> {public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(1000);System.err.println("消費者: " + event.getValue());}
}

四.創建Disruptor對象實例

public class Main {public static void main(String[] args) {//參數準備OrderEventFactory orderEventFactory = new OrderEventFactory();int ringBufferSize = 4;ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());//參數一:eventFactory,消息(Event)工廠對象//參數二:ringBufferSize,容器的長度//參數三:executor,線程池(建議使用自定義線程池),RejectedExecutionHandler//參數四:ProducerType,單生產者還是多生產者//參數五:waitStrategy,等待策略//1.實例化Disruptor對象Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,executor,ProducerType.SINGLE,//單生產者new BlockingWaitStrategy());//2.添加Event處理器,用于處理事件//也就是構建Disruptor與消費者的一個關聯關系disruptor.handleEventsWith(new OrderEventHandler());//3.啟動disruptordisruptor.start();...}
}

五.編寫生產者組件向Disruptor容器投遞數據

public class Main {public static void main(String[] args) {//參數準備OrderEventFactory orderEventFactory = new OrderEventFactory();int ringBufferSize = 4;ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());//參數一:eventFactory,消息(Event)工廠對象//參數二:ringBufferSize,容器的長度//參數三:executor,線程池(建議使用自定義線程池),RejectedExecutionHandler//參數四:ProducerType,單生產者還是多生產者//參數五:waitStrategy,等待策略//1.實例化Disruptor對象Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,executor,ProducerType.SINGLE,new BlockingWaitStrategy());//2.添加Event處理器,用于處理事件//也就是構建Disruptor與消費者的一個關聯關系disruptor.handleEventsWith(new OrderEventHandler());//3.啟動disruptordisruptor.start();//4.獲取實際存儲數據的容器: RingBufferRingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();OrderEventProducer producer = new OrderEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for (long i = 0; i < 5; i++) {bb.putLong(0, i);//向容器中投遞數據producer.sendData(bb);}disruptor.shutdown();executor.shutdown();}
}public class OrderEventProducer {private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void sendData(ByteBuffer data) {//1.在生產者發送消息時, 首先需要從ringBuffer里獲取一個可用的序號long sequence = ringBuffer.next();try {//2.根據這個序號, 找到具體的"OrderEvent"元素//注意:此時獲取的OrderEvent對象是一個沒有被賦值的"空對象"OrderEvent event = ringBuffer.get(sequence);//3.進行實際的賦值處理event.setValue(data.getLong(0));} finally {//4.提交發布操作ringBuffer.publish(sequence);}}
}

4.Disruptor的數據結構與生產消費模型

(1)Disruptor的核心與原理

(2)Disruptor的RingBuffer數據結構

(3)Disruptor的生產消費模型

(1)Disruptor的核心與原理

Disruptor的核心是RingBuffer,生產者向RingBuffer中寫入元素,消費者從RingBuffer中消費元素。

(2)Disruptor的RingBuffer數據結構

RingBuffer是一個首尾相接的環(數組),用于在不同上下文(線程)之間傳遞數據。

RingBuffer擁有一個序號,這個序號指向數組中下一個可用的元素。隨著生產者不停地往RingBuffer寫入元素,這個序號也會一直增長,直到這個序號繞過這個環。

要找到RingBuffer數組中當前序號指向的元素,可以通過mod操作:序號 % 數組長度 = 數組索引。建議將長度設為2的n次方,有利于二進制計算:序號 & (長度 - 1) = 索引。

(3)Disruptor的生產消費模型

一.消費快生產慢

如果消費者從RingBuffer消費元素的速度大于生產者寫入元素的速度,那么當消費者發現RingBuffer沒有元素時,就要停下等待生產者寫入元素。

二.生產快消費慢

如果生產者向RingBuffer寫入元素的速度大于消費者消費元素的速度,那么當生產者發現RingBuffer已經滿了,就要停下等待消費者消費元素。

因為RingBuffer數組的長度是有限的,生產者寫入到RingBuffer的末尾時,會從RingBuffer的開始位置繼續寫入,這時候生產者就可能會追上消費者。

5.RingBuffer + Disruptor + Sequence相關類

(1)RingBuffer類

(2)Disruptor類

(3)Sequence類

(4)Sequencer接口

(5)SequenceBarrier類

(1)RingBuffer類

RingBuffer不僅是基于數組的緩存,也是創建Sequencer與定義WaitStrategy的入口。

(2)Disruptor類

Disruptor類可認為是一個持有RingBuffer、消費者線程池、消費者集合等引用的輔助類。

(3)Sequence類

通過順序遞增的序號來編號,管理正在進行交換的數據(事件)。對數據(事件)的處理總是沿著序號逐個遞增,所以能夠實現多線程下的并發安全與原子性。

一個Sequence用于跟蹤標識某個特定的事件處理者的處理進度,也就是事件處理者在RingBuffer中的處理進度。每一個Producer和Consumer都有一個自己的Sequence。

Sequence可以看成是一個AtomicLong類型字段,用于標識進度。Sequence還可以防止不同Sequence之間CPU緩存的偽共享問題。

Sequence的兩個作用:

作用一:用于遞增標識進度

作用二:用于消除偽共享

(4)Sequencer接口

一.Sequencer包含Sequence

二.Sequencer接口有兩個實現類

第一個實現類是SingleProducerSequencer

第二個實現類是MultiProducerSequencer

(5)SequenceBarrier類

作用一:用于保持對RingBuffer的生產者和消費者之間的平衡關系,比如讓生產者或消費者進行等待、喚醒生產者或消費者

作用二:決定消費者是否還有可處理的事件

6.Disruptor的WaitStrategy消費者等待策略

(1)WaitStrategy接口的作用

(2)消費者等待策略的種類

(3)BlockingWaitStrategy

(4)SleepingWaitStrategy

(5)YieldingWaitStrategy

(1)WaitStrategy接口的作用

決定一個消費者將會如何等待生產者將Event投遞到Disruptor。

(2)消費者等待策略的種類

BlockingWaitStrategy,通過阻塞的方式進行等待
SleepingWaitStrategy,通過休眠的方式進行等待
YieldingWaitStrategy,通過線程間的切換的方式進行等待

(3)BlockingWaitStrategy

BlockingWaitStrategy是最低效的等待策略,但是對CPU的消耗最小,并且在各種不同部署環境中能提供一致的性能表現。該策略需要使用到Java中的鎖,也就是會通過ReentrantLock來阻塞消費者線程。而Disruptor本身是一個無鎖并發框架,所以如果追求高性能,就不要選擇這種策略。

(4)SleepingWaitStrategy

SleepingWaitStrategy是性能一般的等待策略,其性能表現和BlockingWaitStrategy差不多。但由于SleepingWaitStrategy是無鎖的,所以對生產者線程的影響最小。該策略對CPU的消耗一般,通過在單個線程循環 + yield切換線程實現,所以這種策略特別適合于異步日志類似的場景。

(5)YieldingWaitStrategy

YieldingWaitStrategy的性能是最好的,適合于低延遲的系統。不過該策略對CPU的消耗最高,因為完全基于yield切換線程來實現。推薦用于要求高性能且事件處理線程數小于CPU邏輯核心數的場景中,尤其是當CPU開啟了超線程特性的時候。

7.EventProcessor + EventHandler等類

(1)Event對象

(2)EventProcessor接口

(3)EventHandler接口

(4)WorkProcessor類

(1)Event對象

Disruptor中的Event指的是從生產者到消費者過程中所處理的數據對象。Disruptor中沒有代碼表示Event,它用泛型表示,完全由用戶定義。比如創建一個RingBuffer對象時,其中的泛型就表示著這個Event對象。

(2)EventProcessor接口

EventProcessor用于處理Disruptor中的Event,擁有消費者的Sequence,它有一個實現類叫BatchEventProcessor。

由于EventProcessor接口繼承自Runnable接口,所以BatchEventProcessor類會實現Runnable接口的run()方法。

其實BatchEventProcessor類是Disruptor框架中最核心的類,因為它的run()方法會不斷輪詢并獲取數據對象,然后把數據對象(Event)交給消費者去處理,也就是即回調EventHandler接口的實現類對象的onEvent()方法。

(3)EventHandler接口

EventHandler是由用戶實現的并且代表了Disruptor中的一個消費者接口,也就是消費者邏輯需要在EventHandler接口的onEvent()方法實現。

(4)WorkProcessor類

WorkProcessor類可確保每個Sequence只被一個Processor消費。注意:在單消費者模式下,使用的是EventHandler,對應于EventProcessor。在多消費者模式下,使用的是WorkHandler,對應于WorkProcessor。

8.Disruptor的運行原理圖

9.復雜業務需求下的編碼方案和框架

(1)方案選擇

(2)框架選擇

(1)方案選擇

方案一:完全解耦的模式,比如一個子業務線也開一個項目,此時重復代碼會比較多。

方案二:模版方法模式,如果業務快速迭代,可能也會需要經常重構底層的模版方法。

(2)框架選擇

一.使用有限狀態機框架

二.使用Disruptor框架

10.Disruptor的串行操作

Disruptor的串行操作,可以通過鏈式調用handleEventsWith()方法來實現。

如果使用RingBuffer對象來發布事件,那么需要先從RingBuffer對象中獲取一個可用的序號,然后根據序號獲取Event對象并對Event對象賦值,最后調用RingBuffer的publish()方法發布事件。

如果使用Disruptor對象來發布事件,那么直接調用Disruptor的publishEvent()方法發布事件即可。

此外,實際應用中不建議通過Executors來創建線程池,而應通過ThreadPoolExecutor構造函數具體指定線程池的每一個參數。因為Executors創建的線程池還是可能有安全隱患,比如Executors的newFixedThreadPool()方法使用的是無界隊列,其使用的LinkedBlockingQueue是一個可選是否有界的阻塞隊列。

//Disruptor中的Event
public class Trade {private String id;private String name;private double price;private AtomicInteger count = new AtomicInteger(0);public Trade() {}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}public AtomicInteger getCount() {return count;}public void setCount(AtomicInteger count) {this.count = count;}
}public class Main {@SuppressWarnings("unchecked")public static void main(String[] args) throws Exception {//實際應用中不建議這樣創建線程池,而應通過ThreadPoolExecutor構造函數具體指定每個參數//因為這種創建的線程池還是有安全隱患,比如newFixedThreadPool()使用的是無界隊列//LinkedBlockingQueue是一個可選是否有界的阻塞隊列ExecutorService es1 = Executors.newFixedThreadPool(8);//構建一個線程池用于提交任務ExecutorService es2 = Executors.newFixedThreadPool(1);//1.構建DisruptorDisruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {public Trade newInstance() {return new Trade();}},1024 * 1024,es1,ProducerType.SINGLE,new BusySpinWaitStrategy());//2.把消費者設置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法//串行操作,通過鏈式編程實現disruptor.handleEventsWith(new Handler1()).handleEventsWith(new Handler2()).handleEventsWith(new Handler3());//3.啟動disruptor并獲取RingBufferRingBuffer<Trade> ringBuffer = disruptor.start();CountDownLatch latch = new CountDownLatch(1);long begin = System.currentTimeMillis();//通過線程池向Disruptor發布事件(生產數據)es2.submit(new TradePublisher(latch, disruptor));latch.await();disruptor.shutdown();es1.shutdown();es2.shutdown();System.err.println("總耗時: " + (System.currentTimeMillis() - begin));}
}public class TradePublisher implements Runnable {private static int PUBLISH_COUNT = 10;private Disruptor<Trade> disruptor;private CountDownLatch latch;public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {this.disruptor = disruptor;this.latch = latch;}public void run() {TradeEventTranslator eventTranslator = new TradeEventTranslator();for (int i = 0; i < PUBLISH_COUNT; i++) {//新的發布事件的方式,另一種方式就是通過傳入的RingBuffer的publish()方法發布事件disruptor.publishEvent(eventTranslator);}latch.countDown();}
}class TradeEventTranslator implements EventTranslator<Trade> {private Random random = new Random();public void translateTo(Trade event, long sequence) {this.generateTrade(event);}private void generateTrade(Trade event) {event.setPrice(random.nextDouble() * 9999);}
}public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {//實現EventHandler的onEvent()方法,可以監聽生產者發布的事件public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {this.onEvent(event);}//實現WorkHandler的onEvent()方法,也可以監聽生產者發布的事件public void onEvent(Trade event) throws Exception {System.err.println("handler 1 : SET NAME");Thread.sleep(1000);event.setName("H1");}
}public class Handler2 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 2 : SET ID");Thread.sleep(2000);event.setId(UUID.randomUUID().toString());}
}public class Handler3 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());}
}

11.Disruptor的并行操作

Disruptor的并行操作可以有兩種方式實現:方式一是調用handleEventsWith()方法時傳入多個handler對象,方式二是分別多次調用handleEventsWith()方法。

public class Main {@SuppressWarnings("unchecked")public static void main(String[] args) throws Exception {//實際應用中不建議這樣創建線程池,而應通過ThreadPoolExecutor構造函數具體指定每個參數//因為這種創建的線程池還是有安全隱患,比如newFixedThreadPool()使用的是無界隊列//LinkedBlockingQueue是一個可選是否有界的阻塞隊列ExecutorService es1 = Executors.newFixedThreadPool(8);//構建一個線程池用于提交任務ExecutorService es2 = Executors.newFixedThreadPool(1);//1.構建DisruptorDisruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {public Trade newInstance() {return new Trade();}},1024 * 1024,es1,ProducerType.SINGLE,new BusySpinWaitStrategy());//2.把消費者設置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法//Disruptor的并行操作可以有兩種方式實現//方式一:調用handleEventsWith方法時傳入多個handler對象disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());//方式二:分別多次調用handleEventsWith()方法//disruptor.handleEventsWith(new Handler1());//disruptor.handleEventsWith(new Handler2());//disruptor.handleEventsWith(new Handler3());//3.啟動disruptor并獲取RingBufferRingBuffer<Trade> ringBuffer = disruptor.start();CountDownLatch latch = new CountDownLatch(1);long begin = System.currentTimeMillis();//通過線程池向Disruptor發布事件(生產數據)es2.submit(new TradePublisher(latch, disruptor));latch.await();disruptor.shutdown();es1.shutdown();es2.shutdown();System.err.println("總耗時: " + (System.currentTimeMillis() - begin));}
}public class TradePublisher implements Runnable {private static int PUBLISH_COUNT = 10;private Disruptor<Trade> disruptor;private CountDownLatch latch;public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {this.disruptor = disruptor;this.latch = latch;}public void run() {TradeEventTranslator eventTranslator = new TradeEventTranslator();for (int i = 0; i < PUBLISH_COUNT; i++) {//新的發布事件的方式,另一種方式就是通過傳入的RingBuffer的publish()方法發布事件disruptor.publishEvent(eventTranslator);}latch.countDown();}
}class TradeEventTranslator implements EventTranslator<Trade> {private Random random = new Random();public void translateTo(Trade event, long sequence) {this.generateTrade(event);}private void generateTrade(Trade event) {event.setPrice(random.nextDouble() * 9999);}
}public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {//實現EventHandler的onEvent()方法,可以監聽生產者發布的事件public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {this.onEvent(event);}//實現WorkHandler的onEvent()方法,也可以監聽生產者發布的事件public void onEvent(Trade event) throws Exception {System.err.println("handler 1 : SET NAME");Thread.sleep(1000);event.setName("H1");}
}public class Handler2 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 2 : SET ID");Thread.sleep(2000);event.setId(UUID.randomUUID().toString());}
}public class Handler3 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());}
}

12.Disruptor的多邊形操作

(1)Disruptor的菱形操作

(2)Disruptor的六邊形操作

Disruptor可以實現串并行同時編碼。

(1)Disruptor的菱形操作

可以理解為先并行執行,然后再串行執行,類似于CyclicBarrier。

菱形操作方式一:調用handleEventsWith()方法時傳入多個參數 + 鏈式調用。

菱形操作方式二:調用handleEventsWith()方法時傳入多個參數 + 使用then()方法。

public class Main {@SuppressWarnings("unchecked")public static void main(String[] args) throws Exception {//實際應用中不建議這樣創建線程池,而應通過ThreadPoolExecutor構造函數具體指定每個參數//因為這種創建的線程池還是有安全隱患,比如newFixedThreadPool()使用的是無界隊列//LinkedBlockingQueue是一個可選是否有界的阻塞隊列ExecutorService es1 = Executors.newFixedThreadPool(8);//構建一個線程池用于提交任務ExecutorService es2 = Executors.newFixedThreadPool(1);//1.構建DisruptorDisruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {public Trade newInstance() {return new Trade();}},1024 * 1024,es1,ProducerType.SINGLE,new BusySpinWaitStrategy());//2.把消費者設置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法//菱形操作一disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());//菱形操作二//EventHandlerGroup<Trade> ehGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());//ehGroup.then(new Handler3());//3.啟動disruptor并獲取RingBufferRingBuffer<Trade> ringBuffer = disruptor.start();CountDownLatch latch = new CountDownLatch(1);long begin = System.currentTimeMillis();//通過線程池向Disruptor發布事件(生產數據)es2.submit(new TradePublisher(latch, disruptor));latch.await();disruptor.shutdown();es1.shutdown();es2.shutdown();System.err.println("總耗時: " + (System.currentTimeMillis() - begin));}
}public class TradePublisher implements Runnable {private static int PUBLISH_COUNT = 10;private Disruptor<Trade> disruptor;private CountDownLatch latch;public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {this.disruptor = disruptor;this.latch = latch;}public void run() {TradeEventTranslator eventTranslator = new TradeEventTranslator();for (int i = 0; i < PUBLISH_COUNT; i++) {//新的發布事件的方式,另一種方式就是通過傳入的RingBuffer的publish()方法發布事件disruptor.publishEvent(eventTranslator);}latch.countDown();}
}class TradeEventTranslator implements EventTranslator<Trade> {private Random random = new Random();public void translateTo(Trade event, long sequence) {this.generateTrade(event);}private void generateTrade(Trade event) {event.setPrice(random.nextDouble() * 9999);}
}public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {//實現EventHandler的onEvent()方法,可以監聽生產者發布的事件public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {this.onEvent(event);}//實現WorkHandler的onEvent()方法,也可以監聽生產者發布的事件public void onEvent(Trade event) throws Exception {System.err.println("handler 1 : SET NAME");Thread.sleep(1000);event.setName("H1");}
}public class Handler2 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 2 : SET ID");Thread.sleep(2000);event.setId(UUID.randomUUID().toString());}
}public class Handler3 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());}
}

(2)Disruptor的六邊形操作

通過Disruptor的after()方法 + 菱形操作,可實現六邊形操作。

注意在單消費者模式下:一個EventHandler會對應一個BatchEventProcessor,所以如果有n個EventHandler監聽Disruptor,那么初始化Disruptor時的線程池就要有n個線程,否則可能導致多邊形操作失效。

在單消費者模式下,如果有非常多EventHandler,就需要非常多線程。此時是不合理的,所以如果有很多EventHandler,可采用多消費者模式。

public class Main {@SuppressWarnings("unchecked")public static void main(String[] args) throws Exception {//實際應用中不建議這樣創建線程池,而應通過ThreadPoolExecutor構造函數具體指定每個參數//因為這種創建的線程池還是有安全隱患,比如newFixedThreadPool()使用的是無界隊列//LinkedBlockingQueue是一個可選是否有界的阻塞隊列ExecutorService es1 = Executors.newFixedThreadPool(8);//構建一個線程池用于提交任務ExecutorService es2 = Executors.newFixedThreadPool(1);//1.構建DisruptorDisruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {public Trade newInstance() {return new Trade();}},1024 * 1024,es1,ProducerType.SINGLE,new BusySpinWaitStrategy());//2.把消費者設置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法//六邊形操作Handler1 h1 = new Handler1();Handler2 h2 = new Handler2();Handler3 h3 = new Handler3();Handler4 h4 = new Handler4();Handler5 h5 = new Handler5();disruptor.handleEventsWith(h1, h4);disruptor.after(h1).handleEventsWith(h2);disruptor.after(h4).handleEventsWith(h5);disruptor.after(h2, h5).handleEventsWith(h3);//3.啟動disruptor并獲取RingBufferRingBuffer<Trade> ringBuffer = disruptor.start();CountDownLatch latch = new CountDownLatch(1);long begin = System.currentTimeMillis();//通過線程池向Disruptor發布事件(生產數據)es2.submit(new TradePublisher(latch, disruptor));latch.await();disruptor.shutdown();es1.shutdown();es2.shutdown();System.err.println("總耗時: " + (System.currentTimeMillis() - begin));}
}public class TradePublisher implements Runnable {private static int PUBLISH_COUNT = 10;private Disruptor<Trade> disruptor;private CountDownLatch latch;public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {this.disruptor = disruptor;this.latch = latch;}public void run() {TradeEventTranslator eventTranslator = new TradeEventTranslator();for (int i = 0; i < PUBLISH_COUNT; i++) {//新的發布事件的方式,另一種方式就是通過傳入的RingBuffer的publish()方法發布事件disruptor.publishEvent(eventTranslator);}latch.countDown();}
}class TradeEventTranslator implements EventTranslator<Trade> {private Random random = new Random();public void translateTo(Trade event, long sequence) {this.generateTrade(event);}private void generateTrade(Trade event) {event.setPrice(random.nextDouble() * 9999);}
}public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {//實現EventHandler的onEvent()方法,可以監聽生產者發布的事件public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {this.onEvent(event);}//實現WorkHandler的onEvent()方法,也可以監聽生產者發布的事件public void onEvent(Trade event) throws Exception {System.err.println("handler 1 : SET NAME");Thread.sleep(1000);event.setName("H1");}
}public class Handler2 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 2 : SET ID");Thread.sleep(2000);event.setId(UUID.randomUUID().toString());}
}public class Handler3 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());}
}public class Handler4 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 4 : SET PRICE");Thread.sleep(1000);event.setPrice(17.0);}
}public class Handler5 implements EventHandler<Trade> {public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {System.err.println("handler 5 : GET PRICE: " + event.getPrice());Thread.sleep(1000);event.setPrice(event.getPrice() + 3.0);}
}

13.Disruptor的多生產者和多消費者

注意一:使用多消費者模式時,每個消費者都需要實現WorkHandler接口,而不是EventHandler接口。單消費者模式,使用的是EventHandler,對應于EventProcessor。多消費者模式,使用的是WorkHandler,對應于WorkProcessor。

注意二:使用多消費者模式時,需要構建消費者工作池WorkerPool。

注意三:使用多消費者模式時,每個消費者需要一個Sequence來標記當前消費的最小序號。這樣生產者投遞消息時才能遍歷消費者的Sequence找出最小的序號,然后寫到最小的序號位置進行阻塞等待。

比如下圖中,在某一時刻:消費者1消費了序號0和2,但序號1還沒有消費完畢。消費者2消費了序號3和4,消費者3消費了序號5。此時,在RingBuffer中,雖然序號0、2、3、4、5都可以覆蓋了,但由于序號1還沒被消費,所以生產者最多只能覆蓋到序號0的位置。然后等待序號1被消費者1消費完畢后,才能繼續往RingBuffer投遞消息。

//Disruptor中的 Event
public class Order {private String id;private String name;private double price;public Order() {}public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}
}public class Main {public static void main(String[] args) throws InterruptedException {//1.創建RingBufferRingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI,//多生產者new EventFactory<Order>() {public Order newInstance() {return new Order();}},1024 * 1024,new YieldingWaitStrategy());//2.通過ringBuffer創建一個屏障SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();//3.創建消費者數組,每個消費者Consumer都需要實現WorkHandler接口Consumer[] consumers = new Consumer[10];for (int i = 0; i < consumers.length; i++) {consumers[i] = new Consumer("C" + i);}//4.構建多消費者工作池WorkerPool,因為多消費者模式下需要使用WorkerPoolWorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer,sequenceBarrier,new EventExceptionHandler(),consumers);//5.設置多個消費者的sequence序號,用于單獨統計每個消費者的消費進度, 并且設置到RingBuffer中ringBuffer.addGatingSequences(workerPool.getWorkerSequences());//6.啟動workerPoolworkerPool.start(Executors.newFixedThreadPool(5));final CountDownLatch latch = new CountDownLatch(1);for (int i = 0; i < 100; i++) {final Producer producer = new Producer(ringBuffer);new Thread(new Runnable() {public void run() {try {latch.await();} catch (Exception e) {e.printStackTrace();}for (int j = 0; j < 100; j++) {producer.sendData(UUID.randomUUID().toString());}}}).start();}Thread.sleep(2000);System.err.println("----------等待線程創建完畢,才開始生產數據----------");latch.countDown();Thread.sleep(10000);System.err.println("任務總數:" + consumers[2].getCount());}static class EventExceptionHandler implements ExceptionHandler<Order> {public void handleEventException(Throwable ex, long sequence, Order event) {}public void handleOnStartException(Throwable ex) {}public void handleOnShutdownException(Throwable ex) {}}
}public class Consumer implements WorkHandler<Order> {private static AtomicInteger count = new AtomicInteger(0);private String consumerId;private Random random = new Random();public Consumer(String consumerId) {this.consumerId = consumerId;}public void onEvent(Order event) throws Exception {Thread.sleep(1 * random.nextInt(5));System.err.println("當前消費者: " + this.consumerId + ", 消費信息ID: " + event.getId());count.incrementAndGet();}public int getCount() {return count.get();}
}public class Producer {private RingBuffer<Order> ringBuffer;public Producer(RingBuffer<Order> ringBuffer) {this.ringBuffer = ringBuffer;}public void sendData(String uuid) {long sequence = ringBuffer.next();try {Order order = ringBuffer.get(sequence);order.setId(uuid);} finally {ringBuffer.publish(sequence);}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/81911.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/81911.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/81911.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

谷歌瀏覽器調試python pygui程序

谷歌瀏覽器地址:chrome://inspect/#devices 前端vue為8080, 配置如下 pychat 需要配置環境變量 PYTHONUNBUFFERED1;QTWEBENGINE_REMOTE_DEBUGGING9012

手寫Tomcat(二)—Tomcat簡化模型架構

一、 Tomcat架構 Tomcat的主要角色是 servlet容器&#xff0c;提供一個解釋器&#xff0c;能夠解析并執行JavaScript Object Notation (JON)腳本&#xff08;后更改為Servlet&#xff09;&#xff0c;并將請求傳送到指定的服務器&#xff08;如JavaBean&#xff09;。因此&…

Android 網絡全棧攻略(五)—— 從 OkHttp 攔截器來看 HTTP 協議二

上一篇我們介紹了 OkHttp 的責任鏈以及第一個內置攔截器 —— 重試與重定向攔截器。本篇我們將剩余四個攔截器的解析做完。 1、橋接攔截器 BridgeInterceptor 作為請求準備和實際發送之間的橋梁&#xff0c;自動處理 HTTP 請求頭等繁瑣工作。比如設置請求內容長度&#xff0c…

JDBC-java操作數據庫

1.基本結構&#xff1a; package com.atguigu.servlets;import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement;public class JDBCemo {public static void main(String[] args) throws Exception{String url "jdbc:mysql:///mysql&qu…

七彩喜適老化改造:讓每個空間成為長者尊嚴的守護者

隨著我國老齡化進程的加速&#xff0c;居家養老逐漸成為老年人首選的生活方式。 為了讓老年人能夠在熟悉的環境中安享晚年&#xff0c;適老化改造應運而生。 七彩喜作為居家養老服務的創新者&#xff0c;致力于通過科學設計和人性化改造&#xff0c;為老年人提供安全、舒適、…

【動態規劃】5 從一次函數出發推導斜率優化dp

背景 基于例題《任務安排》逐步推導進行斜率優化。 引入 例題&#xff1a;P2365 任務安排 考慮動態規劃。使用 d p i , j dp_{i,j} dpi,j? 表示前 i i i 個任務分了 j j j 段的最小費用。 顯然&#xff0c;有 d p i , j min ? k 1 i ? 1 ( d p i , j , d p k , j…

MySQL中實現大數據量的快速插入

一、SQL語句優化? 1. ?批量插入代替單條插入? ?單條插入會頻繁觸發事務提交和日志寫入&#xff0c;效率極低。?批量插入通過合并多條數據為一條SQL語句&#xff0c;減少網絡傳輸和SQL解析開銷。 -- 低效寫法&#xff1a;逐條插入 INSERT INTO table (col1, col2) VALUE…

C++23中std::span和std::basic_string_view可平凡復制提案解析

文章目錄 一、引言二、相關概念解釋2.1 平凡復制&#xff08;Trivially Copyable&#xff09;2.2 std::span2.3 std::basic_string_view 三、std::span和std::basic_string_view的應用場景3.1 std::span的應用場景3.2 std::basic_string_view的應用場景 四、P2251R1提案對std::…

廣東省省考備考(第十八天5.23)—言語:語句填空題(聽課后強化訓練)

錯題 解析 橫線出現在文段中間&#xff0c;需結合上下文內容進行分析。文段開篇指出逃離北上廣深的話題時而出現&#xff0c;一些人離開大城市回到小城市。隨后通過轉折詞“但”引出橫線內容&#xff0c;且結合橫線后人才傾向于向更發達的地方流動的內容&#xff0c;橫線處應體…

持續更新 ,GPT-4o 風格提示詞案例大全!附使用方式

本文匯集了各類4o風格提示詞的精選案例&#xff0c;從基礎指令到復雜任務&#xff0c;從創意寫作到專業領域&#xff0c;為您提供全方位的參考和靈感。我們將持續更新這份案例集&#xff0c;確保您始終能夠獲取最新、最有效的提示詞技巧。 讓我們一起探索如何通過精心設計的提…

創建型:建造者模式

目錄 1、核心思想 2、實現方式 2.1 模式結構 2.2 工作流程 2.3 實現案例 2.4 變體&#xff1a;鏈式建造者&#xff08;常見于多參數對象&#xff0c;無需指揮者&#xff09; 3、優缺點分析 4、適用場景 1、核心思想 目的&#xff1a;將復雜對象的構建過程與其表示分離…

力扣-長度最小的子數組

1.題目描述 2.題目鏈接 LCR 008. 長度最小的子數組 - 力扣&#xff08;LeetCode&#xff09; 3.題目分析 這道題目我們使用的也是雙指針。我們可以定義兩個指針都指向數組第一個元素&#xff0c;然后使用right指針遍歷原數組&#xff0c;計算left指針到right指針之間的所有元…

JAVA開發工具延長方案

親測穩定的延長方案與避坑指南 真的搞不懂了&#xff0c;說點專業的術語竟然成了 QINQUAN。那就直接點&#xff0c;把這個方案帶給需要的開發者。 延長工具直通車 保姆級教程 延長方案https://mp.weixin.qq.com/s/uajM2Y9Vz6TnolzcLur_bw還是讓大家看看&#xff0c;發什么會被…

SpringAI開發SSE傳輸協議的MCP Server

SpringAI 訪問地址&#xff1a;Spring AI ? Spring AI?是一個面向人工智能工程的應用框架&#xff0c;由Spring團隊推出&#xff0c;旨在將AI能力集成到Java應用中。Spring AI的核心是解決AI集成的根本挑戰&#xff0c;即將企業數據和API與AI模型連接起來?。 MCP…

JAVA動態生成類

在java的加載過程一般都是要預先定義java類,然后通過經過加載->連接->初始化三步。連接過程又可分為三步:驗證->準備->解析。初始化的類是不允許修改。但是在日常的工作中有時候需要動態生成類,那第這種情況怎么辦呢? 可以這么處理: 1、先定義一個空的類,僅…

深入解析Java微服務架構:Spring Boot與Spring Cloud的整合實踐

深入解析Java微服務架構&#xff1a;Spring Boot與Spring Cloud的整合實踐 引言 隨著云計算和分布式系統的快速發展&#xff0c;微服務架構已成為現代軟件開發的主流模式。Java作為企業級應用開發的核心語言&#xff0c;結合Spring Boot和Spring Cloud&#xff0c;為開發者提…

03_基礎篇-NumPy(下):深度學習中的常用操作

03_基礎篇-NumPy&#xff08;下&#xff09;&#xff1a;深度學習中的常用操作 通過上節課的學習&#xff0c;我們已經對NumPy數組有了一定的了解&#xff0c;正所謂實踐出真知&#xff0c;今天我們就以一個圖像分類的項目為例&#xff0c;看看NumPy的在實際項目中都有哪些重要…

時鐘識別項目報告(深度學習、計算機視覺)

深度學習方式 一、模型架構 本模型采用雙任務學習框架&#xff0c;基于經典殘差網絡實現時鐘圖像的小時和分鐘同步識別。 主干網絡 使用預訓練的ResNet18作為特征提取器&#xff0c;移除原分類層&#xff08;fc層&#xff09;&#xff0c;保留全局平均池化后的512維特征向量。…

openai-whisper-asr-webservice接入dify

openai-whisper-asr-webservice提供的asr的api其實并不兼容openai的api&#xff0c;所以在dify中是不能直接添加到語音轉文字的模型中&#xff0c;對比了下兩個api的傳參情況&#xff0c;其實只要改動一處&#xff0c;就能支持&#xff1a; openai兼容的asr調用中formdata中音頻…

解鎖MySQL性能調優:高級SQL技巧實戰指南

高級SQL技巧&#xff1a;解鎖MySQL性能調優的終極指南 開篇 當前&#xff0c;隨著業務系統的復雜化和數據量的爆炸式增長&#xff0c;數據庫性能調優成為了技術人員面臨的核心挑戰之一。尤其是在高并發、大數據量的場景下&#xff0c;SQL 查詢的性能直接影響到整個系統的響應…