disruptor筆記之六:常見場景
歡迎訪問我的GitHub
https://github.com/zq2599/blog_demos
內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
《disruptor筆記》系列連結
本篇概覽
- 本文是《disruptor筆記》系列的第六篇,主要內容是將一些常用的消費模式做彙總,後續日常開發中如果有需要就能拿來即用;
- 以下是常用的模式:
- 多個消費者獨立消費,前文已實現,本篇跳過
- 多個消費者共同消費,前文已實現,本篇跳過
- 既有獨立消費,也有共同消費,前文已實現,本篇跳過
- 多個生產者和多個獨立消費者:
- C1、C2獨立消費,C3依賴C1和C2
- C1獨立消費,C2和C3也獨立消費,但依賴C1,C4依賴C2和C3:
- C1和C2獨立消費,C3和C4也是獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4:
- C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4:
- C1和C2共同消費,C3和C4獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4:
- C1和C2獨立消費,C3和C4是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4:
關於本篇程式碼
- 為了省事兒,本次不會新建工程,而是直接使用前文的consume-mode模組,因此,下面這些類直接就直接使用了,無需重寫程式碼:
- 事件定義:OrderEvent
- 事件工廠:OrderEventFactory
- 事件生產者:OrderEventProducer
- 用在獨立消費場景的事件消費者:MailEventHandler
- 用在共同消費場景的事件消費者:MailWorkHandler
原始碼下載
- 本篇實戰中的完整原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 連結 | 備註| | :-------- | :----| :----| | 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 | | git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 | | git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
- 這個git專案中有多個資料夾,本次實戰的原始碼在disruptor-tutorials資料夾下,如下圖紅框所示:
- disruptor-tutorials是個父工程,裡面有多個module,本篇實戰的module是consume-mode,如下圖紅框所示:
多個生產者和多個獨立消費者
咱們即將實現下圖的邏輯:
-
前面幾篇文章所有實戰的生產者都只有一個,到了本篇,為了讓consume-mode模組的程式碼能夠支援多生產者,咱們要對功能業務的抽象父類做以下兩處改動:
-
init方法原本為private型,現在為了能讓子類重此方法,將其改為protected型別;
-
增加名為publishWithProducer2的方法,可見內部只有丟擲異常,要想其正常工作,需要子類自己來實現:
java
public void publishWithProducer2(String value) throws Exception {
throw new Exception("父類未實現此方法,請在子類中重寫此方法後再呼叫");
}
- 為了實現多生產者功能,新增MultiProducerServiceImpl.java,有幾處要注意的地方稍後會提到:
```java package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.*; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import lombok.Setter; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct;
@Service("multiProducerService") public class MultiProducerServiceImpl extends ConsumeModeService {
/**
* 第二個生產者
*/
@Setter
protected OrderEventProducer producer2;
@PostConstruct
@Override
protected void init() {
// 例項化
disruptor = new Disruptor<>(new OrderEventFactory(),
BUFFER_SIZE,
new CustomizableThreadFactory("event-handler-"),
// 生產型別是多生產者
ProducerType.MULTI,
// BlockingWaitStrategy是預設的等待策略
new BlockingWaitStrategy());
// 留給子類實現具體的事件消費邏輯
disruptorOperate();
// 啟動
disruptor.start();
// 第一個生產者
setProducer(new OrderEventProducer(disruptor.getRingBuffer()));
// 第二個生產者
setProducer2(new OrderEventProducer(disruptor.getRingBuffer()));
}
@Override
protected void disruptorOperate() {
// 一號消費者
MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
// 二號消費者
MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
// 呼叫handleEventsWithWorkerPool,表示建立的多個消費者以共同消費的模式消費
disruptor.handleEventsWith(c1, c2);
}
@Override
public void publishWithProducer2(String value) throws Exception {
producer2.onData(value);
}
} ```
-
上述程式碼有以下幾處要注意:
-
重寫父類的init方法,主要是例項化Disruptor的時候,多傳入兩個引數:ProducerType.MULTI表示生產型別是多生產者,BlockingWaitStrategy是等待策略,之前的程式碼中咱們沒有傳此引數時,預設的就是BlockingWaitStrategy
- init方法中還執行了setProducer2方法,設定成員變數producer2
- 重寫publishWithProducer2方法,呼叫成員變數producer2發表事件
-
重寫disruptorOperate方法,裡面設定了兩個獨立消費者
-
驗證上述程式碼的方式依舊是單元測試,開啟ConsumeModeServiceTest.java,新增以下程式碼,可見新增了兩個執行緒同時執行釋出事件的操作:
```java @Autowired @Qualifier("multiProducerService") ConsumeModeService multiProducerService;
@Test
public void testMultiProducerService() throws InterruptedException {
log.info("start testMultiProducerService");
CountDownLatch countDownLatch = new CountDownLatch(1);
// 兩個生產者,每個生產100個事件,一共生產兩百個事件
// 兩個獨立消費者,每人消費200個事件,因此一共消費400個事件
int expectEventCount = EVENT_COUNT*4;
// 告訴service,等消費到400個訊息時,就執行countDownLatch.countDown方法
multiProducerService.setCountDown(countDownLatch, expectEventCount);
// 啟動一個執行緒,用第一個生產者生產事件
new Thread(() -> {
for(int i=0;i<EVENT_COUNT;i++) {
log.info("publich {}", i);
multiProducerService.publish(String.valueOf(i));
}
}).start();
// 再啟動一個執行緒,用第二個生產者生產事件
new Thread(() -> {
for(int i=0;i<EVENT_COUNT;i++) {
log.info("publishWithProducer2 {}", i);
try {
multiProducerService.publishWithProducer2(String.valueOf(i));
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
// 當前執行緒開始等待,前面的service.setCountDown方法已經告訴過service,
// 等消費到expectEventCount個訊息時,就執行countDownLatch.countDown方法
// 千萬注意,要呼叫await方法,而不是wait方法!
countDownLatch.await();
// 消費的事件總數應該等於釋出的事件數
assertEquals(expectEventCount, multiProducerService.eventCount());
}
```
- 測試結果如下,測試通過,符合預期:
C1、C2獨立消費,C3依賴C1和C2
- 邏輯圖如下:
- 實現程式碼如下,非常簡單,依賴關係用then即可實現:
```java package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import com.bolingcavalry.service.MailWorkHandler; import com.bolingcavalry.service.SmsEventHandler; import org.springframework.stereotype.Service;
@Service("scene5") public class Scene5 extends ConsumeModeService {
@Override
protected void disruptorOperate() {
MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
disruptor
// C1、C2獨立消費
.handleEventsWith(c1, c2)
// C3依賴C1和C2
.then(c3);
}
} ```
- 單元測試程式碼:
```java @Autowired @Qualifier("scene5") Scene5 scene5;
@Test
public void testScene5 () throws InterruptedException {
log.info("start testScene5");
testConsumeModeService(scene5,
EVENT_COUNT,
// 三個獨立消費者,一共消費300個事件
EVENT_COUNT * 3);
}
```
- 為了節省篇幅,測試結果就不貼了,要注意的是,每個事件都一定是C1和C2先消費過,才會被C3消費到;
C1獨立消費,C2和C3也獨立消費,但依賴C1,C4依賴C2和C3
- 邏輯圖如下:
- 實現程式碼如下:
```java package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import org.springframework.stereotype.Service;
@Service("scene6") public class Scene6 extends ConsumeModeService {
@Override
protected void disruptorOperate() {
MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
disruptor
// C1
.handleEventsWith(c1)
// C2和C3也獨立消費
.then(c2, c3)
// C4依賴C2和C3
.then(c4);
}
} ```
- 單元測試程式碼:
```java @Autowired @Qualifier("scene6") Scene6 scene6;
@Test
public void testScene6 () throws InterruptedException {
log.info("start testScene6");
testConsumeModeService(scene6,
EVENT_COUNT,
// 四個獨立消費者,一共消費400個事件
EVENT_COUNT * 4);
}
```
C1和C2獨立消費,C3和C4也是獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4
- 邏輯圖如下:
- 實現程式碼如下:
```java package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import org.springframework.stereotype.Service;
@Service("scene7") public class Scene7 extends ConsumeModeService {
@Override
protected void disruptorOperate() {
MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
MailEventHandler c5 = new MailEventHandler(eventCountPrinter);
disruptor
// C1和C2獨立消費
.handleEventsWith(c1, c2)
// C3和C4也是獨立消費,但C3和C4都依賴C1和C2
.then(c3, c4)
// 然後C5依賴C3和C4
.then(c5);
}
} ```
- 單元測試程式碼:
```java @Autowired @Qualifier("scene7") Scene7 scene7;
@Test
public void testScene7 () throws InterruptedException {
log.info("start testScene7");
testConsumeModeService(scene7,
EVENT_COUNT,
// 五個獨立消費者,一共消費500個事件
EVENT_COUNT * 5);
}
```
C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4
- 邏輯圖如下:
- 實現程式碼如下:
```java package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import com.bolingcavalry.service.MailWorkHandler; import org.springframework.stereotype.Service;
/* * @author will ([email protected]) * @version 1.0 * @description: C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4 * @date 2021/5/23 11:05 / @Service("scene8") public class Scene8 extends ConsumeModeService {
@Override
protected void disruptorOperate() {
MailWorkHandler c1 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c2 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c3 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c4 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c5 = new MailWorkHandler(eventCountPrinter);
disruptor
// C1和C2共同消費
.handleEventsWithWorkerPool(c1, c2)
// C3和C4也是獨立消費,但C3和C4都依賴C1和C2
.thenHandleEventsWithWorkerPool(c3, c4)
// 然後C5依賴C3和C4
.thenHandleEventsWithWorkerPool(c5);
}
} ```
- 單元測試程式碼:
```java @Autowired @Qualifier("scene8") Scene8 scene8;
@Test
public void testScene8 () throws InterruptedException {
log.info("start testScene8");
testConsumeModeService(scene8,
EVENT_COUNT,
// C1和C2共同消費,C3和C4共同消費,C5雖然只是一個,但也是共同消費模式,
// 也就是一共有三組消費者,所以一共消費300個事件
EVENT_COUNT * 3);
}
```
C1和C2共同消費,C3和C4獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4
- 邏輯圖如下:
- 實現程式碼如下:
```java package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import com.bolingcavalry.service.MailWorkHandler; import org.springframework.stereotype.Service;
@Service("scene9") public class Scene9 extends ConsumeModeService {
@Override
protected void disruptorOperate() {
MailWorkHandler c1 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c2 = new MailWorkHandler(eventCountPrinter);
MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
MailEventHandler c5 = new MailEventHandler(eventCountPrinter);
disruptor
// C1和C2共同消費
.handleEventsWithWorkerPool(c1, c2)
// C3和C4獨立消費,但C3和C4都依賴C1和C2
.then(c3, c4)
// 然後C5依賴C3和C4
.then(c5);
}
} ```
- 單元測試程式碼:
```java @Autowired @Qualifier("scene9") Scene9 scene9;
@Test
public void testScene9 () throws InterruptedException {
log.info("start testScene9");
testConsumeModeService(scene9,
EVENT_COUNT,
// C1和C2共同消費(100個事件),
// C3和C4獨立消費(200個事件),
// C5獨立消費(100個事件),
// 所以一共消費400個事件
EVENT_COUNT * 4);
}
```
C1和C2獨立消費,C3和C4是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4
- 邏輯圖如下:
- 實現程式碼如下:
```java package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService; import com.bolingcavalry.service.MailEventHandler; import com.bolingcavalry.service.MailWorkHandler; import org.springframework.stereotype.Service;
@Service("scene10") public class Scene10 extends ConsumeModeService {
@Override
protected void disruptorOperate() {
MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
MailWorkHandler c3 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c4 = new MailWorkHandler(eventCountPrinter);
MailEventHandler c5 = new MailEventHandler(eventCountPrinter);
disruptor
// C1和C2共同消費
.handleEventsWith(c1, c2)
// C3和C4是共同消費,但C3和C4都依賴C1和C2
.thenHandleEventsWithWorkerPool(c3, c4)
// 然後C5依賴C3和C4
.then(c5);
}
} ```
- 單元測試程式碼:
java
@Test
public void testScene10 () throws InterruptedException {
log.info("start testScene10");
testConsumeModeService(scene10,
EVENT_COUNT,
// C1和C2獨立消費(200個事件),
// C3和C4共同消費(100個事件),
// C5獨立消費(100個事件),
// 所以一共消費400個事件
EVENT_COUNT * 4);
}
- 至此,一些常見場景的程式碼已完成,希望本文能給您一些參考,幫您更得心應手的用好這個優秀的工具;
你不孤單,欣宸原創一路相伴
歡迎關注公眾號:程式設計師欣宸
微信搜尋「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界... https://github.com/zq2599/blog_demos
- 瀏覽器上寫程式碼,4核8G微軟伺服器免費用,Codespaces真香
- Java擴充套件Nginx之三:基礎配置項
- Java擴充套件Nginx之一:你好,nginx-clojure
- JavaCV的攝像頭實戰之十四:口罩檢測
- JavaCV人臉識別三部曲之二:訓練
- JavaCV人臉識別三部曲之一:視訊中的人臉儲存為圖片
- JavaCV的攝像頭實戰之八:人臉檢測
- 超詳細的編碼實戰,讓你的springboot應用識別圖片中的行人、汽車、狗子、喵星人(JavaCV YOLO4)
- Java應用日誌如何與Jaeger的trace關聯
- Spring Cloud Gateway實戰之五:內建filter
- Spring Cloud Gateway的斷路器(CircuitBreaker)功能
- Java版流媒體編解碼和影象處理(JavaCPP FFmpeg)
- DL4J實戰之六:圖形化展示訓練過程
- 純淨Ubuntu16安裝CUDA(9.1)和cuDNN
- disruptor筆記之六:常見場景
- Spring Cloud Gateway過濾器精確控制異常返回(分析篇)
- disruptor筆記之四:事件消費知識點小結
- disruptor筆記之二:Disruptor類分析
- disruptor筆記之一:快速入門
- Spring Native實戰(暢快體驗79毫秒啟動springboot應用)