Disruptor分析-创新互联

什么是 Disruptor?

Disruptor是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现

目前创新互联建站已为上千的企业提供了网站建设、域名、网站空间、网站托管、服务器托管、企业网站设计、闵行网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。

性能远远高于传统的BlockingQueue容器

Disruptor使用观察者模式,主动将消息发送给消费者,而不是等消费者从队列中取,在无锁的情况下, 实现queue(环形, RingBuffer)的并发操作, 性能远高于BlockingQueue

Disruptor 的设计思想

环形数组结构

为了避免垃圾回收,使用数组,数组对处理器的缓存机制更加友好

数组长度为 2^n,通过位运算,加快定位速度,下标采用递增的方式,不用担心索引溢出

无锁设计

每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据

Disruptor 实现生产消费模型

pom


    com.lmax
    disruptor
    3.2.1

LongEvent

// 声明一个Event来包含需要传递的数据
public class LongEvent {
    private Long value;

    public Long getValue() {
        return value;
    }

    public void setValue(Long value) {
        this.value = value;
    }
}

LongEventFactory

// Event工厂
public class LongEventFactory implements EventFactory {

    public LongEvent newInstance() {
        return new LongEvent();
    }
}

LongEventHandler

// 事件消费者
public class LongEventHandler implements EventHandler {

    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("消费者:"+event.getValue());
    }
}

LongEventProducer

public class LongEventProducer {
    private RingBuffer ringBuffer;

    public LongEventProducer(RingBuffer ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer byteBuffer) {
        // 获取事件队列下标位置
        long sequence = ringBuffer.next();
        try {
            // 取出空队列
            LongEvent longEvent = ringBuffer.get(sequence);
            // 赋值
            longEvent.setValue(byteBuffer.getLong(0));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("生产者发送数据。。。");
            // 发送数据
            ringBuffer.publish(sequence);
        }
    }
}

Main

public class Main {
    public static void main(String[] args) {
        // 创建可缓存线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 创建工厂
        EventFactory eventFactory = new LongEventFactory();
        // 创建ringBufferSize
        int ringBufferSize = 1024 * 1024;
        // 创建disruptor
        // MULTI表示可以多个生产者
        Disruptor longEventDisruptor = new Disruptor(eventFactory, ringBufferSize, executorService, ProducerType.MULTI, new YieldingWaitStrategy());
        // 注册消费者
        longEventDisruptor.handleEventsWith(new LongEventHandler());
        // 启动
        longEventDisruptor.start();
        // 创建RingBuffer容器
        RingBuffer ringBuffer = longEventDisruptor.getRingBuffer();
        // 创建生产者
        LongEventProducer longEventProducer = new LongEventProducer(ringBuffer);
        // 指定缓冲区大小
        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
        for (int i = 0; i < 100; i++) {
            byteBuffer.putLong(0, i);
            longEventProducer.onData(byteBuffer);
        }
        executorService.shutdown();
        longEventDisruptor.shutdown();
    }
}

什么是 RingBuffer

它是一个环(首尾相接的环),作用是存储数据,实现不同线程之间的数据传输

Disruptor 分析

RingBuffer 每块区是拥有一个序号的,这个序号指向环形数组结构的下一个可用元素

Disruptor 分析

随着不断地写进了填充这个圆环,这个指针序号会不断地递增,直到绕过这个环

Disruptor 分析

如果圆环满了,它会将金数据覆盖,如上图:现在12的区域的下个区域目前是3,如果有新的数据到来,那么指针往下移的时候就会把区域3的数据给覆盖变成13,框架提供了一系列帮助我们平行消费的监控,会很好的控制生产者和消费者之间的速度,从而达到生产和消费之间的平衡

RingBuffer 为什么效率高?

采用数组,数组支持索引访问

数组的内存分配是预先加载的,一但指定大小创建后,就一直存在,这也意味着不需要花大量的时间做垃圾回收,而阻塞队列采用链表实现,需要不断的删除、创建节点

Disruptor的核心概念

  • RingBuffer:Disruptor 底层数据结构实现,核心类,是线程间交换数据的中转地
  • Sequence:序号,声明一个序号,用于跟踪 ringbuffer 中任务的变化和消费者的消费情况

  • Sequencer:生产者与缓存 RingBuffer 之间的桥梁,单生产者与多生产者分别对应于两个实现SingleProducerSequencer 与 MultiProducerSequencer,Sequencer 用于向 RingBuffer 申请空间,使用 publish 方法通过 waitStrategy 通知所有在等待可消费事件的 SequenceBarrie
  • SequenceBarrier:序号栅栏,管理和协调生产者的游标序号和各个消费者的序号,确保生产者不会覆盖消费者未来得及处理的消息,确保存在依赖的消费者之间能够按照正确的顺序处理
  • WaitStrategy:有多种实现,用以表示当无可消费事件时,消费者的等待策略

  • Event:消费事件

  • EventProcessor:事件处理器,监听 RingBuffer 的事件,并消费可用事件,从 RingBuffer 读取的事件会交由实际的生产者实现类来消费,它会一直侦听下一个可用的序号,直到该序号对应的事件已经准备好

  • EventHandler:业务处理器,是实际消费者的接口,完成具体的业务逻辑实现,第三方实现该接口,代表着消费者

  • Producer:生产者接口,第三方线程充当该角色,producer 向 RingBuffer 写入事件

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


网站栏目:Disruptor分析-创新互联
文章源于:http://scyanting.com/article/csseih.html