disruptor实现细节及源码分析
disruptor实现细节及源码分析
一、 背景介绍
Disruptor它是一个开源的并发框架,并获得 2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。
说明:下文所有内容基于disruptor3.34版本。
二、 应用场景
在消费者--生产者模式中或发布订阅模式中使用。
具有以下特点:
无锁的设计及CAS式的原子访问。
预分配存储空间,避免垃圾回收带来的资源消耗。
三、 核心对象
RingBuffer:环形的一个数据结构,对象初始化时,会使用事件Event进行填充。Buffer的大小必须是2的幂次方,方便移位操作。
Event:无指定具体接口,用户自己实现,可以携带任何业务数据。
EventFactory:产生事件Event的工厂,由用户自己实现。
EventTranslator:事件发布的回调接口,由用户实现,负责将业务参数设置到事件中。
Sequencer:序列产生器,也是协调生产者和消费者及实现高并发的核心。有MultiProducerSequencer 和 SingleProducerSequencer两个实现类。
SequenceBarrier:拥有RingBuffer的发布事件Sequence引用和消费者依赖的Sequence引用。决定消费者消费可消费的Sequence。
EventHandler:事件的处理者,由用户自己实现。
EventProcessor:事件的处理器,单独在一个线程中运行。
WorkHandler:事件的处理者,由用户自己实现。
WorkProcessor:事件的处理器,单独在一个线程中运行。
WorkerPool:一组WorkProcessor的处理。
WaitStrategy:在消费者比生产者快时,消费者处理器的等待策略。
四、 简单示例
定义业务数据类:
定义事件类:
定义事件处理类:
定义事件工厂类:
定义事件发布辅助类:
主类:
五、 实现原理及源码分析
RingBuffer的实现:
封装了一个对象数组,RingBuffer实例化时,用Event填充。生产者和消费者通过对序列(long的原子操作封装)取模计算获取对象数组中Event。
public E get(long sequence){return elementAt(sequence); }protected final E elementAt(long sequence){return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));}
单个生产者的实现:
保存有所有消费者当前消费的前一个序列值,在取下一个要发布的序列时,检查要发布的序列是否覆盖所有消费者正在处理的最小序列。如果未覆盖,则获取可发布的游标值,如果覆盖(说明缓存已经满了),则自旋等待,直到可以发布。发布事件时则先发布,后指定当前游标为发布的序列值。
public long next(int n){if (n < 1){thrownew IllegalArgumentException("n must be > 0");}//当前生产者发布的的最大序列long nextValue = this.nextValue;long nextSequence = nextValue + n;//要发布的最大序列long wrapPoint = nextSequence - bufferSize;//覆盖点long cachedGatingSequence = this.cachedValue;//消费者中处理序列最小的前一个序列//缓存已满 或者处理器处理异常时if (wrapPoint > cachedGatingSequence ||cachedGatingSequence > nextValue){long minSequence;//等待直到有可用的缓存while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences,nextValue))){LockSupport.parkNanos(1L);// TODO: Use waitStrategy to spin?}this.cachedValue = minSequence;}//更新当前生产者发布的的最大序列this.nextValue = nextSequence;return nextSequence;}
多个生产者的实现:
保存有所有消费者当前消费的前一个序列值,并维护一个和RingBuffer一样大小的数组,在取下一个要发布的序列时,检查要发布的序列是否覆盖所有消费者正在处理的最小序列。如果未覆盖,则先发布,后指定当前游标为发布的序列值,如果未覆盖,则获取可发布的游标值,如果覆盖(说明缓存已经满了),则自旋等待,直到可以发布。一个生产者获取可发布的序列后,立即更新当前游标。发布事件时生产者每发布一个序列,则记录到数组指定位置。
public long next(int n){if (n < 1){thrownew IllegalArgumentException("n must be > 0");}long current;long next;do{//当前游标current = cursor.get();//要发布的游标next = current + n;//覆盖点long wrapPoint = next - bufferSize;//消费者中处理序列最小的前一个序列long cachedGatingSequence = gatingSequenceCache.get();//缓存已满 或者处理器处理异常时if (wrapPoint > cachedGatingSequence ||cachedGatingSequence > current){long gatingSequence = Util.getMinimumSequence(gatingSequences, current);if (wrapPoint > gatingSequence){LockSupport.parkNanos(1);// TODO, should we spin based on the waitstrategy?//缓存满时,继续再次尝试continue;}//更新当前生产者发布的的最大序列gatingSequenceCache.set(gatingSequence);}elseif (cursor.compareAndSet(current, next)){//成功获取到发布序列并设置当前游标成功时跳出循环break;}}while (true);return next;}消费者的实现:
消费者保持一个自己的序列,每次累加后nextSequence,去获取可访问的最大序列。对于一个生产者,就是nextSequence到RingBuffer当前游标的序列。对于多个生产者,就是nextSequence到RingBuffer当前游标之间,最大的连续的序列集。
public long waitFor(finallong sequence)throws AlertException,InterruptedException, TimeoutException{checkAlert();//获取最大的可消费的序列,依赖等待策略long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);if (availableSequence < sequence){return availableSequence;}return sequencer.getHighestPublishedSequence(sequence,availableSequence);}一个生产者:
public long getHighestPublishedSequence(long lowerBound, long availableSequence){// 返回最大序列availableSequencereturn availableSequence; }多个生产者:
public boolean isAvailable(long sequence){int index = calculateIndex(sequence);int flag = calculateAvailabilityFlag(sequence);long bufferAddress = (index * SCALE) + BASE;//相应位置上的值相等,说明已经发布该序列returnUNSAFE.getIntVolatile(availableBuffer,bufferAddress) == flag;}@Overridepublic long getHighestPublishedSequence(long lowerBound, long availableSequence){//从数组中找出未发布序列,即由小到大连续的发布序列for (long sequence = lowerBound; sequence <= availableSequence;sequence++){if (!isAvailable(sequence)){//返回未发布序列的前一个序列return sequence - 1;}}return availableSequence;}等待策略:
消费者在缓存中没有可以消费的事件时,采取的等待策略:
BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒
BusySpinWaitStrategy:线程一直自旋等待,比较耗CPU。
LiteBlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,比BlockingWaitStrategy要轻,某些情况下可以减少阻塞的次数。
PhasedBackoffWaitStrategy:根据指定的时间段参数和指定的等待策略决定采用哪种等待策略。
SleepingWaitStrategy:可通过参数设置,使线程通过Thread.yield()主动放弃执行,通过线程调度器重新调度;或一直自旋等待。
TimeoutBlockingWaitStrategy:通过参数设置阻塞时间,如果超时则抛出异常。
YieldingWaitStrategy: 通过Thread.yield()主动放弃执行,通过线程调度器重新调度。
转载于:https://blog.51cto.com/11246272/1745472
总结
以上是生活随笔为你收集整理的disruptor实现细节及源码分析的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: PHP 表单处理
- 下一篇: mybatis学习笔记(3)-入门程序一