disruptor筆記之二:Disruptor類分析
歡迎訪問我的GitHub
https://github.com/zq2599/blog_demos
內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
《disruptor筆記》系列連結
本篇概覽
- 通過前文的實戰,咱們對Disruptor有了初步認識,藉助com.lmax.disruptor.dsl.Disruptor類可以輕鬆完成以下操作:
- 環形佇列初始化
- 指定事件消費者
- 啟動消費者執行緒
- 接下來要面對兩個問題:
- 深入瞭解Disruptor類是如何完成上述操作的;
- 對Disruptor類有了足夠了解時,嘗試不用Disruptor,自己動手操作環形佇列,實現訊息的生產和消費,這樣做的目的是加深對Disruptor內部的認識,做到知其所以然;
- 接下來咱們先解決第一個問題吧,結合Disruptor物件的原始碼來看看上述三個操作到底做了什麼;
環形佇列初始化
- 環形佇列初始化發生在例項化Disruptor物件的時候,即Disruptor的構造方法:
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
- <font color="blue">RingBuffer.createMultiProducer</font>方法內部例項化了RingBuffer,如下圖紅框:
- 記下<font color="red">第一個重要知識點</font>:建立RingBuffer物件;
指定事件消費者
- 在前文中,下面這行程式碼指定了事件由StringEventHandler消費:
disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter));
- 檢視handleEventsWith方法的內部:
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
- 展開createEventProcessors方法,如下圖,請重點關注建立SequenceBarrier和BatchEventProcessor等操作:
- 展開上圖紅框四中的updateGatingSequencesForNextInChain方法,如下圖,紅框中的ringBuffer.addGatingSequences需要重點關注:
- 小結一下,disruptor.handleEventsWith方法涉及到<font color="red">四個重要知識點</font>:
- 建立SequenceBarrier物件,用於接收ringBuffer中的可消費事件
- 建立BatchEventProcessor,負責消費事件
- 繫結BatchEventProcessor物件的異常處理類
- 呼叫ringBuffer.addGatingSequences,將消費者的Sequence傳給ringBuffer
啟動消費者執行緒
- 前文已通過日誌確定了消費事件的邏輯是在一個獨立的執行緒中執行的,啟動消費者執行緒的程式碼如下:
disruptor.start();
- 展開start方法,如下可見,關鍵程式碼是<font color="blue">consumerInfo.start(executor)</font>:
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.start(executor);
}
return ringBuffer;
}
- ConsumerInfo是介面,對應的實現類有EventProcessorInfo和WorkerPoolInfo兩種,這裡應該是哪種呢?既然來源是consumerRepository,這就要看當初是怎麼存入consumerRepository的,前面在分析createEventProcessors方法時,下圖紅框中的consumerRepository.add被忽略了,現在需要進去看看:
- 進去後一目瞭然,可見ConsumerInfo的實現是EventProcessorInfo:
- 所以,回到前面對<font color="blue">consumerInfo.start(executor)</font>方法的分析,這裡要看的就是EventProcessorInfo的start方法了,如下圖,非常簡單,就是啟動一個執行緒執行eventprocessor(這個eventprocessor是BatchEventProcessor物件):
- 小結一下,disruptor.start方法涉及到<font color="red">一個重要知識點</font>:
- 啟動獨立執行緒,用來執行消費事件的業務邏輯;
消費事件的邏輯
- 為了理解訊息處理邏輯,還要重點關注BatchEventProcessor.processEvents方法,如下圖所示,其實也很簡單,就是不停的從環形佇列取出可用的事件,然後再更新自己的Sequence,相當於標記已經消費到哪裡了:
總結
最後總結Disruptor類的重要功能:
- 建立環形佇列(RingBuffer物件)
- 建立SequenceBarrier物件,用於接收ringBuffer中的可消費事件
- 建立BatchEventProcessor,負責消費事件
- 繫結BatchEventProcessor物件的異常處理類
- 呼叫ringBuffer.addGatingSequences,將消費者的Sequence傳給ringBuffer
- 啟動獨立執行緒,用來執行消費事件的業務邏輯
- 聰明的您一定會發現,本文並沒有全面分析Disruptor類的原始碼,例如after、shutdown等方法都沒有提到,確實如此,欣宸在此給您道歉了,本篇的重點是找出那些與基本功能有關程式碼,為後面的實戰提供理論指導(不用Disruptor類實現訊息生產消費的實戰),因此很多高階功能都跳過了;
理解官方流程圖
- 此時再看官方流程圖,聰明的您應該很快就能理解此圖表達的意思:每個消費者都有自己的Sequence,通過此Sequence取得自己在環形佇列中消費的位置,再通過SequenceBarrier來等待可用事件的出現,等到事件出現了就用get方法取出具體的事件,給EventHandler來處理:
後續預告
- 此時,咱們對Disruptor類已經有了比較深入的理解,接下來的文章,咱們會嘗試不用Disruptor類,僅憑著對RingBuffer物件的操作來實現以下三種功能:
- 100個事件,單個消費者消費;
- 100個事件,三個消費者,每個都獨自消費這個100個事件;
- 100個事件,三個消費者共同消費這個100個事件;
你不孤單,欣宸原創一路相伴
歡迎關注公眾號:程式設計師欣宸
微信搜尋「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界... https://github.com/zq2599/blog_demos
「其他文章」
- 瀏覽器上寫程式碼,4核8G微軟伺服器免費用,Codespaces真香
- Java擴充套件Nginx之三:基礎配置項
- Java擴充套件Nginx之一:你好,nginx-clojure
- JavaCV的攝像頭實戰之十四:口罩檢測
- JavaCV人臉識別三部曲之二:訓練
- JavaCV人臉識別三部曲之一:視訊中的人臉儲存為圖片
- JavaCV的攝像頭實戰之八:人臉檢測
- 超詳細的編碼實戰,讓你的springboot應用識別圖片中的行人、汽車、狗子、喵星人(JavaCV YOLO4)
- Java應用日誌如何與Jaeger的trace關聯
- Spring Cloud Gateway實戰之五:內建filter
- Spring Cloud Gateway的斷路器(CircuitBreaker)功能
- Java版流媒體編解碼和影象處理(JavaCPP FFmpeg)
- DL4J實戰之六:圖形化展示訓練過程
- 純淨Ubuntu16安裝CUDA(9.1)和cuDNN
- disruptor筆記之六:常見場景
- Spring Cloud Gateway過濾器精確控制異常返回(分析篇)
- disruptor筆記之四:事件消費知識點小結
- disruptor筆記之二:Disruptor類分析
- disruptor筆記之一:快速入門
- Spring Native實戰(暢快體驗79毫秒啟動springboot應用)