如此狂妄,自稱高性能隊列的Disruptor有啥來頭?

語言: CN / TW / HK

併發框架Disruptor

1. Disruptor概述

1.1 背景

​ Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題(在性能測試中發現竟然與I/O操作處於同樣的數量級),基於Disruptor開發的系統單線程能支撐每秒600萬訂單,2010年在QCon演講後,獲得了業界關注,2011年,企業應用軟件專家Martin Fowler專門撰寫長文介紹。同年它還獲得了Oracle官方的Duke大獎。

​ 目前,包括Apache Storm、Camel、Log4j 2在內的很多知名項目都應用了Disruptor以獲取高性能。

​ 需要特別指出的是,這裏所説的隊列是系統內部的內存隊列,而不是Kafka這樣的分佈式隊列。

有界無鎖 高併發隊列

1.2 什麼是Disruptor

​ Disruptor是用於一個JVM中多個線程之間的消息隊列,作用與ArrayBlockingQueue有相似之處,但是Disruptor從功能、性能都遠好於ArrayBlockingQueue,當多個線程之間傳遞大量數據或對性能要求較高時,可以考慮使用Disruptor作為ArrayBlockingQueue的替代者。

​ 官方也對Disruptor和ArrayBlockingQueue的性能在不同的應用場景下做了對比,目測性能只有有5~10倍左右的提升。

1.3 為什麼使用Disruptor

​ 傳統阻塞的隊列使用鎖保證線程安全,而鎖通過操作系統內核上下文切換實現,會暫停線程去等待鎖,直到鎖釋放。

​ 執行這樣的上下文切換,會丟失之前保存的數據和指令。由於消費者和生產者之間的速度差異,隊列總是接近滿或者空的狀態,這種狀態會導致高水平的寫入爭用。

1.3.1 傳統隊列問題

首先這裏説的隊列也僅限於Java內部的消息隊列

隊列 有界性 結構 隊列類型
ArrayBlockingQueue 有界 加鎖 數組 阻塞
LinkedBlockingQueue 可選 加鎖 鏈表 阻塞
ConcurrentLinkedQueue 無界 無鎖 鏈表 非阻塞
LinkedTransferQueue 無界 無鎖 鏈表 阻塞
PriorityBlockingQueue 無界 加鎖 阻塞
DelayQueue 無界 加鎖 阻塞

1.3.2 Disruptor應用場景

參考使用到disruptor的一些框架.

1.3.2.1 log4j2

​ Log4j2異步日誌使用到了disruptor, 日誌一般是有緩衝區, 滿了才寫到文件, 增量追加文件結合NIO等應該也比較快, 所以無論是EventHandler還是WorkHandler處理應該延遲比較小的, 寫的文件也不多, 所以場景是比較合適的。

1.3.2.2 Jstorm

​ 在流處理中不同線程中數據交換,數據計算可能蠻多內存中計算, 流計算快進快出,disruptor應該不錯的選擇。

1.3.2.3 百度uid-generator

​ 部分使用Ring buffer和去偽共享等思路緩存已生成的uid, 應該也部分參考了disruptor吧。

1.4 Disruptor 的核心概念

先從瞭解 Disruptor 的核心概念開始,來了解它是如何運作的。下面介紹的概念模型,既是領域對象,也是映射到代碼實現上的核心對象。

1.4.1 Ring Buffer

Disruptor中的數據結構,用於存儲生產者生產的數據

​ 如其名,環形的緩衝區。曾經 RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責被簡化為僅僅負責對通過 Disruptor 進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 可以由用户的自定義實現來完全替代。

1.4.2 Sequence

序號,在Disruptor框架中,任何地方都有序號

​ 生產者生產的數據放在RingBuffer中的哪個位置,消費者應該消費哪個位置的數據,RingBuffer中的某個位置的數據是什麼,這些都是由這個序號來決定的。這個序號可以簡單的理解為一個AtomicLong類型的變量。其使用了padding的方法去消除緩存的偽共享問題。

1.4.3 Sequencer

序號生成器,這個類主要是用來協調生產者的

​ 在生產者生產數據的時候,Sequencer會產生一個可用的序號(Sequence),然後生產者就就知道數據放在環形隊列的那個位置了。

​ Sequencer是Disruptor的真正核心,此接口有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞數據的併發算法。

1.4.4 Sequence Barrier

序號屏障

​ 我們都知道,消費者在消費數據的時候,需要知道消費哪個位置的數據。消費者總不能自己想取哪個數據消費,就取哪個數據消費吧。這個SequencerBarrier起到的就是這樣一個“柵欄”般的阻隔作用。你消費者想消費數據,得,我吿訴你一個序號(Sequence),你去消費那個位置上的數據。要是沒有數據,就好好等着吧

1.4.5 Wait Strategy

Wait Strategy決定了一個消費者怎麼等待生產者將事件(Event)放入Disruptor中。

​ 設想一種這樣的情景:生產者生產的非常慢,而消費者消費的非常快。那麼必然會出現數據不夠的情況,這個時候消費者怎麼進行等待呢?WaitStrategy就是為了解決問題而誕生的。

1.4.6 Event

​ 從生產者到消費者傳遞的數據叫做Event。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義並指定。

1.4.7 EventHandler

​ Disruptor 定義的事件處理接口,由用户實現,用於處理事件,是 Consumer 的真正實現。

1.4.8 Producer

​ 即生產者,只是泛指調用 Disruptor 發佈事件的用户代碼,Disruptor 沒有定義特定接口或類型。

1.5 Disruptor特性

​ Disruptor其實就像一個隊列一樣,用於在不同的線程之間遷移數據,但是Disruptor也實現了一些其他隊列沒有的特性,如:

  • 同一個“事件”可以有多個消費者,消費者之間既可以並行處理,也可以相互依賴形成處理的先後次序(形成一個依賴圖);
  • 預分配用於存儲事件內容的內存空間;
  • 針對極高的性能目標而實現的極度優化和無鎖的設計;

2. Disruptor入門

我們使用一個簡單的例子來體驗一下Disruptor,生產者會傳遞一個long類型的值到消費者,消費者接受到這個值後會打印出這個值。

2.1 添加依賴

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

2.2 Disruptor API

Disruptor 的 API 十分簡單,主要有以下幾個步驟

2.2.1 定義事件

首先創建一個 LongEvent 類,這個類將會被放入環形隊列中作為消息內容。

事件(Event)就是通過 Disruptor 進行交換的數據類型。

public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    public long getValue() {
        return value;
    }
}

2.2.2 定義事件工廠

為了使用Disruptor的內存預分配event,我們需要定義一個EventFactory

​ 事件工廠(Event Factory)定義瞭如何實例化前面第1步中定義的事件(Event),需要實現接口 com.lmax.disruptor.EventFactory\<T\>。

Disruptor 通過 EventFactory 在 RingBuffer 中預創建 Event 的實例。

​ 一個 Event 實例實際上被用作一個“數據槽”,發佈者發佈前,先從 RingBuffer 獲得一個 Event 的實例,然後往 Event 實例中填充數據,之後再發布到 RingBuffer 中,之後由 Consumer 獲得該 Event 實例並從中讀取數據。

public class LongEventFactory implements EventFactory<LongEvent> {
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

2.2.3 定義事件處理的具體實現

為了讓消費者處理這些事件,所以我們這裏定義一個事件處理器,負責打印event

通過實現接口 com.lmax.disruptor.EventHandler\<T\> 定義事件處理的具體實現。

public class LongEventHandler implements EventHandler<LongEvent> {
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        //CommonUtils.accumulation();
        System.out.println("consumer:" + Thread.currentThread().getName() + " Event: value=" + event.getValue() + ",sequence=" + sequence);
    }
}

2.2.4 指定等待策略

Disruptor 定義了 com.lmax.disruptor.WaitStrategy 接口用於抽象 Consumer 如何等待新事件,這是策略模式的應用

WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();

2.2.5 啟動 Disruptor

注意ringBufferSize的大小必須是2的N次方

// 指定事件工廠
LongEventFactory factory = new LongEventFactory();

// 指定 ring buffer字節大小, 必須是2的N次方
int bufferSize = 1024;

//單線程模式,獲取額外的性能
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory,
                                                          bufferSize, Executors.defaultThreadFactory(),
                                                          ProducerType.SINGLE,
                                                          new YieldingWaitStrategy());
//設置事件業務處理器---消費者
disruptor.handleEventsWith(new LongEventHandler());

//啟動disruptor線程
disruptor.start();

2.2.6 使用Translators發佈事件

在Disruptor的3.0版本中,由於加入了豐富的Lambda風格的API,可以用來幫組開發人員簡化流程。所以在3.0版本後首選使用Event Publisher/Event Translator來發布事件。

public class LongEventProducerWithTranslator {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, Long>() {
                public void translateTo(LongEvent event, long sequence, Long data) {
                    event.set(data);
                }
            };

    public void onData(Long data) {
        ringBuffer.publishEvent(TRANSLATOR, data);
    }
}

2.2.7 關閉 Disruptor

disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理

2.3 代碼整合

2.3.1 LongEventMain

消費者-生產者啟動類,其依靠構造Disruptor對象,調用start()方法完成啟動線程。Disruptor 需要ringbuffer環,消費者數據處理工廠,WaitStrategy等

  • ByteBuffer 類字節buffer,用於包裝消息。
  • ProducerType.SINGLE為單線程 ,可以提高性能
public class LongEventMain {
    public static void main(String[] args) {
        // 指定事件工廠
        LongEventFactory factory = new LongEventFactory();

        // 指定 ring buffer字節大小, 必須是2的N次方
        int bufferSize = 1024;

        //單線程模式,獲取額外的性能
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory,
                bufferSize, Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new YieldingWaitStrategy());

        //設置事件業務處理器---消費者
        disruptor.handleEventsWith(new LongEventHandler());

        //啟動disruptor線程
        disruptor.start();
        // 獲取 ring buffer環,用於接取生產者生產的事件
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        //為 ring buffer指定事件生產者
        LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        //循環遍歷
        for (int i = 0; i < 100; i++) {
            //獲取一個隨機數
            long value = (long) ((Math.random() * 1000000) + 1);
            //發佈數據
            producer.onData(value);
        }
        //停止disruptor線程
        disruptor.shutdown();
    }
}

2.3.2 運行測試

測試結果

consumer:pool-1-thread-1 Event: value=579797,sequence=0
consumer:pool-1-thread-1 Event: value=974942,sequence=1
consumer:pool-1-thread-1 Event: value=978977,sequence=2
consumer:pool-1-thread-1 Event: value=398080,sequence=3
consumer:pool-1-thread-1 Event: value=867251,sequence=4
consumer:pool-1-thread-1 Event: value=796707,sequence=5
consumer:pool-1-thread-1 Event: value=786555,sequence=6
consumer:pool-1-thread-1 Event: value=182193,sequence=7
.....

Event: value = 為消費者接收到的數據,sequence為數據在ringbuffer環的位置。

如果本文對您有幫助,歡迎 關注點贊 `,您的支持是我堅持創作的動力。

轉載請註明出處!