尼恩說在前面
在40歲老架構師 尼恩的讀者交流群(50+)中,最近有小伙伴拿到了一線互聯網企業如得物、阿里、滴滴、極兔、有贊、shein 希音、百度、網易的面試資格,遇到很多很重要的面試題:
Disruptor 官方說能達到每秒600w OPS訂單處理能力,怎么實現的?
Disruptor 什么情況下發生消費數據丟失? 該怎么解決?
小伙伴 沒有回答好,導致面試掛了。
Disruptor 是隊列之王,相關面試題也是一個非常常見的面試題,考察的是高性能的基本功。
如何才能回答得很漂亮,才能 讓面試官刮目相看、口水直流呢?
這里,尼恩給大家做一下系統化、體系化的梳理,讓面試官愛到 “不能自已、口水直流”,然后幫大家 實現 ”offer自由”。
當然,這道面試題,以及參考答案,也會收入咱們的 《尼恩Java面試寶典》V175版本PDF集群,供后面的小伙伴參考,提升大家的 3高 架構、設計、開發水平。
注:本文以 PDF 持續更新,最新尼恩 架構筆記、面試題 的PDF文件,請關注本公眾號【技術自由圈】獲取。
隊列之王 Disruptor 簡介
Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題(在性能測試中發現竟然與I/O操作處于同樣的數量級)。
基于Disruptor開發的系統單線程能支撐每秒600萬訂單,2010年在QCon演講后,獲得了業界關注。
2011年,企業應用軟件專家Martin Fowler專門撰寫長文介紹Disruptor。
2011年,Disruptor還獲得了Oracle官方的Duke大獎。
目前,包括Apache Storm、Camel、Log4j 2在內的很多知名項目都應用了Disruptor以獲取高性能。
Disruptor通過以下設計來,來實現 單線程能支撐每秒600萬訂單的問題:
-
核心架構1:無鎖架構
生產和消費,都是無鎖架構。具體來說,生產者位點/消費者位點的操作,都是無鎖操作,或者使用輕量級CAS原子操作。無鎖架構好處是,既沒有鎖的競爭, 也沒有線程的內核態、用戶態切換的開銷。 關于內核態、用戶態的原理請參見尼恩的葵花寶典。
-
核心架構2:環形數組架構
數組元素不會被回收,避免頻繁的GC,所以,為了避免垃圾回收,采用數組而非鏈表。
同時,數組對處理器的緩存機制更加友好。
數組長度2^n,通過位運算,加快定位的速度。
下標采取遞增的形式。不用擔心index溢出的問題。
index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。
-
核心架構3:cache line padding
兩個維度的CPU 緩存行加速,享受到 CPU Cache 那風馳電掣的速度帶來的紅利:
第一維度: 對 位點等核心組件進行 CPU cache line padding,實現高并發訪問(修改和取值)。
第二個維度: ringbuffer 是一個數據,加載的時候一般也會塞滿整個 CPU cache line。也就是說 從內存加載數據到 CPU Cache 里面的時候, 如果是加載數組里面的數據(如 Disruptor),那么 CPU 就會加載到數組里面連續的多個數據。
所以,Disruptor 數組的遍歷、還是位點的增長, 很容易享受到 CPU Cache 那風馳電掣的速度帶來的紅利。
SpringBoot + Disruptor 使用實戰
有關 Disruptor的 簡單實戰,請參見 尼恩的 《Disruptor 學習圣經 V3》, 由于過于簡單,這里不做啰嗦。
下面,來看一個SpringBoot + Disruptor的 使用實戰
使用 Disruptor 實現一個生產消費模型步驟是:
-
準備好簡單的一個springboot應用
-
定義事件(Event) : 你可以把 Event 理解為存放在隊列中等待消費的消息對象。
-
創建事件工廠 :事件工廠用于生產事件,我們在初始化 Disruptor 類的時候需要用到。
-
創建處理事件的 Handler :Event 在對應的 Handler 中被處理,你可以將其理解為生產消費者模型中的消費者。
-
創建并裝配 Disruptor : 事件的生產和消費需要用到Disruptor 對象。
-
定義生產者,并使用生產者發消息
-
對簡單的SpringBoot + Disruptor 進行擴容,實現 容量監控預警+ 動態擴容
定義一個Event和工廠
首先定義一個Event來包含需要傳遞的數據:
由于需要讓Disruptor為我們創建事件,我們同時還聲明了一個EventFactory來創建Event對象。
public class LongEventFactory implements EventFactory { @Override public Object newInstance() { return new LongEvent(); }
}
定義
事件處理器(消費者)
我們還需要一個事件消費者,也就是一個事件處理器。
這個例子中,事件處理器的工作,就是簡單地把事件中存儲的數據打印到終端:
/** * 類似于消費者* disruptor會回調此處理器的方法*/static class LongEventHandler implements EventHandler<LongEvent> {@Overridepublic void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {System.out.println(longEvent.getValue());}}
disruptor會回調此處理器的方法
定義事件源(生產者)
事件都會有一個生成事件的源,類似于 生產者的角色.
注意,這是一個 600wqps 能力的 異步生產者。 這里定義兩個版本:
生產者的角色的接口定義如下
入門級:一個簡單 DisruptorProducer 生產者的定義和使用
定義一個簡單 DisruptorProducer 生產者
大致的代碼如下
package com.crazymaker.cloud.disruptor.demo.business.impl;@Slf4j
public class DisruptorProducer implements AsyncProducer {//一個translator可以看做一個事件初始化器,publicEvent方法會調用它//填充Eventprivate static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR =new EventTranslatorOneArg<LongEvent, Long>() {public void translateTo(LongEvent event, long sequence, Long data) {event.setValue(data);}};private final RingBuffer<LongEvent> ringBuffer;public DisruptorProducer() {this.ringBuffer = disruptor().getRingBuffer();}public void publishData(Long data) {log.info("生產一個數據:" + data + " | ringBuffer.remainingCapacity()= " + ringBuffer.remainingCapacity());ringBuffer.publishEvent(TRANSLATOR, data);}private Disruptor<LongEvent> disruptor() {ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();LongEventFactory eventFactory = new LongEventFactory();int bufferSize = 1024;Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, bufferSize, namedThreadFactory,ProducerType.MULTI, new BlockingWaitStrategy());// 連接 消費者 處理器 ,兩個消費者LongEventWorkHandler1 handler1 = new LongEventWorkHandler1();LongEventWorkHandler2 handler2 = new LongEventWorkHandler2();disruptor.handleEventsWith(handler1, handler2);//為消費者配置異常處理器disruptor.handleExceptionsFor(handler1).with(exceptionHandler);disruptor.handleExceptionsFor(handler2).with(exceptionHandler);// 開啟 分裂者(事件分發)disruptor.start();return disruptor;}ExceptionHandler exceptionHandler =...//省略非核心代碼 異常處理器實現
}
上面的代碼,通過 disruptor() 方法創建和裝配 一個Disruptor對象 ,Disruptor 里邊有一個環形隊列。然后 disruptor() 方法給 Disruptor對象設置消費者,并且為消費者設置異常處理器。
使用這一個簡單 DisruptorProducer 生產者
定義一個配置類,用于實例化 生產者
定義controller, 注入這個 生產者,就可以異步發布數據 給消費者了
springboot應用啟動之后, 可以通過 httpclient 工具,測試一下:
看一下測試數據
具體的代碼和,演示過程,后面參考尼恩錄制和發布《尼恩Java面試寶典》配套視頻。
Disruptor:消費數據丟失問題的分析與解決
在處理高并發、大數據量等場景時,Disruptor雖然其高性能、低延遲,然而,在使用過程中,一些用戶可能會遇到消費數據丟失問題。
為了解決這些問題,我們需要深入了解Disruptor的工作原理,并采取相應的解決方案。
消費數據丟失問題的根因
消費線程丟失問題通常發生在消費者處理速度跟不上生產者的時候。
由于Disruptor采用環形隊列來存儲數據,當隊列滿時,新的數據會覆蓋舊的數據。
Disruptor 中,生產和消費的index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。
生產和消費的index 下標采取遞增的形式。不用擔心index溢出的問題。
生產和消費的index 是通過 取模, 映射到 ring 環形數據的。
如果消費者速度慢, 生產者快,消費跟不上,生產的index(/Sequence)就會越來越大,取模的時候,又會從0開始,去覆蓋ring前面的數據,從而導致沒有消費的數據被丟失。
從上圖可以看到,只要生產者 的Sequence 大于消費者 一個ring 的數量, 就開始 覆蓋舊的數據,也就是開始丟失數據。
消費數據丟失問題解決方案:
-
增加消費者數量:增加消費者線程的數量,可以并行處理更多的數據,提高消費速度。
同時,合理配置消費者與生產者的數量比例,確保隊列生產者 的Sequence 不會大于消費者 一個ring 的數量。
-
增加ring環形數組的大小:通過增加數組的大小,從而保證一個環可以存放足夠多的數據,但這個可能會導致OOM。
-
剩余容量監控與告警:
通過Prometheus 對 remainingCapacity剩余容量 進行實時監控,當remainingCapacity 超過80%(閾值)及時發出告警通知相關人員處理,進行微服務實例的 HPA 橫向擴容,或者進行 Disruptor 隊列進行動態擴容。 -
Disruptor 動態擴容
對 Disruptor 框架進行合理封裝,從單個Disruptor 隊列模式,變成 ringbuffer 數組的形式,并且可以結合nacos 或者 Zookeeper 這種發布訂閱組件, 對 ringbuffer 數組 進行在線擴容。
總之,通過增加消費者數量、增加ring環形數組的大小、剩余容量監控與告警, Disruptor 動態擴容等方式,可以有效解決 消費數據丟失問題。
高級版:Spring Boot + Prometheus 監控剩余容量 大小
我們的微服務項目中使用了 spring boot,集成 prometheus。
我們可以通過將remainingCapacity 作為指標暴露到 prometheus 中,通過如下代碼:
增加這個代碼之后,請求 /actuator/prometheus
之后,可以看到對應的返回:
這樣,當這個值低于20%,我們就認為這個剩余空間不夠,可以擴容了。
Disruptor 如何 動態擴容?
關于 Disruptor 動態擴容的方案,可以實現一個可以擴容的子類
定義一個環形隊列的數據
private Disruptor<LongEvent>[] executors;
在構造函數中,初始化為1:
public ResizableDisruptorProducer() {executors = new Disruptor[1];executors[0] = disruptor();this.ringBuffer = executors[0].getRingBuffer();}
發布事件的時候,通過取模的方式,確定使用executors 數組 其中的一個RingBuffer
在next方法,執行indx 取模和 獲取 ringbuffer 的操作
這里參考了netty源碼里邊 PowerOfTwoEventExecutorChooser 反應器選擇的方式,使用位運算取模,從而實現高性能取模。
什么時候擴容呢? 當監控發現超過80%的閾值后,運維會收到告警,然后可以通過naocos、Zookeeper的發布訂閱, 通知微服務進行擴容。
微服務 擴容要回調下面的risize方法
具體的代碼和,演示過程,后面參考尼恩錄制和發布《尼恩Java面試寶典》配套視頻。
在使用Disruptor框架時,需要根據實際情況選擇合適的監控和擴容解決方案,并不斷優化和調整系統配置,以滿足日益增長的業務需求。
說在最后
以上的內容,如果大家能對答如流,如數家珍,基本上 面試官會被你 震驚到、吸引到。最終,讓面試官愛到 “不能自已、口水直流”。offer, 也就來了。
在面試之前,建議大家系統化的刷一波 5000頁《尼恩Java面試寶典》V174,在刷題過程中,如果有啥問題,大家可以來 找 40歲老架構師尼恩交流。
另外,如果沒有面試機會,可以找尼恩來幫扶、領路。
尼恩已經指導了大量的就業困難的小伙伴上岸,前段時間,幫助一個40歲+就業困難小伙伴拿到了一個年薪100W的offer,小伙伴實現了 逆天改命 。
尼恩技術圣經系列PDF
- 《NIO圣經:一次穿透NIO、Selector、Epoll底層原理》
- 《Docker圣經:大白話說Docker底層原理,6W字實現Docker自由》
- 《K8S學習圣經:大白話說K8S底層原理,14W字實現K8S自由》
- 《SpringCloud Alibaba 學習圣經,10萬字實現SpringCloud 自由》
- 《大數據HBase學習圣經:一本書實現HBase學習自由》
- 《大數據Flink學習圣經:一本書實現大數據Flink自由》
- 《響應式圣經:10W字,實現Spring響應式編程自由》
- 《Go學習圣經:Go語言實現高并發CRUD業務開發》
……完整版尼恩技術圣經PDF集群,請找尼恩領取
《尼恩 架構筆記》《尼恩高并發三部曲》《尼恩Java面試寶典》PDF,請到下面公號【技術自由圈】取↓↓↓