CountDownLatch

CountDownLatch 源码
 1 import java.util.concurrent.TimeUnit;
 2 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 3
 4 public class MyCountDownLatch {
 5     /**
 6      * 通过AQS实现闭锁的同步控制,AQS的state为闭锁的count
 7      */
 8     private static final class Sync extends AbstractQueuedSynchronizer {
 9         private static final long serialVersionUID = 4982264981922014374L;
10         Sync(int count) {
11             //初始化state
12             setState(count);
13         }
14         int getCount() {
15             //获取state
16             return getState();
17         }
18         protected int tryAcquireShared(int acquires) {
19             return (getState() == 0) ? 1 : -1;
20         }
21
22         /**
23          * 共享锁,通过CAS对state减一
24          */
25         protected boolean tryReleaseShared(int releases) {
26             //自旋锁
27             for (; ; ) {
28                 //获取state
29                 int c = getState();
30                 if (c == 0)
31                     //如果state为零直接返回false,不做任何处理。不进行对state减一
32                     return false;
33                 //对state减一
34                 int nextc = c - 1;
35                 //CAS更新state为nextc
36                 if (compareAndSetState(c, nextc))
37                     //更新后如果state为零返回ture。
38                     return nextc == 0;
39             }
40         }
41     }
42     private final MyCountDownLatch.Sync sync;
43     public MyCountDownLatch(int count) {
44         //count必须大于等于零
45         if (count < 0) throw new IllegalArgumentException("count < 0");
46         //初始化state
47         this.sync = new MyCountDownLatch.Sync(count);
48     }
49
50     /**
51      * 当前线程阻塞,直到state为零或者被线程被中断。
52      * 如果state为零,不会阻塞。
53      */
54     public void await() throws InterruptedException {
55         sync.acquireSharedInterruptibly(1);
56     }
57
58     /**
59      * 设置wait时间,到期无论state是否为零,直接退出阻塞。
60      */
61     public boolean await(long timeout, TimeUnit unit)
62             throws InterruptedException {
63         return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
64     }
65
66     /**
67      * 如果state不为零对state减一,否则什么也不做。
68      */
69     public void countDown() {
70         sync.releaseShared(1);
71     }
72
73     public long getCount() {
74         //当前count的值
75         return sync.getCount();
76     }
77     public String toString() {
78         //闭锁的类信息和当前count的值
79         return super.toString() + "[Count = " + sync.getCount() + "]";
80     }
81 }
CountDownLatch 使用
 1 /**
 2      * 同步一个或者多个任务,强制他们等待由其他任务执行的一组操作完成。
 3      */
 4     public static void myCountDownLatch() {
 5         final CountDownLatch countDownLatch = new CountDownLatch(3);
 6         ExecutorService executorService = Executors.newFixedThreadPool(100);
 7
 8         ExecutorService executorServiceWait = Executors.newFixedThreadPool(3);
 9         for (int i = 0; i < 6; i++) {
10             final int j = i;
11             executorService.execute(() -> {
12                 //do something
13                 try {
14                     TimeUnit.MILLISECONDS.sleep(300 * j);
15                 } catch (InterruptedException e) {
16                     e.printStackTrace();
17                 }
18                 logger.info("序列号:" + j + countDownLatch.toString());
19                 //执行完任务count减一,count为零时不可再进行减一,一直保持为零,为零的临界点只能触发一次。
20                 countDownLatch.countDown();
21             });
22         }
23         for (int i = 0; i < 2; i++) {
24             executorServiceWait.execute(() -> {
25
26                 logger.info("闭锁 start" + countDownLatch.toString());
27                 try {
28                     //当count为零时,释放阻塞。
29                     countDownLatch.await();
30                 } catch (InterruptedException e) {
31                     logger.error("myCountDownLatch", e);
32                 }
33                 logger.info("闭锁 end" + countDownLatch.toString());
34
35             });
36         }
37
38         executorService.shutdown();
39         executorServiceWait.shutdown();
40     }
12-20 07:54