欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

disruptor实现细节及源码分析

发布时间:2025/7/14 编程问答 39 豆豆
生活随笔 收集整理的这篇文章主要介绍了 disruptor实现细节及源码分析 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

disruptor实现细节及源码分析

一、     背景介绍

     Disruptor它是一个开源的并发框架,并获得 2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。

     说明:下文所有内容基于disruptor3.34版本。

二、     应用场景

     在消费者--生产者模式中或发布订阅模式中使用。

    具有以下特点:

  • 无锁的设计及CAS式的原子访问。

  • 预分配存储空间,避免垃圾回收带来的资源消耗。

    三、     核心对象

  •         RingBuffer:环形的一个数据结构,对象初始化时,会使用事件Event进行填充。Buffer的大小必须是2的幂次方,方便移位操作。

             

            Event:无指定具体接口,用户自己实现,可以携带任何业务数据。

             

            EventFactory:产生事件Event的工厂,由用户自己实现。

             

            EventTranslator:事件发布的回调接口,由用户实现,负责将业务参数设置到事件中。

             

            Sequencer:序列产生器,也是协调生产者和消费者及实现高并发的核心。有MultiProducerSequencer SingleProducerSequencer两个实现类。

             

            SequenceBarrier:拥有RingBuffer的发布事件Sequence引用和消费者依赖的Sequence引用。决定消费者消费可消费的Sequence

             

            EventHandler:事件的处理者,由用户自己实现。

             

            EventProcessor:事件的处理器,单独在一个线程中运行。

             

            WorkHandler:事件的处理者,由用户自己实现。

             

            WorkProcessor:事件的处理器,单独在一个线程中运行。

             

            WorkerPool:一组WorkProcessor的处理。

             

            WaitStrategy:在消费者比生产者快时,消费者处理器的等待策略。

     

    四、     简单示例

    • 定义业务数据类:

    public class MyData {private long value;public MyData(long value){this.value = value;}public long getValue() {return value;}public void setValue(long value) {this.value = value;}public String toString(){StringBuffer sb = new StringBuffer();sb.append("value=").append(value);return sb.toString();} }

    • 定义事件类:

    public class MyEvent {private MyData data;public MyData getData() {return data;}public void setData(MyData data) {this.data = data;}}

    • 定义事件处理类:

    public class MyEventHandler implements EventHandler<MyEvent>{@Overridepublic void onEvent(MyEvent event, long sequence, boolean endOfBatch)throws Exception {System.out.println("事件处理:"+event.getData());}}

    • 定义事件工厂类:

    public class MyFactory implements EventFactory<MyEvent>{@Overridepublic MyEvent newInstance() {return new MyEvent();}}
    • 定义事件发布辅助类:

    public class MyEventTranslatorOneArg implements EventTranslatorOneArg<MyEvent,MyData>{@Overridepublic void translateTo(MyEvent event, long sequence, MyData data) {System.out.println("发布事件:"+data);event.setData(data);}}

    • 主类:

    public class MyMainTest {public static void main(String[]args){Disruptor<MyEvent> disruptor = newDisruptor<MyEvent>(new MyFactory(),//事件工厂128, //必须为2的幂次方new ThreadFactory(){//线程工厂@Overridepublic Thread newThread(Runnable runnable) {returnnew Thread(runnable);}},ProducerType.SINGLE,//指定生产者为一个或多个newYieldingWaitStrategy());//等待策略//指定处理器disruptor.handleEventsWith(newMyEventHandler());disruptor.start();//发布事件MyEventTranslatorOneArg translator = newMyEventTranslatorOneArg();for(int i = 0; i < 10; i++){disruptor.publishEvent(translator, new MyData(i));}disruptor.shutdown();} }

    五、     实现原理及源码分析

    • RingBuffer的实现:

        封装了一个对象数组,RingBuffer实例化时,用Event填充。生产者和消费者通过对序列(long的原子操作封装)取模计算获取对象数组中Event

     

    public E get(long sequence){return elementAt(sequence); }protected final E elementAt(long sequence){return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));}


    • 单个生产者的实现:

        保存有所有消费者当前消费的前一个序列值,在取下一个要发布的序列时,检查要发布的序列是否覆盖所有消费者正在处理的最小序列。如果未覆盖,则获取可发布的游标值,如果覆盖(说明缓存已经满了),则自旋等待,直到可以发布。发布事件时则先发布,后指定当前游标为发布的序列值。

     

    public long next(int n){if (n < 1){thrownew IllegalArgumentException("n must be > 0");}//当前生产者发布的的最大序列long nextValue = this.nextValue;long nextSequence = nextValue + n;//要发布的最大序列long wrapPoint = nextSequence - bufferSize;//覆盖点long cachedGatingSequence = this.cachedValue;//消费者中处理序列最小的前一个序列//缓存已满  或者处理器处理异常时if (wrapPoint > cachedGatingSequence ||cachedGatingSequence > nextValue){long minSequence;//等待直到有可用的缓存while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences,nextValue))){LockSupport.parkNanos(1L);// TODO: Use waitStrategy to spin?}this.cachedValue = minSequence;}//更新当前生产者发布的的最大序列this.nextValue = nextSequence;return nextSequence;}


    • 多个生产者的实现:

        保存有所有消费者当前消费的前一个序列值,并维护一个和RingBuffer一样大小的数组,在取下一个要发布的序列时,检查要发布的序列是否覆盖所有消费者正在处理的最小序列。如果未覆盖,则先发布,后指定当前游标为发布的序列值,如果未覆盖,则获取可发布的游标值,如果覆盖(说明缓存已经满了),则自旋等待,直到可以发布。一个生产者获取可发布的序列后,立即更新当前游标。发布事件时生产者每发布一个序列,则记录到数组指定位置。

    public long next(int n){if (n < 1){thrownew IllegalArgumentException("n must be > 0");}long current;long next;do{//当前游标current = cursor.get();//要发布的游标next = current + n;//覆盖点long wrapPoint = next - bufferSize;//消费者中处理序列最小的前一个序列long cachedGatingSequence = gatingSequenceCache.get();//缓存已满  或者处理器处理异常时if (wrapPoint > cachedGatingSequence ||cachedGatingSequence > current){long gatingSequence = Util.getMinimumSequence(gatingSequences, current);if (wrapPoint > gatingSequence){LockSupport.parkNanos(1);// TODO, should we spin based on the waitstrategy?//缓存满时,继续再次尝试continue;}//更新当前生产者发布的的最大序列gatingSequenceCache.set(gatingSequence);}elseif (cursor.compareAndSet(current, next)){//成功获取到发布序列并设置当前游标成功时跳出循环break;}}while (true);return next;}

    • 消费者的实现:

        消费者保持一个自己的序列,每次累加后nextSequence,去获取可访问的最大序列。对于一个生产者,就是nextSequenceRingBuffer当前游标的序列。对于多个生产者,就是nextSequenceRingBuffer当前游标之间,最大的连续的序列集。

    public long waitFor(finallong sequence)throws AlertException,InterruptedException, TimeoutException{checkAlert();//获取最大的可消费的序列,依赖等待策略long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);if (availableSequence < sequence){return availableSequence;}return sequencer.getHighestPublishedSequence(sequence,availableSequence);}

         一个生产者:

    public long getHighestPublishedSequence(long lowerBound, long availableSequence){// 返回最大序列availableSequencereturn availableSequence; }

         多个生产者:

    public boolean isAvailable(long sequence){int index = calculateIndex(sequence);int flag = calculateAvailabilityFlag(sequence);long bufferAddress = (index * SCALE) + BASE;//相应位置上的值相等,说明已经发布该序列returnUNSAFE.getIntVolatile(availableBuffer,bufferAddress) == flag;}@Overridepublic long getHighestPublishedSequence(long lowerBound, long availableSequence){//从数组中找出未发布序列,即由小到大连续的发布序列for (long sequence = lowerBound; sequence <= availableSequence;sequence++){if (!isAvailable(sequence)){//返回未发布序列的前一个序列return sequence - 1;}}return availableSequence;}


    • 等待策略:

            消费者在缓存中没有可以消费的事件时,采取的等待策略:

             

            BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒

             

            BusySpinWaitStrategy:线程一直自旋等待,比较耗CPU

             

            LiteBlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,比BlockingWaitStrategy要轻,某些情况下可以减少阻塞的次数。

             

            PhasedBackoffWaitStrategy:根据指定的时间段参数和指定的等待策略决定采用哪种等待策略。

             

            SleepingWaitStrategy:可通过参数设置,使线程通过Thread.yield()主动放弃执行,通过线程调度器重新调度;或一直自旋等待。

            TimeoutBlockingWaitStrategy:通过参数设置阻塞时间,如果超时则抛出异常。

             

            YieldingWaitStrategy 通过Thread.yield()主动放弃执行,通过线程调度器重新调度。

            


    转载于:https://blog.51cto.com/11246272/1745472

    总结

    以上是生活随笔为你收集整理的disruptor实现细节及源码分析的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。