如此狂妄,自稱高性能隊列的Disruptor有啥來頭?
併發框架Disruptor
1. Disruptor概述
1.1 背景
Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題(在性能測試中發現竟然與I/O操作處於同樣的數量級),基於Disruptor開發的系統單線程能支撐每秒600萬訂單,2010年在QCon演講後,獲得了業界關注,2011年,企業應用軟件專家Martin Fowler專門撰寫長文介紹。同年它還獲得了Oracle官方的Duke大獎。
目前,包括Apache Storm、Camel、Log4j 2在內的很多知名項目都應用了Disruptor以獲取高性能。
需要特別指出的是,這裏所説的隊列是系統內部的內存隊列,而不是Kafka這樣的分佈式隊列。
有界無鎖 高併發隊列
1.2 什麼是Disruptor
Disruptor是用於一個JVM中多個線程之間的消息隊列,作用與ArrayBlockingQueue有相似之處,但是Disruptor從功能、性能都遠好於ArrayBlockingQueue,當多個線程之間傳遞大量數據或對性能要求較高時,可以考慮使用Disruptor作為ArrayBlockingQueue的替代者。
官方也對Disruptor和ArrayBlockingQueue的性能在不同的應用場景下做了對比,目測性能只有有5~10倍左右的提升。
1.3 為什麼使用Disruptor
傳統阻塞的隊列使用鎖保證線程安全,而鎖通過操作系統內核上下文切換實現,會暫停線程去等待鎖,直到鎖釋放。
執行這樣的上下文切換,會丟失之前保存的數據和指令。由於消費者和生產者之間的速度差異,隊列總是接近滿或者空的狀態,這種狀態會導致高水平的寫入爭用。
1.3.1 傳統隊列問題
首先這裏説的隊列也僅限於Java內部的消息隊列
隊列 | 有界性 | 鎖 | 結構 | 隊列類型 |
---|---|---|---|---|
ArrayBlockingQueue | 有界 | 加鎖 | 數組 | 阻塞 |
LinkedBlockingQueue | 可選 | 加鎖 | 鏈表 | 阻塞 |
ConcurrentLinkedQueue | 無界 | 無鎖 | 鏈表 | 非阻塞 |
LinkedTransferQueue | 無界 | 無鎖 | 鏈表 | 阻塞 |
PriorityBlockingQueue | 無界 | 加鎖 | 堆 | 阻塞 |
DelayQueue | 無界 | 加鎖 | 堆 | 阻塞 |
1.3.2 Disruptor應用場景
參考使用到disruptor的一些框架.
1.3.2.1 log4j2
Log4j2異步日誌使用到了disruptor, 日誌一般是有緩衝區, 滿了才寫到文件, 增量追加文件結合NIO等應該也比較快, 所以無論是EventHandler還是WorkHandler處理應該延遲比較小的, 寫的文件也不多, 所以場景是比較合適的。
1.3.2.2 Jstorm
在流處理中不同線程中數據交換,數據計算可能蠻多內存中計算, 流計算快進快出,disruptor應該不錯的選擇。
1.3.2.3 百度uid-generator
部分使用Ring buffer和去偽共享等思路緩存已生成的uid, 應該也部分參考了disruptor吧。
1.4 Disruptor 的核心概念
先從瞭解 Disruptor 的核心概念開始,來了解它是如何運作的。下面介紹的概念模型,既是領域對象,也是映射到代碼實現上的核心對象。
1.4.1 Ring Buffer
Disruptor中的數據結構,用於存儲生產者生產的數據
如其名,環形的緩衝區。曾經 RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責被簡化為僅僅負責對通過 Disruptor 進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 可以由用户的自定義實現來完全替代。
1.4.2 Sequence
序號,在Disruptor框架中,任何地方都有序號
生產者生產的數據放在RingBuffer中的哪個位置,消費者應該消費哪個位置的數據,RingBuffer中的某個位置的數據是什麼,這些都是由這個序號來決定的。這個序號可以簡單的理解為一個AtomicLong類型的變量。其使用了padding的方法去消除緩存的偽共享問題。
1.4.3 Sequencer
序號生成器,這個類主要是用來協調生產者的
在生產者生產數據的時候,Sequencer會產生一個可用的序號(Sequence),然後生產者就就知道數據放在環形隊列的那個位置了。
Sequencer是Disruptor的真正核心,此接口有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞數據的併發算法。
1.4.4 Sequence Barrier
序號屏障
我們都知道,消費者在消費數據的時候,需要知道消費哪個位置的數據。消費者總不能自己想取哪個數據消費,就取哪個數據消費吧。這個SequencerBarrier起到的就是這樣一個“柵欄”般的阻隔作用。你消費者想消費數據,得,我吿訴你一個序號(Sequence),你去消費那個位置上的數據。要是沒有數據,就好好等着吧
1.4.5 Wait Strategy
Wait Strategy決定了一個消費者怎麼等待生產者將事件(Event)放入Disruptor中。
設想一種這樣的情景:生產者生產的非常慢,而消費者消費的非常快。那麼必然會出現數據不夠的情況,這個時候消費者怎麼進行等待呢?WaitStrategy就是為了解決問題而誕生的。
1.4.6 Event
從生產者到消費者傳遞的數據叫做Event。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義並指定。
1.4.7 EventHandler
Disruptor 定義的事件處理接口,由用户實現,用於處理事件,是 Consumer 的真正實現。
1.4.8 Producer
即生產者,只是泛指調用 Disruptor 發佈事件的用户代碼,Disruptor 沒有定義特定接口或類型。
1.5 Disruptor特性
Disruptor其實就像一個隊列一樣,用於在不同的線程之間遷移數據,但是Disruptor也實現了一些其他隊列沒有的特性,如:
- 同一個“事件”可以有多個消費者,消費者之間既可以並行處理,也可以相互依賴形成處理的先後次序(形成一個依賴圖);
- 預分配用於存儲事件內容的內存空間;
- 針對極高的性能目標而實現的極度優化和無鎖的設計;
2. Disruptor入門
我們使用一個簡單的例子來體驗一下Disruptor,生產者會傳遞一個long類型的值到消費者,消費者接受到這個值後會打印出這個值。
2.1 添加依賴
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.2</version> </dependency>
2.2 Disruptor API
Disruptor 的 API 十分簡單,主要有以下幾個步驟
2.2.1 定義事件
首先創建一個 LongEvent
類,這個類將會被放入環形隊列中作為消息內容。
事件(Event)就是通過 Disruptor 進行交換的數據類型。
public class LongEvent { private long value; public void set(long value) { this.value = value; } public long getValue() { return value; } }
2.2.2 定義事件工廠
為了使用Disruptor的內存預分配event,我們需要定義一個EventFactory
事件工廠(Event Factory)定義瞭如何實例化前面第1步中定義的事件(Event),需要實現接口 com.lmax.disruptor.EventFactory\<T\>。
Disruptor 通過 EventFactory 在 RingBuffer 中預創建 Event 的實例。
一個 Event 實例實際上被用作一個“數據槽”,發佈者發佈前,先從 RingBuffer 獲得一個 Event 的實例,然後往 Event 實例中填充數據,之後再發布到 RingBuffer 中,之後由 Consumer 獲得該 Event 實例並從中讀取數據。
public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { return new LongEvent(); } }
2.2.3 定義事件處理的具體實現
為了讓消費者處理這些事件,所以我們這裏定義一個事件處理器,負責打印event
通過實現接口 com.lmax.disruptor.EventHandler\<T\> 定義事件處理的具體實現。
public class LongEventHandler implements EventHandler<LongEvent> { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { //CommonUtils.accumulation(); System.out.println("consumer:" + Thread.currentThread().getName() + " Event: value=" + event.getValue() + ",sequence=" + sequence); } }
2.2.4 指定等待策略
Disruptor 定義了 com.lmax.disruptor.WaitStrategy 接口用於抽象 Consumer 如何等待新事件,這是策略模式的應用
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
2.2.5 啟動 Disruptor
注意ringBufferSize的大小必須是2的N次方
// 指定事件工廠 LongEventFactory factory = new LongEventFactory(); // 指定 ring buffer字節大小, 必須是2的N次方 int bufferSize = 1024; //單線程模式,獲取額外的性能 Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); //設置事件業務處理器---消費者 disruptor.handleEventsWith(new LongEventHandler()); //啟動disruptor線程 disruptor.start();
2.2.6 使用Translators發佈事件
在Disruptor的3.0版本中,由於加入了豐富的Lambda風格的API,可以用來幫組開發人員簡化流程。所以在3.0版本後首選使用Event Publisher/Event Translator來發布事件。
public class LongEventProducerWithTranslator { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR = new EventTranslatorOneArg<LongEvent, Long>() { public void translateTo(LongEvent event, long sequence, Long data) { event.set(data); } }; public void onData(Long data) { ringBuffer.publishEvent(TRANSLATOR, data); } }
2.2.7 關閉 Disruptor
disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理
2.3 代碼整合
2.3.1 LongEventMain
消費者-生產者啟動類,其依靠構造Disruptor對象,調用start()方法完成啟動線程。Disruptor 需要ringbuffer環,消費者數據處理工廠,WaitStrategy等
- ByteBuffer 類字節buffer,用於包裝消息。
- ProducerType.SINGLE為單線程 ,可以提高性能
public class LongEventMain { public static void main(String[] args) { // 指定事件工廠 LongEventFactory factory = new LongEventFactory(); // 指定 ring buffer字節大小, 必須是2的N次方 int bufferSize = 1024; //單線程模式,獲取額外的性能 Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); //設置事件業務處理器---消費者 disruptor.handleEventsWith(new LongEventHandler()); //啟動disruptor線程 disruptor.start(); // 獲取 ring buffer環,用於接取生產者生產的事件 RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); //為 ring buffer指定事件生產者 LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer); //循環遍歷 for (int i = 0; i < 100; i++) { //獲取一個隨機數 long value = (long) ((Math.random() * 1000000) + 1); //發佈數據 producer.onData(value); } //停止disruptor線程 disruptor.shutdown(); } }
2.3.2 運行測試
測試結果
consumer:pool-1-thread-1 Event: value=579797,sequence=0 consumer:pool-1-thread-1 Event: value=974942,sequence=1 consumer:pool-1-thread-1 Event: value=978977,sequence=2 consumer:pool-1-thread-1 Event: value=398080,sequence=3 consumer:pool-1-thread-1 Event: value=867251,sequence=4 consumer:pool-1-thread-1 Event: value=796707,sequence=5 consumer:pool-1-thread-1 Event: value=786555,sequence=6 consumer:pool-1-thread-1 Event: value=182193,sequence=7 .....
Event: value = 為消費者接收到的數據,sequence為數據在ringbuffer環的位置。
如果本文對您有幫助,歡迎 關注
和 點贊
`,您的支持是我堅持創作的動力。
轉載請註明出處!
- 天翼雲全場景業務無縫替換至國產原生操作系統CTyunOS!
- 以羊了個羊為例,淺談小程序抓包與響應報文修改
- 這幾種常見的 JVM 調優場景,你知道嗎?
- 如此狂妄,自稱高性能隊列的Disruptor有啥來頭?
- 為什麼要學習GoF設計模式?
- 827. 最大人工島 : 簡單「並查集 枚舉」運用題
- 手把手教你如何使用 Timestream 實現物聯網時序數據存儲和分析
- 850. 矩形面積 II : 掃描線模板題
- Java 併發編程解析 | 基於JDK源碼解析Java領域中的併發鎖,我們可以從中學習到什麼內容?
- 【手把手】光説不練假把式,這篇全鏈路壓測實踐探索
- 大廠鍾愛的全鏈路壓測有什麼意義?四種壓測方案詳細對比分析
- 寫個續集,填坑來了!關於“Thread.sleep(0)這一行‘看似無用’的代碼”裏面留下的坑。
- 857. 僱傭 K 名工人的最低成本 : 枚舉 優先隊列(堆)運用題
- Vue3 實現一個自定義toast(小彈窗)
- 669. 修剪二叉搜索樹 : 常規樹的遍歷與二叉樹性質
- 讀完 RocketMQ 源碼,我學會了如何優雅的創建線程
- 性能調優——小小的log大大的坑
- 1582. 二進制矩陣中的特殊位置 : 簡單模擬題
- elementui源碼學習之仿寫一個el-switch
- 646. 最長數對鏈 : 常規貪心 DP 運用題