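How do consumers read data?
The previous article covered the producer side; this one covers the consumer side.
As we all know, a consumer simply keeps reading data from the queue and processing it. Unlike with a BlockingQueue, however, RingBuffer consumers never lock the queue. So how does that work?
In short, a consumer uses CAS to atomically claim a consumable sequence number, then fetches the data at that sequence and processes it.
Before looking at the code, let's list the questions that come to mind: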
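1. A tail pointer is needed to track consumption progress.
2. How do we prevent one piece of data from being consumed by several consumers?
3. Consumers must not run ahead of the producer. How is that enforced?
4. What should a consumer do when there is nothing to process: spin, or park and wait for the producer to wake it?
5. If the answer to 4 is to park, how is the consumer woken when the RingBuffer shuts down so that its thread can finish?
6. The Disruptor is constructed with a thread factory. How does it use threads? Are multiple tasks scheduled on a single thread?
7. When are the consumers started?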
Good, we have our questions. Now let's look at the code. Below is WorkProcessor, one implementation of EventProcessor.
public final class WorkProcessor<T> implements EventProcessor {
    // current state of this processor
    private final AtomicBoolean running = new AtomicBoolean(false);
    // latest sequence this processor has finished consuming
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    // kept so that events can be fetched
    private final RingBuffer<T> ringBuffer;
    // used to wait for the highest available sequence; may be shared by several processors
    private final SequenceBarrier sequenceBarrier;
    // the handler that actually processes events
    private final WorkHandler<? super T> workHandler;
    private final ExceptionHandler<? super T> exceptionHandler;
    // workSequence shared by several processors; yields the next sequence to process
    private final Sequence workSequence;

    private final EventReleaser eventReleaser = new EventReleaser() {
        @Override
        public void release() {
            sequence.set(Long.MAX_VALUE);
        }
    };

    private final TimeoutHandler timeoutHandler;

    /**
     * Construct a {@link WorkProcessor}.
     *
     * @param ringBuffer       to which events are published.
     * @param sequenceBarrier  on which it is waiting.
     * @param workHandler      is the delegate to which events are dispatched.
     * @param exceptionHandler to be called back when an error occurs
     * @param workSequence     from which to claim the next event to be worked on. It should always be initialised
     *                         as {@link Sequencer#INITIAL_CURSOR_VALUE}
     */
    public WorkProcessor(
        final RingBuffer<T> ringBuffer,
        final SequenceBarrier sequenceBarrier,
        final WorkHandler<? super T> workHandler,
        final ExceptionHandler<? super T> exceptionHandler,
        final Sequence workSequence) {
        this.ringBuffer = ringBuffer;
        this.sequenceBarrier = sequenceBarrier;
        this.workHandler = workHandler;
        this.exceptionHandler = exceptionHandler;
        this.workSequence = workSequence;

        if (this.workHandler instanceof EventReleaseAware) {
            ((EventReleaseAware) this.workHandler).setEventReleaser(eventReleaser);
        }

        timeoutHandler = (workHandler instanceof TimeoutHandler) ? (TimeoutHandler) workHandler : null;
    }

    @Override
    public Sequence getSequence() {
        return sequence;
    }

    @Override
    public void halt() {
        running.set(false);
        // wake processor threads blocked in the WaitStrategy so that they observe the halted state
        sequenceBarrier.alert();
    }

    /**
     * remove workProcessor dynamic without message lost
     */
    public void haltLater() {
        // "later" means the processor exits at its next check of the flag;
        // if it is blocked in the WaitStrategy, the check happens only after that call returns
        running.set(false);
    }

    @Override
    public boolean isRunning() {
        return running.get();
    }

    /**
     * It is ok to have another thread re-run this method after a halt().
     *
     * @throws IllegalStateException if this processor is already running
     */
    @Override
    public void run() {
        // guards against run() being invoked while already running
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        // loop until halted
        while (true) {
            try {
                // if previous sequence was processed - fetch the next sequence and set
                // that we have successfully processed the previous sequence
                // typically, this will be true
                // this prevents the sequence getting too far forward if an exception
                // is thrown from the WorkHandler
                if (processedSequence) {
                    // if halted, wake the other processor threads waiting on the same barrier
                    if (!running.get()) {
                        sequenceBarrier.alert();      // wake the other threads
                        sequenceBarrier.checkAlert(); // throws AlertException, which ends this thread's loop
                    }
                    processedSequence = false;
                    do {
                        // workSequence may be shared with other processors
                        nextSequence = workSequence.get() + 1L;
                        // sequence is what this processor reports as consumed; the producer checks
                        // the tail pointer against it, i.e. it is the gatingSequence.
                        // Reaching this point means this processor has finished its previously claimed event.
                        sequence.set(nextSequence - 1L);
                    }
                    // workSequence may be shared by several processors, so there is contention and CAS is required
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }

                // not beyond the producer's highest published sequence, so the event is available
                if (cachedAvailableSequence >= nextSequence) {
                    // fetch the event stored at this sequence
                    event = ringBuffer.get(nextSequence);
                    // hand it to the handler
                    workHandler.onEvent(event);
                    processedSequence = true;
                } else {
                    // block until the next sequence is available:
                    // returns nextSequence if that is all that is available,
                    // or the latest available sequence if it is greater than nextSequence
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) { // thrown by checkAlert()
                // if halted, leave the loop; the thread's task is finished
                if (!running.get()) {
                    break;
                }
            } catch (final Throwable ex) { // any other exception goes to the exception handler
                // handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }

        notifyShutdown();

        running.set(false);
    }

    private void notifyTimeout(final long availableSequence) {
        try {
            if (timeoutHandler != null) {
                timeoutHandler.onTimeout(availableSequence);
            }
        } catch (Throwable e) {
            exceptionHandler.handleEventException(e, availableSequence, null);
        }
    }

    private void notifyStart() {
        if (workHandler instanceof LifecycleAware) {
            try {
                ((LifecycleAware) workHandler).onStart();
            } catch (final Throwable ex) {
                exceptionHandler.handleOnStartException(ex);
            }
        }
    }

    private void notifyShutdown() {
        if (workHandler instanceof LifecycleAware) {
            try {
                ((LifecycleAware) workHandler).onShutdown();
            } catch (final Throwable ex) {
                exceptionHandler.handleOnShutdownException(ex);
            }
        }
    }
}
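On question 1: a tail pointer is needed to track consumption progress
You will have noticed that the code has two Sequences, workSequence and sequence. Why two?
workSequence is the latest sequence claimed by the consumers (the data at that sequence has not been processed yet; a consumer has merely claimed it for consumption). sequence, by contrast, points at data that has already been consumed, and it is exactly the gatingSequence from the previous article.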
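To make the role of the gating sequence concrete, here is a minimal sketch of the check a producer could perform before claiming a slot. It is not Disruptor's source: the class GatingSketch and the methods minimumSequence and wouldWrap are hypothetical names, and plain long values stand in for Sequence objects. The assumption is that the producer may only overwrite a slot once every consumer's sequence has passed the entry stored there.

```java
// Illustrative sketch only; names are hypothetical and longs stand in for Sequence.
class GatingSketch {

    // Smallest consumed sequence across all consumers, bounded by the producer cursor.
    static long minimumSequence(long[] consumerSequences, long cursor) {
        long min = cursor;
        for (long s : consumerSequences) {
            min = Math.min(min, s);
        }
        return min;
    }

    // True if publishing `nextToPublish` would overwrite an entry that the
    // slowest consumer has not finished with yet.
    static boolean wouldWrap(long nextToPublish, int bufferSize,
                             long[] consumerSequences, long cursor) {
        long wrapPoint = nextToPublish - bufferSize; // sequence currently stored in the target slot
        return wrapPoint > minimumSequence(consumerSequences, cursor);
    }

    public static void main(String[] args) {
        long[] consumers = {3L, 5L}; // each consumer's "sequence" (already consumed)
        System.out.println("publish 11 wraps: " + wouldWrap(11L, 8, consumers, 10L));
        System.out.println("publish 12 wraps: " + wouldWrap(12L, 8, consumers, 10L));
    }
}
```

With a buffer of 8 slots and the slowest consumer at sequence 3, sequence 11 reuses the slot that held sequence 3 and is safe, while sequence 12 would overwrite the unconsumed sequence 4.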
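On question 2: how do we prevent one piece of data from being consumed by several consumers?
The answer is WorkerPool: several WorkProcessors share a single workSequence, so they compete for sequence numbers and each sequence can be claimed, and therefore consumed, only once.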
public final class WorkerPool<T> {
    private final AtomicBoolean started = new AtomicBoolean(false);
    // starts at -1
    private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    // RingBuffer reference, used to construct the processors and to fetch events
    private final RingBuffer<T> ringBuffer;
    // WorkProcessors are created to wrap each of the provided WorkHandlers
    private final WorkProcessor<?>[] workProcessors;

    //...

    public WorkerPool(
        final RingBuffer<T> ringBuffer,
        final SequenceBarrier sequenceBarrier,
        final ExceptionHandler<? super T> exceptionHandler,
        final WorkHandler<? super T>... workHandlers) {
        this.ringBuffer = ringBuffer;
        final int numWorkers = workHandlers.length;
        workProcessors = new WorkProcessor[numWorkers];

        // one processor per handler
        for (int i = 0; i < numWorkers; i++) {
            workProcessors[i] = new WorkProcessor<>(
                ringBuffer,
                sequenceBarrier,   // all share the same sequenceBarrier
                workHandlers[i],
                exceptionHandler,
                workSequence);     // all share the same workSequence
        }
    }

    //...
}

public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<>();

    //...

    public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers) {
        return createWorkerPool(new Sequence[0], workHandlers);
    }

    EventHandlerGroup<T> createWorkerPool(
        final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers) {
        final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
        final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);

        // store the workerPool in the repository; at startup it is taken out again and handed to the Executor
        consumerRepository.add(workerPool, sequenceBarrier);

        final Sequence[] workerSequences = workerPool.getWorkerSequences();

        updateGatingSequencesForNextInChain(barrierSequences, workerSequences);

        return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
    }

    //....
}
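The effect of sharing one workSequence can be tried out in isolation. The sketch below is not Disruptor code: it uses java.util.concurrent.atomic.AtomicLong as a stand-in for Sequence, and the class SharedSequenceDemo with its methods claimNext and run are hypothetical names. It only reuses the shape of the CAS loop from WorkProcessor and counts how many sequences end up claimed twice.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only: AtomicLong stands in for Disruptor's Sequence.
class SharedSequenceDemo {

    // Claim the next sequence with the same CAS loop shape as WorkProcessor.
    static long claimNext(AtomicLong workSequence) {
        long next;
        do {
            next = workSequence.get() + 1L;
        } while (!workSequence.compareAndSet(next - 1L, next));
        return next;
    }

    // Let `workers` threads compete for sequences 0..total-1.
    // Returns {number of distinct sequences claimed, number of duplicate claims}.
    static long[] run(int workers, int total) {
        AtomicLong workSequence = new AtomicLong(-1L); // starts at -1
        Set<Long> claimed = ConcurrentHashMap.newKeySet();
        AtomicLong duplicates = new AtomicLong();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                long seq;
                while ((seq = claimNext(workSequence)) < total) {
                    if (!claimed.add(seq)) {
                        duplicates.incrementAndGet(); // would mean two workers got the same sequence
                    }
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] {claimed.size(), duplicates.get()};
    }

    public static void main(String[] args) {
        long[] result = run(4, 10_000);
        System.out.println("distinct=" + result[0] + ", duplicates=" + result[1]);
    }
}
```

Because a compareAndSet from n-1 to n can succeed for only one thread, every sequence is handed to exactly one worker, and no lock is needed.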
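On questions 3 and 4: consumers must not run ahead of the producer, so how is that enforced? And what should a consumer do when there is nothing to process: spin, or park and wait for the producer to wake it?
Both are handled by the SequenceBarrier. From the WorkProcessor code we can see that a consumer caches the highest available sequence it last obtained, and any sequence within that range can be claimed and processed directly. Only when the claimed sequence goes beyond the cached value does the consumer call waitFor to obtain a new highest available sequence, and that call is what triggers the WaitStrategy.
There are many wait strategies. A common one is BlockingWaitStrategy, which parks the executing thread. When the producer calls publishEvent, WaitStrategy#signalAllWhenBlocking() is invoked to wake all waiting threads.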
final class ProcessingSequenceBarrier implements SequenceBarrier {
    private final WaitStrategy waitStrategy;
    private final Sequence dependentSequence;
    private volatile boolean alerted = false;
    private final Sequence cursorSequence;
    private final Sequencer sequencer;

    //...

    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException {
        checkAlert();

        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

        if (availableSequence < sequence) {
            return availableSequence;
        }

        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

    //...
}
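The park-and-wake behaviour, together with the alert used on shutdown (question 5), can be sketched with a plain lock and condition. This is a simplified stand-in, not the source of BlockingWaitStrategy or ProcessingSequenceBarrier: the class BlockingWaitSketch and its helper methods are hypothetical, an AtomicLong plays the producer cursor, and IllegalStateException is used in place of AlertException.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only; not Disruptor's implementation.
class BlockingWaitSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition published = lock.newCondition();
    private final AtomicLong cursor = new AtomicLong(-1L); // stand-in for the producer cursor
    private volatile boolean alerted = false;

    // Park until `sequence` has been published, then return the highest published sequence.
    long waitFor(long sequence) throws InterruptedException {
        lock.lock();
        try {
            while (cursor.get() < sequence) {
                if (alerted) {
                    throw new IllegalStateException("alerted"); // stand-in for AlertException
                }
                published.await();
            }
            return cursor.get();
        } finally {
            lock.unlock();
        }
    }

    // Producer side: advance the cursor, then wake every parked consumer.
    void publish(long sequence) {
        cursor.set(sequence);
        signalAll();
    }

    // Shutdown side: raise the alert, then wake every parked consumer so it can exit.
    void alert() {
        alerted = true;
        signalAll();
    }

    private void signalAll() {
        lock.lock();
        try {
            published.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Demo helper: a consumer waits for `wanted`, the caller publishes `publishedSeq`
    // (must be >= wanted, otherwise the consumer would wait forever).
    static long waitThenPublish(long wanted, long publishedSeq) {
        BlockingWaitSketch barrier = new BlockingWaitSketch();
        AtomicLong seen = new AtomicLong(Long.MIN_VALUE);
        Thread consumer = new Thread(() -> {
            try {
                seen.set(barrier.waitFor(wanted));
            } catch (InterruptedException | IllegalStateException e) {
                // leave `seen` unset
            }
        });
        consumer.start();
        barrier.publish(publishedSeq);
        join(consumer);
        return seen.get();
    }

    // Demo helper: returns true if alert() makes a waiting consumer give up.
    static boolean alertWakesWaiter() {
        BlockingWaitSketch barrier = new BlockingWaitSketch();
        AtomicLong gaveUp = new AtomicLong(0L);
        Thread consumer = new Thread(() -> {
            try {
                barrier.waitFor(0L);
            } catch (IllegalStateException e) {
                gaveUp.set(1L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        barrier.alert();
        join(consumer);
        return gaveUp.get() == 1L;
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The consumer rechecks the cursor under the lock before every await, so a publish that happens between the check and the park cannot be missed. The alert path reuses the same wake-up, which mirrors how halt() calls sequenceBarrier.alert() in WorkProcessor.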
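On questions 6 and 7: the Disruptor is constructed with a thread factory, so how does it use threads, and are multiple tasks scheduled on a single thread? And when are the consumers started?
Consumers start together with the Disruptor. On start, the Disruptor takes each consumer out of the ConsumerRepository and submits it to the Executor.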
public RingBuffer<T> start() {
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }

    return ringBuffer;
}
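In newer versions of Disruptor, passing in an external Executor is discouraged. Instead you pass only a ThreadFactory, and an Executor, BasicExecutor, is constructed internally. It creates a new thread for every task submitted to it, so the threading model is one thread per consumer.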
public class Disruptor<T> {
    //...

    public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory) {
        this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
    }

    //...
}

public class BasicExecutor implements Executor {
    private final ThreadFactory factory;
    private final Queue<Thread> threads = new ConcurrentLinkedQueue<>();

    public BasicExecutor(ThreadFactory factory) {
        this.factory = factory;
    }

    @Override
    public void execute(Runnable command) {
        // every submitted task gets a brand-new thread
        final Thread thread = factory.newThread(command);
        if (null == thread) {
            throw new RuntimeException("Failed to create thread to run: " + command);
        }

        thread.start();

        threads.add(thread);
    }

    //...
}
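The one-thread-per-consumer model is easy to observe with a small stand-alone executor. The sketch below does not use Disruptor: ThreadPerConsumerDemo, ThreadPerTaskExecutor and distinctThreads are hypothetical names, and the executor only imitates the execute method shown above. It submits a few short tasks and counts the distinct threads that ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Illustrative sketch only; imitates the shape of BasicExecutor#execute.
class ThreadPerConsumerDemo {

    static final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory factory;

        ThreadPerTaskExecutor(ThreadFactory factory) {
            this.factory = factory;
        }

        @Override
        public void execute(Runnable command) {
            // a new thread for every submitted task, never a shared pool
            Thread thread = factory.newThread(command);
            if (thread == null) {
                throw new RuntimeException("Failed to create thread to run: " + command);
            }
            thread.start();
        }
    }

    // Submit `consumers` tasks and return how many distinct threads executed them.
    static int distinctThreads(int consumers) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(consumers);
        Executor executor = new ThreadPerTaskExecutor(Executors.defaultThreadFactory());
        for (int i = 0; i < consumers; i++) {
            executor.execute(() -> {
                threads.add(Thread.currentThread()); // record which thread ran this "consumer"
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threads.size();
    }

    public static void main(String[] args) {
        System.out.println("threads used for 3 consumers: " + distinctThreads(3));
    }
}
```

This matters for WorkProcessor because its run() loops until halted. On a bounded pool with fewer threads than consumers, some consumers would never get to run, which is presumably why a dedicated thread per consumer is used.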
TODO: add some diagrams later to make this easier to follow.