如此狂妄,自称高性能队列的Disruptor有啥来头?

语言: CN / TW / HK

并发框架Disruptor

1. Disruptor概述

1.1 背景

​ Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级),基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注,2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。

​ 目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。

​ 需要特别指出的是,这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列。

有界无锁 高并发队列

1.2 什么是Disruptor

​ Disruptor是用于一个JVM中多个线程之间的消息队列,作用与ArrayBlockingQueue有相似之处,但是Disruptor从功能、性能都远好于ArrayBlockingQueue,当多个线程之间传递大量数据或对性能要求较高时,可以考虑使用Disruptor作为ArrayBlockingQueue的替代者。

​ 官方也对Disruptor和ArrayBlockingQueue的性能在不同的应用场景下做了对比,目测性能只有有5~10倍左右的提升。

1.3 为什么使用Disruptor

​ 传统阻塞的队列使用锁保证线程安全,而锁通过操作系统内核上下文切换实现,会暂停线程去等待锁,直到锁释放。

​ 执行这样的上下文切换,会丢失之前保存的数据和指令。由于消费者和生产者之间的速度差异,队列总是接近满或者空的状态,这种状态会导致高水平的写入争用。

1.3.1 传统队列问题

首先这里说的队列也仅限于Java内部的消息队列

队列 有界性 结构 队列类型
ArrayBlockingQueue 有界 加锁 数组 阻塞
LinkedBlockingQueue 可选 加锁 链表 阻塞
ConcurrentLinkedQueue 无界 无锁 链表 非阻塞
LinkedTransferQueue 无界 无锁 链表 阻塞
PriorityBlockingQueue 无界 加锁 阻塞
DelayQueue 无界 加锁 阻塞

1.3.2 Disruptor应用场景

参考使用到disruptor的一些框架.

1.3.2.1 log4j2

​ Log4j2异步日志使用到了disruptor, 日志一般是有缓冲区, 满了才写到文件, 增量追加文件结合NIO等应该也比较快, 所以无论是EventHandler还是WorkHandler处理应该延迟比较小的, 写的文件也不多, 所以场景是比较合适的。

1.3.2.2 Jstorm

​ 在流处理中不同线程中数据交换,数据计算可能蛮多内存中计算, 流计算快进快出,disruptor应该不错的选择。

1.3.2.3 百度uid-generator

​ 部分使用Ring buffer和去伪共享等思路缓存已生成的uid, 应该也部分参考了disruptor吧。

1.4 Disruptor 的核心概念

先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。

1.4.1 Ring Buffer

Disruptor中的数据结构,用于存储生产者生产的数据

​ 如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。

1.4.2 Sequence

序号,在Disruptor框架中,任何地方都有序号

​ 生产者生产的数据放在RingBuffer中的哪个位置,消费者应该消费哪个位置的数据,RingBuffer中的某个位置的数据是什么,这些都是由这个序号来决定的。这个序号可以简单的理解为一个AtomicLong类型的变量。其使用了padding的方法去消除缓存的伪共享问题。

1.4.3 Sequencer

序号生成器,这个类主要是用来协调生产者的

​ 在生产者生产数据的时候,Sequencer会产生一个可用的序号(Sequence),然后生产者就就知道数据放在环形队列的那个位置了。

​ Sequencer是Disruptor的真正核心,此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

1.4.4 Sequence Barrier

序号屏障

​ 我们都知道,消费者在消费数据的时候,需要知道消费哪个位置的数据。消费者总不能自己想取哪个数据消费,就取哪个数据消费吧。这个SequencerBarrier起到的就是这样一个“栅栏”般的阻隔作用。你消费者想消费数据,得,我告诉你一个序号(Sequence),你去消费那个位置上的数据。要是没有数据,就好好等着吧

1.4.5 Wait Strategy

Wait Strategy决定了一个消费者怎么等待生产者将事件(Event)放入Disruptor中。

​ 设想一种这样的情景:生产者生产的非常慢,而消费者消费的非常快。那么必然会出现数据不够的情况,这个时候消费者怎么进行等待呢?WaitStrategy就是为了解决问题而诞生的。

1.4.6 Event

​ 从生产者到消费者传递的数据叫做Event。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。

1.4.7 EventHandler

​ Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。

1.4.8 Producer

​ 即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

1.5 Disruptor特性

​ Disruptor其实就像一个队列一样,用于在不同的线程之间迁移数据,但是Disruptor也实现了一些其他队列没有的特性,如:

  • 同一个“事件”可以有多个消费者,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图);
  • 预分配用于存储事件内容的内存空间;
  • 针对极高的性能目标而实现的极度优化和无锁的设计;

2. Disruptor入门

我们使用一个简单的例子来体验一下Disruptor,生产者会传递一个long类型的值到消费者,消费者接受到这个值后会打印出这个值。

2.1 添加依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.2</version>
</dependency>

2.2 Disruptor API

Disruptor 的 API 十分简单,主要有以下几个步骤

2.2.1 定义事件

首先创建一个 LongEvent 类,这个类将会被放入环形队列中作为消息内容。

事件(Event)就是通过 Disruptor 进行交换的数据类型。

public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    public long getValue() {
        return value;
    }
}

2.2.2 定义事件工厂

为了使用Disruptor的内存预分配event,我们需要定义一个EventFactory

​ 事件工厂(Event Factory)定义了如何实例化前面第1步中定义的事件(Event),需要实现接口 com.lmax.disruptor.EventFactory\<T\>。

Disruptor 通过 EventFactory 在 RingBuffer 中预创建 Event 的实例。

​ 一个 Event 实例实际上被用作一个“数据槽”,发布者发布前,先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据。

public class LongEventFactory implements EventFactory<LongEvent> {
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

2.2.3 定义事件处理的具体实现

为了让消费者处理这些事件,所以我们这里定义一个事件处理器,负责打印event

通过实现接口 com.lmax.disruptor.EventHandler\<T\> 定义事件处理的具体实现。

public class LongEventHandler implements EventHandler<LongEvent> {
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        //CommonUtils.accumulation();
        System.out.println("consumer:" + Thread.currentThread().getName() + " Event: value=" + event.getValue() + ",sequence=" + sequence);
    }
}

2.2.4 指定等待策略

Disruptor 定义了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,这是策略模式的应用

WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();

2.2.5 启动 Disruptor

注意ringBufferSize的大小必须是2的N次方

// 指定事件工厂
LongEventFactory factory = new LongEventFactory();

// 指定 ring buffer字节大小, 必须是2的N次方
int bufferSize = 1024;

//单线程模式,获取额外的性能
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory,
                                                          bufferSize, Executors.defaultThreadFactory(),
                                                          ProducerType.SINGLE,
                                                          new YieldingWaitStrategy());
//设置事件业务处理器---消费者
disruptor.handleEventsWith(new LongEventHandler());

//启动disruptor线程
disruptor.start();

2.2.6 使用Translators发布事件

在Disruptor的3.0版本中,由于加入了丰富的Lambda风格的API,可以用来帮组开发人员简化流程。所以在3.0版本后首选使用Event Publisher/Event Translator来发布事件。

public class LongEventProducerWithTranslator {
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, Long> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, Long>() {
                public void translateTo(LongEvent event, long sequence, Long data) {
                    event.set(data);
                }
            };

    public void onData(Long data) {
        ringBuffer.publishEvent(TRANSLATOR, data);
    }
}

2.2.7 关闭 Disruptor

disruptor.shutdown();//关闭 disruptor,方法会堵塞,直至所有的事件都得到处理

2.3 代码整合

2.3.1 LongEventMain

消费者-生产者启动类,其依靠构造Disruptor对象,调用start()方法完成启动线程。Disruptor 需要ringbuffer环,消费者数据处理工厂,WaitStrategy等

  • ByteBuffer 类字节buffer,用于包装消息。
  • ProducerType.SINGLE为单线程 ,可以提高性能
public class LongEventMain {
    public static void main(String[] args) {
        // 指定事件工厂
        LongEventFactory factory = new LongEventFactory();

        // 指定 ring buffer字节大小, 必须是2的N次方
        int bufferSize = 1024;

        //单线程模式,获取额外的性能
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory,
                bufferSize, Executors.defaultThreadFactory(),
                ProducerType.SINGLE,
                new YieldingWaitStrategy());

        //设置事件业务处理器---消费者
        disruptor.handleEventsWith(new LongEventHandler());

        //启动disruptor线程
        disruptor.start();
        // 获取 ring buffer环,用于接取生产者生产的事件
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        //为 ring buffer指定事件生产者
        LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        //循环遍历
        for (int i = 0; i < 100; i++) {
            //获取一个随机数
            long value = (long) ((Math.random() * 1000000) + 1);
            //发布数据
            producer.onData(value);
        }
        //停止disruptor线程
        disruptor.shutdown();
    }
}

2.3.2 运行测试

测试结果

consumer:pool-1-thread-1 Event: value=579797,sequence=0
consumer:pool-1-thread-1 Event: value=974942,sequence=1
consumer:pool-1-thread-1 Event: value=978977,sequence=2
consumer:pool-1-thread-1 Event: value=398080,sequence=3
consumer:pool-1-thread-1 Event: value=867251,sequence=4
consumer:pool-1-thread-1 Event: value=796707,sequence=5
consumer:pool-1-thread-1 Event: value=786555,sequence=6
consumer:pool-1-thread-1 Event: value=182193,sequence=7
.....

Event: value = 为消费者接收到的数据,sequence为数据在ringbuffer环的位置。

如果本文对您有帮助,欢迎 关注点赞 `,您的支持是我坚持创作的动力。

转载请注明出处!