disruptor筆記之六:常見場景

語言: CN / TW / HK

歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;

《disruptor筆記》系列連結

  1. 快速入門
  2. Disruptor類分析
  3. 環形佇列的基礎操作(不用Disruptor類)
  4. 事件消費知識點小結
  5. 事件消費實戰
  6. 常見場景
  7. 等待策略
  8. 知識點補充(終篇)

本篇概覽

  • 本文是《disruptor筆記》系列的第六篇,主要內容是將一些常用的消費模式做彙總,後續日常開發中如果有需要就能拿來即用;
  • 以下是常用的模式:
  • 多個消費者獨立消費,前文已實現,本篇跳過
  • 多個消費者共同消費,前文已實現,本篇跳過
  • 既有獨立消費,也有共同消費,前文已實現,本篇跳過
  • 多個生產者和多個獨立消費者:

在這裡插入圖片描述

  1. C1、C2獨立消費,C3依賴C1和C2

在這裡插入圖片描述

  1. C1獨立消費,C2和C3也獨立消費,但依賴C1,C4依賴C2和C3

在這裡插入圖片描述

  1. C1和C2獨立消費,C3和C4也是獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

在這裡插入圖片描述

  1. C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

在這裡插入圖片描述

  1. C1和C2共同消費,C3和C4獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

在這裡插入圖片描述

  1. C1和C2獨立消費,C3和C4是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

在這裡插入圖片描述

關於本篇程式碼

  • 為了省事兒,本次不會新建工程,而是直接使用前文consume-mode模組,因此,下面這些類直接就直接使用了,無需重寫程式碼:
  • 事件定義:OrderEvent
  • 事件工廠:OrderEventFactory
  • 事件生產者:OrderEventProducer
  • 用在獨立消費場景的事件消費者:MailEventHandler
  • 用在共同消費場景的事件消費者:MailWorkHandler

原始碼下載

  • 本篇實戰中的完整原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):

| 名稱 | 連結 | 備註| | :-------- | :----| :----| | 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 | | git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 | | git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |

  • 這個git專案中有多個資料夾,本次實戰的原始碼在disruptor-tutorials資料夾下,如下圖紅框所示:

在這裡插入圖片描述

  • disruptor-tutorials是個父工程,裡面有多個module,本篇實戰的module是consume-mode,如下圖紅框所示:

在這裡插入圖片描述

多個生產者和多個獨立消費者

咱們即將實現下圖的邏輯:

在這裡插入圖片描述

  • 前面幾篇文章所有實戰的生產者都只有一個,到了本篇,為了讓consume-mode模組的程式碼能夠支援多生產者,咱們要對功能業務的抽象父類做以下兩處改動:

  • init方法原本為private型,現在為了能讓子類重此方法,將其改為protected型別;

  • 增加名為publishWithProducer2的方法,可見內部只有丟擲異常,要想其正常工作,需要子類自己來實現:

java public void publishWithProducer2(String value) throws Exception { throw new Exception("父類未實現此方法,請在子類中重寫此方法後再呼叫"); }

  • 為了實現多生產者功能,新增MultiProducerServiceImpl.java,有幾處要注意的地方稍後會提到:

```java package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import lombok.Setter; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct;

@Service("multiProducerService") public class MultiProducerServiceImpl extends ConsumeModeService {

/**
 * 第二個生產者
 */
@Setter
protected OrderEventProducer producer2;

@PostConstruct
@Override
protected void init() {
    // 例項化
    disruptor = new Disruptor<>(new OrderEventFactory(),
            BUFFER_SIZE,
            new CustomizableThreadFactory("event-handler-"),
            // 生產型別是多生產者
            ProducerType.MULTI,
            // BlockingWaitStrategy是預設的等待策略
            new BlockingWaitStrategy());

    // 留給子類實現具體的事件消費邏輯
    disruptorOperate();

    // 啟動
    disruptor.start();

    // 第一個生產者
    setProducer(new OrderEventProducer(disruptor.getRingBuffer()));

    // 第二個生產者
    setProducer2(new OrderEventProducer(disruptor.getRingBuffer()));
}

@Override
protected void disruptorOperate() {
    // 一號消費者
    MailEventHandler c1 = new MailEventHandler(eventCountPrinter);

    // 二號消費者
    MailEventHandler c2 = new MailEventHandler(eventCountPrinter);

    // 呼叫handleEventsWithWorkerPool,表示建立的多個消費者以共同消費的模式消費
    disruptor.handleEventsWith(c1, c2);
}

@Override
public void publishWithProducer2(String value) throws Exception {
    producer2.onData(value);
}

} ```

  • 上述程式碼有以下幾處要注意:

  • 重寫父類的init方法,主要是例項化Disruptor的時候,多傳入兩個引數:ProducerType.MULTI表示生產型別是多生產者,BlockingWaitStrategy是等待策略,之前的程式碼中咱們沒有傳此引數時,預設的就是BlockingWaitStrategy

  • init方法中還執行了setProducer2方法,設定成員變數producer2
  • 重寫publishWithProducer2方法,呼叫成員變數producer2發表事件
  • 重寫disruptorOperate方法,裡面設定了兩個獨立消費者

  • 驗證上述程式碼的方式依舊是單元測試,開啟ConsumeModeServiceTest.java,新增以下程式碼,可見新增了兩個執行緒同時執行釋出事件的操作:

```java @Autowired @Qualifier("multiProducerService") ConsumeModeService multiProducerService;

@Test
public void testMultiProducerService() throws InterruptedException {
    log.info("start testMultiProducerService");
    CountDownLatch countDownLatch = new CountDownLatch(1);

    // 兩個生產者,每個生產100個事件,一共生產兩百個事件
    // 兩個獨立消費者,每人消費200個事件,因此一共消費400個事件
    int expectEventCount = EVENT_COUNT*4;

    // 告訴service,等消費到400個訊息時,就執行countDownLatch.countDown方法
    multiProducerService.setCountDown(countDownLatch, expectEventCount);

    // 啟動一個執行緒,用第一個生產者生產事件
    new Thread(() -> {
        for(int i=0;i<EVENT_COUNT;i++) {
            log.info("publich {}", i);
            multiProducerService.publish(String.valueOf(i));
        }
    }).start();

    // 再啟動一個執行緒,用第二個生產者生產事件
    new Thread(() -> {
        for(int i=0;i<EVENT_COUNT;i++) {
            log.info("publishWithProducer2 {}", i);
            try {
                multiProducerService.publishWithProducer2(String.valueOf(i));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }).start();

    // 當前執行緒開始等待,前面的service.setCountDown方法已經告訴過service,
    // 等消費到expectEventCount個訊息時,就執行countDownLatch.countDown方法
    // 千萬注意,要呼叫await方法,而不是wait方法!
    countDownLatch.await();

    // 消費的事件總數應該等於釋出的事件數
    assertEquals(expectEventCount, multiProducerService.eventCount());
}

```

  • 測試結果如下,測試通過,符合預期:

在這裡插入圖片描述

C1、C2獨立消費,C3依賴C1和C2

  • 邏輯圖如下:

在這裡插入圖片描述

  • 實現程式碼如下,非常簡單,依賴關係用then即可實現:

```java package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import com.bolingcavalry.service.MailWorkHandler; import com.bolingcavalry.service.SmsEventHandler; import org.springframework.stereotype.Service;

@Service("scene5") public class Scene5 extends ConsumeModeService {

@Override
protected void disruptorOperate() {
    MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
    MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
    MailEventHandler c3 = new MailEventHandler(eventCountPrinter);

    disruptor
            // C1、C2獨立消費
            .handleEventsWith(c1, c2)
            // C3依賴C1和C2
            .then(c3);
}

} ```

  • 單元測試程式碼:

```java @Autowired @Qualifier("scene5") Scene5 scene5;

@Test
public void testScene5 () throws InterruptedException {
    log.info("start testScene5");
    testConsumeModeService(scene5,
            EVENT_COUNT,
            // 三個獨立消費者,一共消費300個事件
            EVENT_COUNT * 3);
}

```

  • 為了節省篇幅,測試結果就不貼了,要注意的是,每個事件都一定是C1和C2先消費過,才會被C3消費到;

C1獨立消費,C2和C3也獨立消費,但依賴C1,C4依賴C2和C3

  • 邏輯圖如下:

在這裡插入圖片描述

  • 實現程式碼如下:

```java package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import org.springframework.stereotype.Service;

@Service("scene6") public class Scene6 extends ConsumeModeService {

@Override
protected void disruptorOperate() {
    MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
    MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
    MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
    MailEventHandler c4 = new MailEventHandler(eventCountPrinter);

    disruptor
            // C1
            .handleEventsWith(c1)
            // C2和C3也獨立消費
            .then(c2, c3)
            // C4依賴C2和C3
            .then(c4);
}

} ```

  • 單元測試程式碼:

```java @Autowired @Qualifier("scene6") Scene6 scene6;

@Test
public void testScene6 () throws InterruptedException {
    log.info("start testScene6");
    testConsumeModeService(scene6,
            EVENT_COUNT,
            // 四個獨立消費者,一共消費400個事件
            EVENT_COUNT * 4);
}

```

C1和C2獨立消費,C3和C4也是獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

  • 邏輯圖如下:

在這裡插入圖片描述

  • 實現程式碼如下:

```java package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import org.springframework.stereotype.Service;

@Service("scene7") public class Scene7 extends ConsumeModeService {

@Override
protected void disruptorOperate() {
    MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
    MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
    MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
    MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
    MailEventHandler c5 = new MailEventHandler(eventCountPrinter);

    disruptor
            // C1和C2獨立消費
            .handleEventsWith(c1, c2)
            // C3和C4也是獨立消費,但C3和C4都依賴C1和C2
            .then(c3, c4)
            // 然後C5依賴C3和C4
            .then(c5);
}

} ```

  • 單元測試程式碼:

```java @Autowired @Qualifier("scene7") Scene7 scene7;

@Test
public void testScene7 () throws InterruptedException {
    log.info("start testScene7");
    testConsumeModeService(scene7,
            EVENT_COUNT,
            // 五個獨立消費者,一共消費500個事件
            EVENT_COUNT * 5);
}

```

C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

  • 邏輯圖如下:

在這裡插入圖片描述

  • 實現程式碼如下:

```java package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import com.bolingcavalry.service.MailWorkHandler; import org.springframework.stereotype.Service;

/* * @author will ([email protected]) * @version 1.0 * @description: C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4 * @date 2021/5/23 11:05 / @Service("scene8") public class Scene8 extends ConsumeModeService {

@Override
protected void disruptorOperate() {
    MailWorkHandler c1 = new MailWorkHandler(eventCountPrinter);
    MailWorkHandler c2 = new MailWorkHandler(eventCountPrinter);
    MailWorkHandler c3 = new MailWorkHandler(eventCountPrinter);
    MailWorkHandler c4 = new MailWorkHandler(eventCountPrinter);
    MailWorkHandler c5 = new MailWorkHandler(eventCountPrinter);

    disruptor
            // C1和C2共同消費
            .handleEventsWithWorkerPool(c1, c2)
            // C3和C4也是獨立消費,但C3和C4都依賴C1和C2
            .thenHandleEventsWithWorkerPool(c3, c4)
            // 然後C5依賴C3和C4
            .thenHandleEventsWithWorkerPool(c5);
}

} ```

  • 單元測試程式碼:

```java @Autowired @Qualifier("scene8") Scene8 scene8;

@Test
public void testScene8 () throws InterruptedException {
    log.info("start testScene8");
    testConsumeModeService(scene8,
            EVENT_COUNT,
            // C1和C2共同消費,C3和C4共同消費,C5雖然只是一個,但也是共同消費模式,
            // 也就是一共有三組消費者,所以一共消費300個事件
            EVENT_COUNT * 3);
}

```

C1和C2共同消費,C3和C4獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

  • 邏輯圖如下:

在這裡插入圖片描述

  • 實現程式碼如下:

```java package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import com.bolingcavalry.service.MailWorkHandler; import org.springframework.stereotype.Service;

@Service("scene9") public class Scene9 extends ConsumeModeService {

@Override
protected void disruptorOperate() {
    MailWorkHandler c1 = new MailWorkHandler(eventCountPrinter);
    MailWorkHandler c2 = new MailWorkHandler(eventCountPrinter);
    MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
    MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
    MailEventHandler c5 = new MailEventHandler(eventCountPrinter);

    disruptor
            // C1和C2共同消費
            .handleEventsWithWorkerPool(c1, c2)
            // C3和C4獨立消費,但C3和C4都依賴C1和C2
            .then(c3, c4)
            // 然後C5依賴C3和C4
            .then(c5);
}

} ```

  • 單元測試程式碼:

```java @Autowired @Qualifier("scene9") Scene9 scene9;

@Test
public void testScene9 () throws InterruptedException {
    log.info("start testScene9");
    testConsumeModeService(scene9,
            EVENT_COUNT,
            // C1和C2共同消費(100個事件),
            // C3和C4獨立消費(200個事件),
            // C5獨立消費(100個事件),
            // 所以一共消費400個事件
            EVENT_COUNT * 4);
}

```

C1和C2獨立消費,C3和C4是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

  • 邏輯圖如下:

在這裡插入圖片描述

  • 實現程式碼如下:

```java package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import com.bolingcavalry.service.MailWorkHandler; import org.springframework.stereotype.Service;

@Service("scene10") public class Scene10 extends ConsumeModeService {

@Override
protected void disruptorOperate() {
    MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
    MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
    MailWorkHandler c3 = new MailWorkHandler(eventCountPrinter);
    MailWorkHandler c4 = new MailWorkHandler(eventCountPrinter);
    MailEventHandler c5 = new MailEventHandler(eventCountPrinter);

    disruptor
            // C1和C2共同消費
            .handleEventsWith(c1, c2)
            // C3和C4是共同消費,但C3和C4都依賴C1和C2
            .thenHandleEventsWithWorkerPool(c3, c4)
            // 然後C5依賴C3和C4
            .then(c5);
}

} ```

  • 單元測試程式碼:

java @Test public void testScene10 () throws InterruptedException { log.info("start testScene10"); testConsumeModeService(scene10, EVENT_COUNT, // C1和C2獨立消費(200個事件), // C3和C4共同消費(100個事件), // C5獨立消費(100個事件), // 所以一共消費400個事件 EVENT_COUNT * 4); }

  • 至此,一些常見場景的程式碼已完成,希望本文能給您一些參考,幫您更得心應手的用好這個優秀的工具;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 資料庫+中介軟體系列
  6. DevOps系列

歡迎關注公眾號:程式設計師欣宸

微信搜尋「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界... https://github.com/zq2599/blog_demos