disruptor筆記之一:快速入門

語言: CN / TW / HK

歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;

關於disruptor

disruptor是LMAX公司開發的一個高效能佇列,其作用和阻塞佇列(BlockingQueue)類似,都是在相同程序內、不同執行緒間傳遞資料(例如訊息、事件),另外disruptor也有自己的一些特色:

  1. 以廣播的形式釋出事件,並且消費者之間存在依賴關係;
  2. 為事件提前分配記憶體;
  3. 無鎖演算法;

關於Ring Buffer(環形佇列)

  • 提到disruptor一般都會提到Ring Buffer(環形佇列)是它的特點,實際上從3.0版本之後,環形佇列只是用來儲存和更新事件資料,在其他更復雜的場景下,使用者可以通過自定義操作將其替換掉;

在這裡插入圖片描述

  • 簡單的說,disruptor官方認為Ring Buffe是核心概念(Core Concepts),但不是特色( key features)

本篇概覽

作為《disruptor筆記》系列的開篇,本篇有兩個任務:

  • 建立名為<font color="blue">disruptor-tutorials</font>的gradle工程,作為整個系列的父工程,該系列所有程式碼都是這個父工程下的module;

  • 在<font color="blue">disruptor-tutorials</font>下面新建名為<font color="red">basic-event</font>的module,這是個springboot應用,作用是使用disruptor的基本功能:一個執行緒釋出事件,另一個執行緒消費事件,也就是對環形佇列最基本的操作,如下圖:

在這裡插入圖片描述

用disruptor實現訊息的釋出和消費的套路

  • 咱們提前小結用disruptor實現訊息的釋出和消費的套路,後面的開發按部就班即可,括號中是本篇對應的java類:
  1. 事件的定義:一個普通的bean(StringEvent.java)
  2. 事件工廠:定義如何生產事件的記憶體例項,這個例項剛從記憶體中建立,還沒有任何業務資料(StringEventFactory.java)
  3. 事件處理:封裝了消費單個事件的具體邏輯(StringEventHandler.java)
  4. 事件生產者:定義瞭如何將業務資料設定到還沒有業務資料的事件中,就是工廠創建出來的那種(StringEventProducer.java)
  5. 初始化邏輯:建立和啟動disruptor物件,將事件工廠傳給disruptor,建立事件生產者和事件處理物件,並分別與disruptor物件關聯;
  6. 業務邏輯:也就是呼叫事件生產者的<font color="blue">onData</font>方法釋出事件,本文的做法是在單元測試類中釋出事件,然後檢查消費的事件數和生產的事件數是否一致; 7

環境資訊

《Disruptor筆記》系列涉及的環境資訊如下:

  1. 作業系統:64位win10
  2. JDK:1.8.0_281
  3. IDE:IntelliJ IDEA 2021.1.1 (Ultimate Edition)
  4. gradle:6.7.1
  5. springboot:2.3.8.RELEASE
  6. disruptor:3.4.4

原始碼下載

名稱 連結 備註
專案主頁 https://github.com/zq2599/blog_demos 該專案在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該專案原始碼的倉庫地址,https協議
git倉庫地址(ssh) [email protected]:zq2599/blog_demos.git 該專案原始碼的倉庫地址,ssh協議
  • 這個git專案中有多個資料夾,本次實戰的原始碼在<font color="blue">disruptor-tutorials</font>資料夾下,如下圖紅框所示:

在這裡插入圖片描述

建立父工程

  • 因為是系列文章,所以這裡做個父工程來管理所有依賴庫和外掛,新建名為<font color="blue">disruptor-tutorials</font>的gradle工程,build.gradle如下:
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter

buildscript {
    repositories {
        maven {
            url 'https://plugins.gradle.org/m2/'
        }
        // 如果有私服就在此配置,如果沒有請註釋掉
        maven {
            url 'http://192.168.50.43:8081/repository/aliyun-proxy/'
        }
        // 阿里雲
        maven {
            url 'http://maven.aliyun.com/nexus/content/groups/public/'
        }

        mavenCentral()
    }
    ext {
        // 專案版本
        projectVersion = '1.0-SNAPSHOT'

        // sprignboot版本 https://github.com/spring-projects/spring-boot/releases
        springBootVersion = '2.3.8.RELEASE'
    }
}

plugins {
    id 'java'
    id 'java-library'
    id 'org.springframework.boot' version "${springBootVersion}" apply false
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'net.nemerosa.versioning' version '2.14.0'
    id 'io.franzbecker.gradle-lombok' version '4.0.0' apply false
    id 'com.github.ben-manes.versions' version '0.36.0' // gradle dependencyUpdates
}

// If you attempt to build without the `--scan` parameter in `gradle 6.0+` it will cause a build error that it can't find
// a buildScan property to change. This avoids that problem.
if (hasProperty('buildScan')) {
    buildScan {
        termsOfServiceUrl = 'https://gradle.com/terms-of-service'
        termsOfServiceAgree = 'yes'
    }
}

wrapper {
    gradleVersion = '6.7.1'
}

def buildTimeAndDate = OffsetDateTime.now()

ext {
    // 構建時取得當前日期和時間
    buildDate = DateTimeFormatter.ISO_LOCAL_DATE.format(buildTimeAndDate)
    buildTime = DateTimeFormatter.ofPattern('HH:mm:ss.SSSZ').format(buildTimeAndDate)
    buildRevision = versioning.info.commit
}

allprojects {
    apply plugin: 'java'
    apply plugin: 'idea'
    apply plugin: 'eclipse'
    apply plugin: 'io.spring.dependency-management'
    apply plugin: 'io.franzbecker.gradle-lombok'

    compileJava {
        sourceCompatibility = JavaVersion.VERSION_1_8
        targetCompatibility = JavaVersion.VERSION_1_8
        options.encoding = 'UTF-8'
    }

    compileJava.options*.compilerArgs = [
            '-Xlint:all', '-Xlint:-processing'
    ]

    // Copy LICENSE
    tasks.withType(Jar) {
        from(project.rootDir) {
            include 'LICENSE'
            into 'META-INF'
        }
    }

    // 寫入到MANIFEST.MF中的內容
    jar {
        manifest {
            attributes(
                    'Created-By': "${System.properties['java.version']} (${System.properties['java.vendor']} ${System.properties['java.vm.version']})".toString(),
                    'Built-By': 'travis',
                    'Build-Date': buildDate,
                    'Build-Time': buildTime,
                    'Built-OS': "${System.properties['os.name']}",
                    'Build-Revision': buildRevision,
                    'Specification-Title': project.name,
                    'Specification-Version': projectVersion,
                    'Specification-Vendor': 'Will Zhao',
                    'Implementation-Title': project.name,
                    'Implementation-Version': projectVersion,
                    'Implementation-Vendor': 'Will Zhao'
            )
        }
    }

    repositories {
        mavenCentral()

        // 如果有私服就在此配置,如果沒有請註釋掉
        maven {
            url 'http://192.168.50.43:8081/repository/aliyun-proxy/'
        }

        // 阿里雲
        maven {
            url 'http://maven.aliyun.com/nexus/content/groups/public/'
        }

        jcenter()
    }

    buildscript {
        repositories {
            maven { url 'https://plugins.gradle.org/m2/' }
        }
    }
}

allprojects { project ->
    buildscript {
        dependencyManagement {
            imports {
                mavenBom "org.springframework.boot:spring-boot-starter-parent:${springBootVersion}"
                mavenBom "org.junit:junit-bom:5.7.0"
            }

            dependencies {
                dependency 'org.projectlombok:lombok:1.16.16'
                dependency 'org.apache.commons:commons-lang3:3.11'
                dependency 'commons-collections:commons-collections:3.2.2'
                dependency 'com.lmax:disruptor:3.4.4'
            }
        }

        ext {
            springFrameworkVersion = dependencyManagement.importedProperties['spring-framework.version']
        }
    }
}

group = 'bolingcavalry'
version = projectVersion
  • 接下來編寫訊息釋出和消費的程式碼;

新建module

  • 前面新建了整個《Disruptor筆記》系列的父工程,現在新建名為<font color="blue">basic-event</font>的module,其build.gradle內容如下:
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'com.lmax:disruptor'

    testImplementation('org.springframework.boot:spring-boot-starter-test')
}
  • 這個module是個springboot應用,啟動類如下:
package com.bolingcavalry;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BasicEventApplication {
	public static void main(String[] args) {
		SpringApplication.run(BasicEventApplication.class, args);
	}
}
  • 接下來按照前面總結的套路行事;

事件的定義

  • 事件定義類StringEvent.java,可見就是個普普通通的java bean:
package com.bolingcavalry.service;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@ToString
@NoArgsConstructor
public class StringEvent {

    private String value;
}

事件工廠

  • 事件工廠的作用,是讓disruptor知道如何在記憶體中建立一個事件例項,不過,該例項和業務還沒有任何關係,本篇的事件工廠如下,可見就是建立StringEvent例項,並沒有特別的操作:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventFactory;

public class StringEventFactory implements EventFactory<StringEvent> {

    @Override
    public StringEvent newInstance() {
        return new StringEvent();
    }
}

事件處理

  • 時間處理類的作用是定義一個事件如何被消費,裡面是具體的業務程式碼,每個事件都會執行此類的onEvent方法;
  • 本篇的事件處理類做的事情是列印事件內容,再用sleep消耗100毫秒,然後再呼叫外部傳入的Consumer實現類的accept方法:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventHandler;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class StringEventHandler implements EventHandler<StringEvent> {

    public StringEventHandler(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    // 外部可以傳入Consumer實現類,每處理一條訊息的時候,consumer的accept方法就會被執行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {
        log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
        
        // 這裡延時100ms,模擬消費事件的邏輯的耗時
        Thread.sleep(100);

        // 如果外部傳入了consumer,就要執行一次accept方法
        if (null!=consumer) {
            consumer.accept(null);
        }
    }
}

事件生產者

  • 每當業務要生產一個事件時,就會呼叫<font color="blue">事件生產者</font>的onData方法,將業務資料作為入參傳進來,此時生產者會從環形佇列中取出一個事件例項(就是前面的事件工廠建立的),把業務資料傳給這個例項,再把例項正式釋出出去:
package com.bolingcavalry.service;

import com.lmax.disruptor.RingBuffer;

public class StringEventProducer {
    // 儲存資料的環形佇列
    private final RingBuffer<StringEvent> ringBuffer;

    public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(String content) {
        // ringBuffer是個佇列,其next方法返回的是下最後一條記錄之後的位置,這是個可用位置
        long sequence = ringBuffer.next();

        try {
            // sequence位置取出的事件是空事件
            StringEvent stringEvent = ringBuffer.get(sequence);
            // 空事件新增業務資訊
            stringEvent.setValue(content);
        } finally {
            // 釋出
            ringBuffer.publish(sequence);
        }
    }
}

初始化邏輯

  • 開發一個spring bean,這裡面有disruptor的初始化邏輯,有幾處需要關注的地方稍後會說到:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@Service
@Slf4j
public class BasicEventServiceImpl implements BasicEventService {

    private static final int BUFFER_SIZE = 16;

    private Disruptor<StringEvent> disruptor;

    private StringEventProducer producer;

    /**
     * 統計訊息總數
     */
    private final AtomicLong eventCount = new AtomicLong();

    @PostConstruct
    private void init() {
        Executor executor = Executors.newCachedThreadPool();

        // 例項化
        disruptor = new Disruptor<>(new StringEventFactory(),
                BUFFER_SIZE,
                new CustomizableThreadFactory("event-handler-"));

        // 準備一個匿名類,傳給disruptor的事件處理類,
        // 這樣每次處理事件時,都會將已經處理事件的總數打印出來
        Consumer<?> eventCountPrinter = new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                long count = eventCount.incrementAndGet();
                log.info("receive [{}] event", count);
            }
        };

        // 指定處理類
        disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter));

        // 啟動
        disruptor.start();

        // 生產者
        producer = new StringEventProducer(disruptor.getRingBuffer());
    }

    @Override
    public void publish(String value) {
        producer.onData(value);
    }

    @Override
    public long eventCount() {
        return eventCount.get();
    }
}
  • 上述程式碼有以下幾點需要注意:
  1. publish方法給外部呼叫,用於釋出一個事件;
  2. eventCountPrinter是Consumer的實現類,被傳給了StringEventHandler,這樣StringEventHandler消費訊息的時候,eventCount就會增加,也就記下了已經處理的事件總數;
  3. Disruptor的構造方法中,BUFFER_SIZE表示環形佇列的大小,這裡故意設定為16,這樣可以輕易的將環形佇列填滿,此時再發布事件會不會導致環形佇列上的資料被覆蓋呢?稍後咱們可以測一下;
  4. 記得呼叫start方法;

web介面

再寫一個web介面類,這樣就可以通過瀏覽器驗證前面的程式碼了:

package com.bolingcavalry.controller;

import com.bolingcavalry.service.BasicEventService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;

import java.time.LocalDateTime;

@RestController
public class BasicEventController {

    @Autowired
    BasicEventService basicEventService;

    @RequestMapping(value = "/{value}", method = RequestMethod.GET)
    public String publish(@PathVariable("value") String value) {
        basicEventService.publish(value);
        return "success, " + LocalDateTime.now().toString();
    }
}

業務邏輯

  • 現在生產事件的介面已準備好,消費事件的程式碼也完成了,接下來就是如何呼叫生產事件的介面來驗證生產和消費是否正常,這裡我選擇使用單元測試來驗證;

  • 在<font color="blue">disruptor-tutorials\basic-event\src\test\java</font>目錄下新增測試類BasicEventServiceImplTest.java,測試邏輯是釋出了一百個事件,再驗證消費事件的數量是否也等於一百:

package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.BasicEventService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertEquals;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class BasicEventServiceImplTest {

    @Autowired
    BasicEventService basicEventService;

    @Test
    public void publish() throws InterruptedException {
        log.info("start publich test");

        int count = 100;

        for(int i=0;i<count;i++) {
            log.info("publich {}", i);
            basicEventService.publish(String.valueOf(i));
        }

        // 非同步消費,因此需要延時等待
        Thread.sleep(1000);
        // 消費的事件總數應該等於釋出的事件數
        assertEquals(count, basicEventService.eventCount());
    }
}
  • 編碼完成後,點選下圖紅框1中的按鈕執行單元測試,結果如紅框2所示,測試通過:

在這裡插入圖片描述

  • 聰明的您可能會產生疑惑:環形陣列大小隻有16,消費一個事件耗時很長(100毫秒),那麼環形陣列中的事件還未消費完時如果還在釋出事件會發生什麼呢?新事件會覆蓋未消費的事件嗎?顯然不會,因為測試結果是通過的,那麼disruptor是怎麼做到的呢?其實從日誌上可以看出一些端倪,下圖是測試過程中日誌的末尾部分,紅框顯示,一直到測試快結束,釋出事件的執行緒還在執行釋出操作,這就意味著:如果消費速度過慢導致環形佇列裡放不進新的事件時,釋出事件的執行緒就會阻塞,知道環形佇列中可以放入事件為止:

在這裡插入圖片描述

  • 至此,disrupor的入門操作就完成了,咱們已經體驗過基本的釋出和消費功能,接下來的文章咱們會繼續深入學習其他更強大的功能;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 資料庫+中介軟體系列
  6. DevOps系列

歡迎關注公眾號:程式設計師欣宸

微信搜尋「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界... https://github.com/zq2599/blog_demos