CyclicBarrier源码分析

CyclicBarrier的作用是让一组线程互相等待至某个状态后并行执行(相对外部来说是并行,其实内部还是串行)

基本的使用方法是创建一个CyclicBarrier实例,并且指定parties的个数,然后线程依次调用CyclicBarrier的await()方法让自己进入等待状态,当最后一个线程进入await()方法时,将会唤醒所有正在等待的线程,并行执行。

CyclicBarrier虽然也是同步器,但是并非直接通过AQS来进行实现的,而是借助了ReentrantLock以及Condition来进行实现。


CyclicBarrier的结构

public class CyclicBarrier {

    /**
     * 存在一个Generation静态内部类
     */
    private static class Generation {
        boolean broken = false; // 标识CyclicBarrier是否被破坏
    }

    private final ReentrantLock lock = new ReentrantLock();

    private final Condition trip = lock.newCondition(); // 与ReentrantLock绑定的Condition实例

    private final int parties; // 用于记录一共有多少个线程需要等待

    private final Runnable barrierCommand; // 由最后一个进入await()方法的线程进行调用

    private Generation generation = new Generation();

    private int count; // 用于记录还需要多少个线程进行等待

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // ......
    }

    // 其他省略
}

可以看到CyclicBarrier存在全局的lock、trip、parties、count、barrierCommand以及generation属性,其中parties属性用于记录一共有多少个线程需要等待,而count用于记录还需要多少个线程进行等待。

同时CyclicBarrier中定义了一个Generation静态内部类,该内部类只有一个broken全局属性,用于标识CyclicBarrier是否被破坏,默认为false。

同时CyclicBarrier的构造方法会初始化全局的parties、count以及barrierCommand属性(CyclicBarrier初始化后,count的数量等于parties的数量)


await()方法

由于当创建CyclicBarrier实例之后,线程需要依次调用CyclicBarrier的await()方法让自己进入等待状态,因此从await()方法开始入手。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

await()方法存在两个重载,区别是一个支持超时,一个不支持超时,最终都会调用dowait()方法(使用timed参数表示是否有超时限制,如果timed参数为true则需要传递具体的超时时间)


dowait()方法

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    // 获取全局的ReentrantLock实例,并进行加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取全局的Generation实例,如果Generation中的broken属性为true则表示CyclicBarrier已经被破坏,则直接抛出异常(默认是false)
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        // 如果线程已经被设置了中断标识,则调用breakBarrier()方法,破坏CyclicBarrier
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException(); // 抛出异常
        }

        // index属性用于记录还需要多少个线程进行等待
        int index = --count;
        // 如果index等于0,表示当前线程是最后一个进入await()方法的线程,如果barrierCommand不为空,那么执行barrierCommand的run()方法,然后调用nextGeneration()方法,唤醒在指定Condition实例中等待的所有线程,并重置CyclicBarrier,然后线程直接返回,做自己的事情
        if (index == 0) {  // 最后一个线程走这个逻辑
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration();
                return 0;
            } finally {
                // 如果在执行barrierCommand的run()方法时抛出异常,那么ranAction标识为false,那么需要调用breakBarrier()方法,破坏CyclicBarrier
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 如果非最后一个线程那么将会往下执行

        // 循环
        for (;;) {
            try {
                // 如果没有超时限制,那么直接调用Condition实例的await()方法,让线程在指定的Condition实例中进行等待,并释放掉它拥有的锁
                // 如果有超时限制,那么调用Condition实例的awaitNanos()方法,至多让线程在指定的Condition实例中等待指定的时间,该方法返回线程被唤醒后剩余的毫秒数(超时返回小于等于0),并释放掉它拥有的锁
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            // 当线程被唤醒后将会串行执行以下的逻辑

            // 如果发现CyclicBarrier被破坏了,那么就抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            // 正常情况下,当调用了nextGeneration()方法之后,generation引用就指向一个新的Generation实例,因此g!=generation,那么线程直接返回,做自己的事情
            if (g != generation)
                return index;

            // 如果线程在Condition实例等待的过程中由于达到了超时时间而被唤醒了,那么将会调用breakBarrier()方法,破坏CyclicBarrier
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException(); // 抛出异常
            }
        }
    } finally {
        lock.unlock(); // 解锁
    }
}

当线程进入dowait()方法后,需要获取锁,如果当前线程并非最后一个进入await()方法的线程,那么将会在指定的Condition实例中进行等待,然后释放掉它拥有的锁,如果当前线程是最后一个进入await()方法的线程(index==0,表示还需要0个线程进行等待),如果barrierCommand不为空,那么将会执行barrierCommand的run()方法,最后调用nextGeneration()方法。

如果在执行dowait()方法的过程中,线程已经被设置了中断标识,或者最后一个线程在执行barrierCommand的run()方法时抛出异常,或者在指定Condition实例等待的线程由于达到了超时时间而被唤醒,那么都会调用breakBarrier()方法。


nextGeneration()方法

private void nextGeneration() {
    // 唤醒在指定Condition实例中等待的所有线程
    trip.signalAll();
    // 将count的数量设置成parties
    count = parties;
    // 将generation引用指向一个新的Generation实例
    generation = new Generation();
}

nextGeneration()方法用于指向下一个Generation,该方法将会唤醒在指定Condition实例中等待的所有线程,然后将count的数量设置成parties,恢复成CyclicBarrier初始化后的状态,最后将generation引用指向一个新的Generation实例。


breakBarrier()方法

private void breakBarrier() {
    // 将Generation实例的broken属性设置为true,表示CyclicBarrier已经被破坏
    generation.broken = true;
    // 将count的数量设置回parties
    count = parties;
    // 唤醒在指定Condition实例中等待的所有线程
    trip.signalAll();
}

breakBarrier()方法用于破坏CyclicBarrier,将Generation实例的broken属性设置为true,表示CyclicBarrier已经被破坏,然后将count的数量设置成parties,最后唤醒在指定Condition实例中等待的所有线程。


reset()方法

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier(); // 感觉是多余的
        nextGeneration(); // 指向下一个Generation
    } finally {
        lock.unlock();
    }
}

reset()方法用于重置CyclicBarrier,其根本是将generation引用指向一个新的Generation实例。


流程总结

1.当创建了一个CyclicBarrier实例之后,线程需要依次调用CyclicBarrier的await()方法,让自己进入等待状态。

2.await()方法又会调用dowait()方法,当线程进入dowait()方法后,需要获取锁,如果当前线程并非最后一个进入await()方法的线程,那么将会在指定的Condition实例中进行等待,然后释放掉它拥有的锁,如果当前线程是最后一个进入await()方法的线程(index==0,表示还需要0个线程进行等待),如果barrierCommand不为空,那么将会执行barrierCommand的run()方法,最后调用nextGeneration()方法。

3.nextGeneration()方法用于指向下一个Generation,该方法将会唤醒在指定Condition实例中等待的所有线程,然后将count的数量设置成parties,恢复成CyclicBarrier初始化后的状态,最后将generation引用指向一个新的Generation实例,当最后一个线程执行完nextGeneration()方法后,将会直接返回,做自己的事情,最后释放掉它拥有的锁。

4.当被唤醒的线程依次获取到锁后,将会继续往下执行,如果判断到generation引用已经指向一个新的Generation实例,那么直接返回,做自己的事情,最后释放掉它拥有锁。

5.如果在执行dowait()方法的过程中,线程已经被设置了中断标识,或者最后一个线程在执行barrierCommand的run()方法时抛出异常,或者在指定Condition实例等待的线程由于达到了超时时间而被唤醒,那么都会调用breakBarrier()方法,破坏CyclicBarrier,将Generation实例的broken属性设置为true,表示CyclicBarrier已经被破坏,然后将count的数量设置成parties,最后唤醒在指定Condition实例中等待的所有线程。

6.当被唤醒的线程依次获取到锁后,将会继续往下执行,如果判断到Generation实例的broken属性被设置了true,也就是CyclicBarrier已经被破坏,那么将会直接抛出异常,最后释放掉它拥有的锁。

7.当CyclicBarrier被破坏后是不能够进行复用的,因为Generation的broken属性已经被设置成true,因此需要先调用一次reset()方法进行重置。


FAQ

为什么说CyclicBarrier是可以复用的?

因为当最后一个线程进入await()方法,将会调用nextGeneration()方法,该方法除了唤醒在指定Condition中等待的线程之外,还会将count的数量设置成parties,恢复成CyclicBarrier初始化后的状态,同时将generation引用指向一个新的Generation实例,因此CyclicBarrier是可以复用的,同时需要注意的是,如果CyclicBarrier已经被破坏,那么需要先调用一次reset()方法之后才能够进行复用。

09-09 03:00