【優化技術專題】「線程間的高性能消息框架」終極關注Disruptor的核心源碼和Java8的@Contended偽共享指南

語言: CN / TW / HK

theme: smartblue

小知識,大挑戰!本文正在參與“程序員必備小知識”創作活動。

Disruptor原理分析

Disruptor關聯好任務處理事件後,就調用了disruptor.start() 方法,可以看出在調用了 start() 方法後,消費者線程就已經開啟。

啟動Disruptor

start() ->開啟 Disruptor,運行事件處理器。

java public RingBuffer<T> start(){ checkOnlyStartedOnce(); //在前面 handleEventsWith() 方法裏添加的 handler 對象會加入到 consumerRepository 裏,這裏遍歷 consumerRepository 開啟消費者線程 for (final ConsumerInfo consumerInfo : consumerRepository){ //從線程池中獲取一個線程來開啟消費事件處理器。(消費者開啟監聽,一旦有生產者投遞,即可消費) //這裏開啟的線程對象為BatchEventProcessor的實例 consumerInfo.start(executor)。 } return ringBuffer。 }

關聯事件

handleEventsWith() -> createEventProcessors()調用的核心方法,作用是創建事件處理器。

java @SafeVarargs public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){ return createEventProcessors(new Sequence[0], handlers); }

存儲事件

將EventHandler對象綁定存儲到consumerRepository內部,並且交由BatchEventProcessor處理器進行代理執行。

java EventHandlerGroup<T> createEventProcessors( final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers){ ... final Sequence[] processorSequences = new Sequence[eventHandlers.length]; //創建 sequence 序號柵欄 final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences)。 for (int i = 0, eventHandlersLength = eventHandlers.length。i < eventHandlersLength。i++){ final EventHandler<? super T> eventHandler = eventHandlers[i]; final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler)。 ... //這裏將消費者加入到 consumerRepository 中---ConsumerRepository consumerRepository.add(batchEventProcessor, eventHandler, barrier)。 processorSequences[i] = batchEventProcessor.getSequence()。 } ... }

  • handleEventsWith() 方法中,可以看到構建了一個 BatchEventProcessor(繼承了 Runnable 接口)對象,start()方法啟動的同樣也是這個對象的實例。

  • 這個對象繼承自 EventProcessor ,EventProcessor 是 Disruptor 裏非常核心的一個接口,它的實現類的作用是輪詢接收RingBuffer提供的事件,並在沒有可處理事件是實現等待策略。

  • 這個接口的實現類必須要關聯一個線程去執行,通常我們不需要自己去實現它。

BatchEventProcessor類

BatchEventProcessor:主要事件循環,處理 Disruptor 中的 event,擁有消費者的 Sequence。

核心私有成員變量
  • Sequence :維護當前消費者消費的 ID。

  • SequenceBarrier :序號屏障,協調消費者的消費 ID,主要作用是獲取消費者的可用序號,並提供等待策略的執行。

  • EventHandler<? super T> :消費者的消費邏輯(我們實現的業務邏輯)。

  • DataProvider :獲取消費對象。RingBuffer 實現了此接口,主要是提供業務對象。

核心方法
  • processEvents():由於 BatchEventProcessor 繼承自 Runnable 接口,所以在前面啟動它後,run() 方法會執行,而 run() 方法內部則會調用此方法。

java private void processEvents() { T event = null。 獲取當前消費者維護的序號中並+1,即下一個消費序號 long nextSequence = sequence.get() + 1L。 while (true) { try { //獲取可執行的最大的任務 ID,如果沒有,waitFor() 方法內會進行等待 final long availableSequence = sequenceBarrier.waitFor(nextSequence)。 if (batchStartAware != null && availableSequence >= nextSequence) { batchStartAware.onBatchStart(availableSequence - nextSequence + 1)。 } //不斷獲取對應位置的任務進行消費 直到上面查詢到的 availableSequence 消費完 while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence)。 eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence)。 nextSequence++。 } sequence.set(availableSequence)。 } ... } }

  • 消費者事件處理器的核心代碼,sequenceBarrier.waitFor(nextSequence) 方法內部,會比較當前消費者序號與可用序號的大小:

    • 當可用序號(availableSequence)大於當前消費者序號(nextSequence),再獲取到當前可用的最大的事件序號 ID(waitFot()方法內部調用 sequencer.getHighestPublishedSequence(sequence, availableSequence)),進行循環消費。
    • 可用序號是維護在 ProcessingSequenceBarrier 裏的,ProcessingSequenceBarrier 是通過 ringBuffer.newBarrier() 創建出來的。

由圖可以看出,在獲得可用序號時,SequenceBarrier 在 EventProcessor 和 RingBuffer中充當協調的角色。

多消費事件和單消費事件在dependentSequence 上的處理有一些不同,可以看下 ProcessingSequenceBarrier 的 dependentSequence 的賦值以及 get() 方法 (Util.getMinimumSequence(sequences))。

啟動過程分析之生產者

首先調用了 ringBuffer.next() 方法,獲取可用序號,再獲取到該序號下事先通過 Eventfactory 創建好的空事件對象,在我們對空事件對象進行賦值後,再調用 publish 方法將事件發佈,則消費者就可以獲取進行消費了。

生產者這裏的核心代碼如下,這裏我截取的是多生產者模式下的代碼:

java public long next(int n){ if (n < 1 || n > bufferSize) { throw new IllegalArgumentException("n must be > 0 and < bufferSize")。 } long current。 long next。 do{ //cursor 為生產者維護的 sequence 序列,獲取到當前可投遞的的下標,即當前投遞到該位置 current = cursor.get()。 //再+n獲取下一個下標,即下一次投遞的位置。 next = current + n。 long wrapPoint = next - bufferSize。 //目的:也是實現快讀的讀寫。gatingSequenceCache獨佔緩存行 long cachedGatingSequence = gatingSequenceCache.get()。 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current){ //獲取消費者最小序號 long gatingSequence = Util.getMinimumSequence(gatingSequences, current)。 if (wrapPoint > gatingSequence) { //如果不符合,則阻塞線程 1ns(park()不會有死鎖的問題) LockSupport.parkNanos(1)。 // TODO, should we spin based on the wait strategy? continue。 } gatingSequenceCache.set(gatingSequence)。 } //多個生產者時要保證線程安全(這裏更新的 cursor 同時也是等待策略裏的 waitFor() 方法的 cursor 參數,因此這裏更新成功後,則等待策略會通過,表示有新的任務進來,就會消費) else if (cursor.compareAndSet(current, next)){ break。 } }while (true); return next。 }

cursor 對象和 Util.getMinimumSequence(gatingSequences, current) 方法,cursor 對象是生產者維護的一個生產者序號,標示當前生產者已經生產到哪一個位置以及下一個位置,它是 Sequence 類的一個實例化對象

  • 從圖裏可以看出,Sequence 繼承以及間接繼承了 RhsPadding 和 LhsPadding 類,而這倆個類都各定義了 7 個 long 類型的成員變量。

  • 而 Sequence 的 get() 方法返回的也是一個 long 類型的值 value。這是上一篇文章介紹的充緩存行,消除偽共享。

  • 在 64 位的計算機中,單個緩存行一般佔 64 個字節,當 cpu 從換存裏取數據時,會將該相關數據的其它數據取出來填滿一個緩存行,這時如果其它數據更新,則緩存行緩存的該數據也會失效,當下次需要使用該數據時又需要重新從內存中提取數據。

  • ArrayBlockingQueue 獲取數據時,很容易碰到偽共享導致緩存行失效,而 Disruptor這裏當在 value 的左右各填充 7 個 long 類型的數據時,每次取都能確保該數據獨佔緩存行,也不會有其他的數據更新導致該數據失效。避免了偽共享的問題( jdk 的併發包下也有一些消除偽共享的設計)。


RingBuffer:它是一個首尾相接的環狀的容器,用來在多線程中傳遞數據。第一張圖裏面創建 Disruptor 的多個參數其實都是用來創建 RingBuffer 的,比如生產者類型(單 or 多)、實例化工廠、容器長度、等待策略等。

簡單分析,多個生產者同時向 ringbuffer 投遞數據,假設此時倆個生產者將 ringbuffer 已經填滿,因為 sequence 的序號是自增+1(若不滿足獲取條件則循環掛起當前線程),所以生產的時候能保證線程安全,只需要一個 sequence 即可。

當多消費者來消費的時候,因為消費速度不同,例如消費者 1 來消費 0、1,消費者 2 消費 2、4,消費者 3 消費 3。

當消費者消費完 0 後,消費者 2 消費完 2 後,消費者 3 消費完 3 後,生產者再往隊列投遞數據時,其他位置還未被消費,會投遞到第 0 個位置, 此時再想投遞數據時,雖然消費 2 的第二個位置空缺、消費者 3 的第三個位置空缺,消費者還在消費 1 時,無法繼續投遞。因為是通過比較消費者自身維護的 sequence 的最小的序號,來進行比較。

Util.getMinimumSequence(gatingSequences, current) 方法也就無需再多説,它就是為了獲取到多個消費者的最小序號,判斷當前 ringBuffer 中的剩餘可用序號是否大於消費者最小序號,是的話,則不能投遞,需要阻塞當前線程(LockSupport.parkNanos(1))。

當消費者消費速度大於生產者生產者速度,生產者還未來得及往隊列寫入,或者生產者生產速度大於消費者消費速度,此時怎麼辦呢?而且上面也多次提到沒有滿足條件的消費事件時,消費者會等待,接下來説一下消費者的等待策略。

個人常用的策略:

  • BlockingWaitStrategy 使用了鎖,低效的策略。

  • SleepingWaitStrategy 對生產者線程的影響最小,適合用於異步日誌類似的場景。(不加鎖空等)

  • YieldingWaitStrategy 性能最好,適合用於低延遲的系統,在要求極高性能且之間處理線數小於 cpu 邏輯核心數的場景中,推薦使用。

```java @Override public long waitFor( final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException{ long availableSequence。 int counter = SPIN_TRIES。//100 while ((availableSequence = dependentSequence.get()) < sequence){ counter = applyWaitMethod(barrier, counter)。 } return availableSequence。 } private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException { barrier.checkAlert()。

    if (0 == counter)
    {
        Thread.yield()。
    }
    else
    {
        --counter。
    }
    return counter。
}

```

Java 8 Contended註解

  • 在Java 8中,可以採用@Contended在類級別上的註釋,來進行緩存行填充。這樣,可以解決多線程情況下的偽共享衝突問題。

  • Contended可以用於類級別的修飾,同時也可以用於字段級別的修飾,當應用於字段級別時,被註釋的字段將和其他字段隔離開來,會被加載在獨立的緩存行上。在字段級別上,@Contended還支持一個“contention group”屬性(Class-Level不支持),同一group的字段們在內存上將是連續(64字節範圍內),但和其他他字段隔離開來。

@Contended註釋的行為如下所示:

在類上應用Contended:

java @Contended public static class ContendedTest2 { private Object plainField1; private Object plainField2; private Object plainField3; private Object plainField4; }

將使整個字段塊的兩端都被填充:(以下是使用 –XX:+PrintFieldLayout的輸出)

TestContended$ContendedTest2: field layout Entire class is marked contended @140 --- instance fields start --- @140 "plainField1" Ljava.lang.Object; @144 "plainField2" Ljava.lang.Object; @148 "plainField3" Ljava.lang.Object; @152 "plainField4" Ljava.lang.Object; @288 --- instance fields end --- @288 --- instance ends ---

注意,我們使用了128 bytes的填充 – 2倍於大多數硬件緩存行的大小(cache line一般為64 bytes) – 來避免相鄰扇區預取導致的偽共享衝突。

在字段上應用Contended:

java public static class ContendedTest1 { @Contended private Object contendedField1; private Object plainField1; private Object plainField2; private Object plainField3; private Object plainField4; }

將導致該字段從連續的字段塊中分離開來並高效的添加填充:

TestContended$ContendedTest1: field layout @ 12 --- instance fields start --- @ 12 "plainField1" Ljava.lang.Object; @ 16 "plainField2" Ljava.lang.Object; @ 20 "plainField3" Ljava.lang.Object; @ 24 "plainField4" Ljava.lang.Object; @156 "contendedField1" Ljava.lang.Object; (contended, group = 0) @288 --- instance fields end --- @288 --- instance ends ---

註解多個字段使他們分別被填充:

java public static class ContendedTest4 { @Contended private Object contendedField1; @Contended private Object contendedField2; private Object plainField3; private Object plainField4; }

被註解的2個字段都被獨立地填充:

TestContended$ContendedTest4: field layout @ 12 --- instance fields start --- @ 12 "plainField3" Ljava.lang.Object; @ 16 "plainField4" Ljava.lang.Object; @148 "contendedField1" Ljava.lang.Object; (contended, group = 0) @280 "contendedField2" Ljava.lang.Object; (contended, group = 0) @416 --- instance fields end --- @416 --- instance ends ---

在有些cases中,你會想對字段進行分組,同一組的字段會和其他字段有訪問衝突,但是和同一組的沒有。例如,(同一個線程的)代碼同時更新2個字段是很常見的情況。

```java public static class ContendedTest5 { @Contended("updater1") private Object contendedField1;

    @Contended("updater1")
    private Object contendedField2;

    @Contended("updater2")
    private Object contendedField3;

    private Object plainField5;
    private Object plainField6;
}

```

內存佈局是: TestContended$ContendedTest5: field layout @ 12 --- instance fields start --- @ 12 "plainField5" Ljava.lang.Object; @ 16 "plainField6" Ljava.lang.Object; @148 "contendedField1" Ljava.lang.Object; (contended, group = 12) @152 "contendedField2" Ljava.lang.Object; (contended, group = 12) @284 "contendedField3" Ljava.lang.Object; (contended, group = 15) @416 --- instance fields end --- @416 --- instance ends ---

@Contended在字段級別,並且帶分組的情況下,是否能解決偽緩存問題。

java import sun.misc.Contended; public class VolatileLong { @Contended("group0") public volatile long value1 = 0L; @Contended("group0") public volatile long value2 = 0L; @Contended("group1") public volatile long value3 = 0L; @Contended("group1") public volatile long value4 = 0L; }

用2個線程來修改字段

  • 測試1:線程0修改value1和value2;線程1修改value3和value4;他們都在同一組中。

  • 測試2:線程0修改value1和value3;線程1修改value2和value4;他們在不同組中。

測試1

```java public final class FalseSharing implements Runnable { public final static long ITERATIONS = 500L * 1000L * 1000L; private static Volatile Long volatileLong; private String groupId; public FalseSharing(String groupId) { this.groupId = groupId; } public static void main(final String[] args) throws Exception { // Thread.sleep(10000); System.out.println("starting...."); volatileLong = new VolatileLong(); final long start = System.nanoTime(); runTest(); System.out.println("duration = " + (System.nanoTime() - start)); }

private static void runTest() throws InterruptedException {
    Thread t0 = new Thread(new FalseSharing("t0"));
    Thread t1 = new Thread(new FalseSharing("t1"));
    t0.start();
    t1.start();
    t0.join();
    t1.join();
}
public void run() {
    long i = ITERATIONS + 1;
    if (groupId.equals("t0")) {
        while (0 != --i) {
            volatileLong.value1 = i;
            volatileLong.value2 = i;
        }
    } else if (groupId.equals("t1")) {
        while (0 != --i) {
            volatileLong.value3 = i;
            volatileLong.value4 = i;
        }
    }
}

}

public void run() { long i = ITERATIONS + 1; if (groupId.equals("t0")) { while (0 != --i) { volatileLong.value1 = i; volatileLong.value3 = i; } } else if (groupId.equals("t1")) { while (0 != --i) { volatileLong.value2 = i; volatileLong.value4 = i; } } } ```

「其他文章」