【優化技術專題】「線程間的高性能消息框架」再次細節領略Disruptor的底層原理和優勢分析

語言: CN / TW / HK

theme: smartblue

小知識,大挑戰!本文正在參與“程序員必備小知識”創作活動。

Disruptor原理

首先Disruptor是為了解決高併發緩存的隊列,為線程間通訊提供高效的性能,它是如何做到無阻塞、多生產、多消費的?

上圖簡單的畫了一下構建Disruptor的各個參數以及 ringBuffer 的構造,下面簡單的説一下。

生產者需要組件

生產者,產生消息,並將消息發佈到RingBuffer內存隊列中。

  • Event模型:從生產者傳遞給消費者的數據單位,完全由用户定義其類型。

java @Data public class SampleEvent { private Long id。 private String sampleDataStr。 }

  • EventFactory:創建事件(任務)的工廠類。(這裏任務會創建好,保存在內存中,可以看做是一個空任務)。

java public class SampleEventFactory implements EventFactory<SampleEvent> { @Override public SampleEvent newInstance() { // 實例化數據(建好空數據,等待後面初始化) return new SampleEvent()。 } }

  • RingBuffer:環形緩衝區通常被認為是Disruptor的主要實現,當前版本即3.0版本之後,RingBuffer僅負責存儲和更新通過Disruptor的數據(Event)。

    • ringBufferSize:容器的長度。( Disruptor 的核心容器是 ringBuffer,環轉數組,有限長度)。
  • ProductType:生產者類型:單生產者、多生產者。

    • Sequencer:Sequencer是Disruptor的核心API。該接口的2個實現類(SingleProducer,MultiProducer)實現了所有併發算法,用於在生產者和消費者之間快速,正確地傳遞數據。
  • WaitStrategy:等待策略。(當隊列裏的數據都被消費完之後,消費者和生產者之間的等待策略),等待策略確定消費者如何等待生產者將事件放入Disruptor。

  • RingBuffer:存放數據的容器。

java @Data @AllArgsConstructor public class SampleEventProducer { private RingBuffer<OrderEvent> ringBuffer。 public void sendData(long id) { //獲取下一個可用序號 long sequence = ringBuffer.next()。 try { //獲取一個空對象(沒有填充值) SampleEvent sampleEent = ringBuffer.get(sequence)。 }finally { //提交 ringBuffer.publish(sequence)。 } } }

消費者需要組件

  • Executor:消費者線程池,執行任務的線程。(每一個消費者都需要從線程池裏獲得線程去消費任務)。

  • EventProcessor:用於處理來自Disruptor的事件的主事件循環,並具有消費者序列的所有權。有一個名為 BatchEventProcessor表示,它包含事件循環的有效實現,並將回調到使用的提供的EventHandler接口實現。

  • EventHandler:事件處理器,由用户實現並代表Disruptor的使用者的接口,用户客户端實現消息的處理機制,由客户端具體實現。 ```java public class SampleEventHandler implements EventHandler {

    /* * 事件驅動監聽--消費者消費的主體 / @Override public void onEvent(SampleEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println(event.getSampleDataStr() + " " +Thread.currentThread().getName())。 } } ```

算法核心Sequence序號

  • Sequence:Disruptor使用Sequences作為識別特定組件所在位置的方法。

    • 每個消費者(EventProcessor)都像Disruptor本身一樣維護一個Sequence。大多數併發代碼依賴於這些Sequence值的變化或者叫移動,因此Sequence支持AtomicLong的許多當前功能。

    • 事實上,唯一真正的區別是Sequence包含額外的功能,以防止序列和其他值之間的錯誤共享。

  • Sequence Barrier:序列屏障由Sequencer產生,包含對Sequencer中主要發佈的sequence和任何依賴性消費者的序列的引用。它包含確定是否有任何可供消費者處理的事件的邏輯。

Disruptor的優點:

  1. 多線程之間沒有競爭即沒有鎖。

  2. 所有訪問者都記錄自己的序號的實現方式,允許多個生產者與多個消費者共享相同的數據結構。

  3. 每個對象中都能跟蹤序列號(ring buffer, claim strategy,生產者和消費者),加上神奇的緩存行填充,就意味着沒有偽共享和非預期的競爭。

下面再簡單介紹下RingBuffer核心實現,來看看隊列的實現細節。

其為環形隊列,有點像一致性Hash算法中的閉環,但完全不一樣。

底層的話是一個固定大小的數組結構,相比於隊列來説,其只有一個下標指針cursor,如果槽的個數是2的N次方更有利於基於二進制的計算機進行計算。如果看過HashMap源碼應該知道,HashMap定位元素槽時使用了一種巧妙的方式,hash&(length-1)。

RingBuffer同樣是相同的計算方式,sequence&(length-1),當然你可以進行取模操作。

  • 取模操作在寄存器中的計算,需要多次的迭代加操作進行的,所以相對於計算速度來説,對於計算機進行位運算效率絕對是高於取模操作的,尤其是對於高併發狀況下的計算,能夠節省很多單位cpu開銷

一般實現線性存儲有兩種實現方式:

  • 一種是基於連續內存分配的HashTable
  • 一種是基於隨機內存分配的迭代指針。

為什麼RingBuffer選用數組作為存儲結構,而不選用鏈表存儲?

緩存或者程序的局部性原理

  • (Good)數組內存屬是連續分配內存的預讀策略,也就是內存加載時,會將部分連續內存地址預先加載到高速緩存中,即認為你可能會使用,上面我們分析了操作系統中的cpu操作數據的流程,可以看出這種設計是為了不用反覆從內存中加載。

  • (Bad)鏈表的內存分配是碎片化的所以其存儲地址不是連續的,導致每次都會cpu都會重新計算下一個鏈表位置的地址,並從內存中加載相關的數據,數據量小的情況下並不能看出性能的優劣,但是當數據量大的情況下,這種極小的消耗,會對整體的運行效率產生影響。

因為RingBuffer不會涉及到存儲地址的修改和維護,就因為選用數組就對性能產生了有利和積極的影響。

偽共享

內存以高速緩存行的形式存儲在高速緩存系統中。高速緩存行是2的N次方個連續字節,其大小通常為32-256,最常見的緩存行大小為64字節

偽共享是一個術語,適用於線程在修改共享同一緩存行的獨立變量時無意中影響彼此的性能。在高速緩存行上寫入爭用是實現SMP系統中並行執行線程的可伸縮性的最大限制因素。(出自百度定義!)

  • 首先我們知道對於鎖來説是關中斷實現,鎖定bus消息總線實現,而對於共享內存,計算機使用的是緩存行,共享變量的多個線程,共享相同的緩存行。

  • 實現線程數量的線性可伸縮性,我們必須確保沒有兩個線程寫入同一個變量或緩存行。而當使用volatile的時候,我們讀取直接共享變量從主內存或者叫共享內存中讀取變量的值,其本質是使計算機緩存行失效。

在CPU核心A運行的線程想要更新變量X,而CPU核心B上的線程想要更新變量Y。

這兩個熱變量位於同一緩存行中。每個線程都將競爭緩存行的所有權,以便他們可以更新它。如果核心A獲得所有權,那麼MESI/MOSI緩存子系統將需要使核心B的相應緩存行無效。反之也是一樣,極大地影響性能。如果競爭核心在不同的套接字上並且還必須跨越套接字互連,則緩存行問題將進一步加劇。

總結一下:如果多個線程操作不同的成員變量, 但是這些變量存儲在同一個緩存行,如果有處理器更新了緩存行的數據並刷新到主存之後,根據緩存一致性原則,其他處理器將失效該緩存行(I狀態)導致緩存未命中,需要重新去內存中讀取最新數據,這就是偽共享問題。

  • 特別是不同的線程操作同一個緩存行,需要發出RFO(Request for Owner)信號鎖定緩存行,保證寫操作的原子性,此時其他線程不能操作這個緩存行,這將對效率有極大的影響。

為了避免避免經常執行寫操作的變量因為在同一個緩存行而導致的偽共享問題,常用的解決方式就是緩存行填充,或者稱為緩存行對齊。

緩存行填充的概念

當多個線程同時對共享的緩存行進行寫操作的時候,因為緩存系統自身的緩存一致性原則,會引發偽共享問題,解決的常用辦法是將共享變量根據緩存行大小進行補充對齊,使其加載到緩存時能夠獨享緩存行,避免與其他共享變量存儲在同一個緩存行。

下面是緩存行實現,另外緩存行填充有一個前提同時分配的對象往往位於同一位置。

java public long p1, p2, p3, p4, p5, p6, p7; // cache line padding private volatile long cursor = INITIAL_CURSOR_VALUE; public long p8, p9, p10, p11, p12, p13, p14; // cache line padding

如果有不同的消費者往不同的字段寫入,你需要確保各個字段間不會出現偽共享。

java /** * 數組保存了VolatileLongPadding,其中數組中一個long類型保存數組長度,算上 * 自身long類型value,需要再填充6個long類型,就能將數組中的對象填充滿一個緩存行。 * 注意:這裏使用繼承的方式實現緩存行對齊,因為Java編譯器會優化無效的字段。 */ class CacheLinePadding { // 如果不需要填充,只需要註釋掉這段代碼即可 public volatile long p1, p2, p3, p4, p5, p6; } class CacheLinePaddingObject extends CacheLinePadding { //實際操作的值 public volatile long value = 0L; }

RingBuffer實現上同樣也是使用了緩存行填充,保證了數組中的數據沒有偽共享的存在,RingBuffer除了一個long類型的cursor索引指針,p1->p7都為緩存行填充,一般來講8個Long類型的字段,正好是64Byte,會填充一個緩存行,當然前提是你需要,因為畢竟還有你自己的數據信息字段。

「其他文章」