如此狂妄,自稱高效能佇列的Disruptor有啥來頭?

語言: CN / TW / HK

併發框架Disruptor

file

1. Disruptor概述

1.1 背景

​ Disruptor是英國外匯交易公司LMAX開發的一個高效能佇列,研發的初衷是解決記憶體佇列的延遲問題(在效能測試中發現竟然與I/O操作處於同樣的數量級),基於Disruptor開發的系統單執行緒能支撐每秒600萬訂單,2010年在QCon演講後,獲得了業界關注,2011年,企業應用軟體專家Martin Fowler專門撰寫長文介紹。同年它還獲得了Oracle官方的Duke大獎。

​ 目前,包括Apache Storm、Camel、Log4j 2在內的很多知名專案都應用了Disruptor以獲取高效能。

​ 需要特別指出的是,這裡所說的佇列是系統內部的記憶體佇列,而不是Kafka這樣的分散式佇列。

有界無鎖 高併發佇列

1.2 什麼是Disruptor

​ Disruptor是用於一個JVM中多個執行緒之間的訊息佇列,作用與ArrayBlockingQueue有相似之處,但是Disruptor從功能、效能都遠好於ArrayBlockingQueue,當多個執行緒之間傳遞大量資料或對效能要求較高時,可以考慮使用Disruptor作為ArrayBlockingQueue的替代者。

​ 官方也對Disruptor和ArrayBlockingQueue的效能在不同的應用場景下做了對比,目測效能只有有5~10倍左右的提升。

1.3 為什麼使用Disruptor

​ 傳統阻塞的佇列使用鎖保證執行緒安全,而鎖通過作業系統核心上下文切換實現,會暫停執行緒去等待鎖,直到鎖釋放。

​ 執行這樣的上下文切換,會丟失之前儲存的資料和指令。由於消費者和生產者之間的速度差異,佇列總是接近滿或者空的狀態,這種狀態會導致高水平的寫入爭用。

1.3.1 傳統佇列問題

首先這裡說的佇列也僅限於Java內部的訊息佇列

佇列 有界性 結構 佇列型別
ArrayBlockingQueue 有界 加鎖 陣列 阻塞
LinkedBlockingQueue 可選 加鎖 連結串列 阻塞
ConcurrentLinkedQueue 無界 無鎖 連結串列 非阻塞
LinkedTransferQueue 無界 無鎖 連結串列 阻塞
PriorityBlockingQueue 無界 加鎖 阻塞
DelayQueue 無界 加鎖 阻塞
1.3.2 Disruptor應用場景

參考使用到disruptor的一些框架.

1.3.2.1 log4j2

​ Log4j2非同步日誌使用到了disruptor, 日誌一般是有緩衝區, 滿了才寫到檔案, 增量追加檔案結合NIO等應該也比較快, 所以無論是EventHandler還是WorkHandler處理應該延遲比較小的, 寫的檔案也不多, 所以場景是比較合適的。

1.3.2.2 Jstorm

​ 在流處理中不同執行緒中資料交換,資料計算可能蠻多記憶體中計算, 流計算快進快出,disruptor應該不錯的選擇。

1.3.2.3 百度uid-generator

​ 部分使用Ring buffer和去偽共享等思路快取已生成的uid, 應該也部分參考了disruptor吧。

1.4 Disruptor 的核心概念

先從瞭解 Disruptor 的核心概念開始,來了解它是如何運作的。下面介紹的概念模型,既是領域物件,也是對映到程式碼實現上的核心物件。

1.4.1 Ring Buffer

Disruptor中的資料結構,用於儲存生產者生產的資料

​ 如其名,環形的緩衝區。曾經 RingBuffer 是 Disruptor 中的最主要的物件,但從3.0版本開始,其職責被簡化為僅僅負責對通過 Disruptor 進行交換的資料(事件)進行儲存和更新。在一些更高階的應用場景中,Ring Buffer 可以由使用者的自定義實現來完全替代。

1.4.2 Sequence

序號,在Disruptor框架中,任何地方都有序號

​ 生產者生產的資料放在RingBuffer中的哪個位置,消費者應該消費哪個位置的資料,RingBuffer中的某個位置的資料是什麼,這些都是由這個序號來決定的。這個序號可以簡單的理解為一個AtomicLong型別的變數。其使用了padding的方法去消除快取的偽共享問題。

1.4.3 Sequencer

序號生成器,這個類主要是用來協調生產者的

​ 在生產者生產資料的時候,Sequencer會產生一個可用的序號(Sequence),然後生產者就就知道資料放在環形佇列的那個位置了。

​ Sequencer是Disruptor的真正核心,此介面有兩個實現類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產者和消費者之間快速、正確地傳遞資料的併發演算法。

1.4.4 Sequence Barrier

序號屏障

​ 我們都知道,消費者在消費資料的時候,需要知道消費哪個位置的資料。消費者總不能自己想取哪個資料消費,就取哪個資料消費吧。這個SequencerBarrier起到的就是這樣一個“柵欄”般的阻隔作用。你消費者想消費資料,得,我告訴你一個序號(Sequence),你去消費那個位置上的資料。要是沒有資料,就好好等著吧

1.4.5 Wait Strategy

Wait Strategy決定了一個消費者怎麼等待生產者將事件(Event)放入Disruptor中。

​ 設想一種這樣的情景:生產者生產的非常慢,而消費者消費的非常快。那麼必然會出現資料不夠的情況,這個時候消費者怎麼進行等待呢?WaitStrategy就是為了解決問題而誕生的。

1.4.6 Event

​ 從生產者到消費者傳遞的資料叫做Event。它不是一個被 Disruptor 定義的特定型別,而是由 Disruptor 的使用者定義並指定。

1.4.7 EventHandler

​ Disruptor 定義的事件處理介面,由使用者實現,用於處理事件,是 Consumer 的真正實現。

1.4.8 Producer

​ 即生產者,只是泛指呼叫 Disruptor 釋出事件的使用者程式碼,Disruptor 沒有定義特定介面或型別。

1.5 Disruptor特性

​ Disruptor其實就像一個佇列一樣,用於在不同的執行緒之間遷移資料,但是Disruptor也實現了一些其他佇列沒有的特性,如:

  • 同一個“事件”可以有多個消費者,消費者之間既可以並行處理,也可以相互依賴形成處理的先後次序(形成一個依賴圖);
  • 預分配用於儲存事件內容的記憶體空間;
  • 針對極高的效能目標而實現的極度優化和無鎖的設計;

2. Disruptor入門

我們使用一個簡單的例子來體驗一下Disruptor,生產者會傳遞一個long型別的值到消費者,消費者接受到這個值後會打印出這個值。

2.1 新增依賴

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

2.2 Disruptor API

Disruptor 的 API 十分簡單,主要有以下幾個步驟

2.2.1 定義事件

首先建立一個 LongEvent 類,這個類將會被放入環形佇列中作為訊息內容。

事件(Event)就是通過 Disruptor 進行交換的資料型別。

public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    public long getValue() {
        return value;
    }
}
2.2.2 定義事件工廠

為了使用Disruptor的記憶體預分配event,我們需要定義一個EventFactory

​ 事件工廠(Event Factory)定義瞭如何例項化前面第1步中定義的事件(Event),需要實現介面 com.lmax.disruptor.EventFactory<T>。

Disruptor 通過 EventFactory 在 RingBuffer 中預建立 Event 的例項。

​ 一個 Event 例項實際上被用作一個“資料槽”,釋出者釋出前,先從 RingBuffer 獲得一個 Event 的例項,然後往 Event 例項中填充資料,之後再發布到 RingBuffer 中,之後由 Consumer 獲得該 Event 例項並從中讀取資料。

public class LongEventFactory implements EventFactory<LongEvent> {
    public LongEvent newInstance() {
        return new LongEvent();
    }
}
2.2.3 定義事件處理的具體實現

為了讓消費者處理這些事件,所以我們這裡定義一個事件處理器,負責列印event

通過實現介面 com.lmax.disruptor.EventHandler<T> 定義事件處理的具體實現。

public class LongEventHandler implements EventHandler<LongEvent> {
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        //CommonUtils.accumulation();
        System.out.println("consumer:" + Thread.currentThread().getName() + " Event: value=" + event.getValue() + ",sequence=" + sequence);
    }
}
2.2.4 指定等待策略

Disruptor 定義了 com.lmax.disruptor.WaitStrategy 介面用於抽象 Consumer 如何等待新事件,這是策略模式的應用

WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
2.2.5 啟動 Disruptor

注意ringBufferSize的大小必須是2的N次方

// 指定事件工廠
LongEventFactory factory = new LongEventFactory();

// 指定 ring buffer位元組大小, 必須是2的N次方
int bufferSize = 1024;

//單執行緒模式,獲取額外的效能
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory,
                                                          bufferSize, Executors.defaultThreadFactory(),
                                                          ProducerType.SINGLE,
                                                          new YieldingWaitStrategy());
//設定事件業務處理器---消費者
disruptor.handleEventsWith(new LongEventHandler());

//啟動disruptor執行緒
disruptor.start();
2.2.6 使用Translators釋出事件

在Disruptor的3.0版本中,由於加入了豐富的Lambda風格的API,可以用來幫組開發人員簡化流程。所以在3.0版本後首選使用Event Publisher/Event Translator來發布事件。

public class LongEventProducerWithTranslator {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, Long>() {
                public void translateTo(LongEvent event, long sequence, Long data) {
                    event.set(data);
                }
            };

    public void onData(Long data) {
        ringBuffer.publishEvent(TRANSLATOR, data);
    }
}
2.2.7 關閉 Disruptor
disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理

2.3 程式碼整合

2.3.1 LongEventMain

消費者-生產者啟動類,其依靠構造Disruptor物件,呼叫start()方法完成啟動執行緒。Disruptor 需要ringbuffer環,消費者資料處理工廠,WaitStrategy等

  • ByteBuffer 類位元組buffer,用於包裝訊息。

  • ProducerType.SINGLE為單執行緒 ,可以提高效能

public class LongEventMain {
    public static void main(String[] args) {
        // 指定事件工廠
        LongEventFactory factory = new LongEventFactory();

        // 指定 ring buffer位元組大小, 必須是2的N次方
        int bufferSize = 1024;

        //單執行緒模式,獲取額外的效能
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory,
                bufferSize, Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new YieldingWaitStrategy());

        //設定事件業務處理器---消費者
        disruptor.handleEventsWith(new LongEventHandler());

        //啟動disruptor執行緒
        disruptor.start();
        // 獲取 ring buffer環,用於接取生產者生產的事件
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        //為 ring buffer指定事件生產者
        LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        //迴圈遍歷
        for (int i = 0; i < 100; i++) {
            //獲取一個隨機數
            long value = (long) ((Math.random() * 1000000) + 1);
            //釋出資料
            producer.onData(value);
        }
        //停止disruptor執行緒
        disruptor.shutdown();
    }
}
2.3.2 執行測試

測試結果

consumer:pool-1-thread-1 Event: value=579797,sequence=0
consumer:pool-1-thread-1 Event: value=974942,sequence=1
consumer:pool-1-thread-1 Event: value=978977,sequence=2
consumer:pool-1-thread-1 Event: value=398080,sequence=3
consumer:pool-1-thread-1 Event: value=867251,sequence=4
consumer:pool-1-thread-1 Event: value=796707,sequence=5
consumer:pool-1-thread-1 Event: value=786555,sequence=6
consumer:pool-1-thread-1 Event: value=182193,sequence=7
.....

Event: value = 為消費者接收到的資料,sequence為資料在ringbuffer環的位置。 本文由傳智教育博學谷教研團隊釋出。

如果本文對您有幫助,歡迎關注點贊;如果您有任何建議也可留言評論私信,您的支援是我堅持創作的動力。

轉載請註明出處!