1 import java.util.concurrent.TimeUnit;
2 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
3
4 public class MyCountDownLatch {
5 /**
6 * 通过AQS实现闭锁的同步控制,AQS的state为闭锁的count
7 */
8 private static final class Sync extends AbstractQueuedSynchronizer {
9 private static final long serialVersionUID = 4982264981922014374L;
10 Sync(int count) {
11 //初始化state
12 setState(count);
13 }
14 int getCount() {
15 //获取state
16 return getState();
17 }
18 protected int tryAcquireShared(int acquires) {
19 return (getState() == 0) ? 1 : -1;
20 }
21
22 /**
23 * 共享锁,通过CAS对state减一
24 */
25 protected boolean tryReleaseShared(int releases) {
26 //自旋锁
27 for (; ; ) {
28 //获取state
29 int c = getState();
30 if (c == 0)
31 //如果state为零直接返回false,不做任何处理。不进行对state减一
32 return false;
33 //对state减一
34 int nextc = c - 1;
35 //CAS更新state为nextc
36 if (compareAndSetState(c, nextc))
37 //更新后如果state为零返回ture。
38 return nextc == 0;
39 }
40 }
41 }
42 private final MyCountDownLatch.Sync sync;
43 public MyCountDownLatch(int count) {
44 //count必须大于等于零
45 if (count < 0) throw new IllegalArgumentException("count < 0");
46 //初始化state
47 this.sync = new MyCountDownLatch.Sync(count);
48 }
49
50 /**
51 * 当前线程阻塞,直到state为零或者被线程被中断。
52 * 如果state为零,不会阻塞。
53 */
54 public void await() throws InterruptedException {
55 sync.acquireSharedInterruptibly(1);
56 }
57
58 /**
59 * 设置wait时间,到期无论state是否为零,直接退出阻塞。
60 */
61 public boolean await(long timeout, TimeUnit unit)
62 throws InterruptedException {
63 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
64 }
65
66 /**
67 * 如果state不为零对state减一,否则什么也不做。
68 */
69 public void countDown() {
70 sync.releaseShared(1);
71 }
72
73 public long getCount() {
74 //当前count的值
75 return sync.getCount();
76 }
77 public String toString() {
78 //闭锁的类信息和当前count的值
79 return super.toString() + "[Count = " + sync.getCount() + "]";
80 }
81 }
1 /**
2 * 同步一个或者多个任务,强制他们等待由其他任务执行的一组操作完成。
3 */
4 public static void myCountDownLatch() {
5 final CountDownLatch countDownLatch = new CountDownLatch(3);
6 ExecutorService executorService = Executors.newFixedThreadPool(100);
7
8 ExecutorService executorServiceWait = Executors.newFixedThreadPool(3);
9 for (int i = 0; i < 6; i++) {
10 final int j = i;
11 executorService.execute(() -> {
12 //do something
13 try {
14 TimeUnit.MILLISECONDS.sleep(300 * j);
15 } catch (InterruptedException e) {
16 e.printStackTrace();
17 }
18 logger.info("序列号:" + j + countDownLatch.toString());
19 //执行完任务count减一,count为零时不可再进行减一,一直保持为零,为零的临界点只能触发一次。
20 countDownLatch.countDown();
21 });
22 }
23 for (int i = 0; i < 2; i++) {
24 executorServiceWait.execute(() -> {
25
26 logger.info("闭锁 start" + countDownLatch.toString());
27 try {
28 //当count为零时,释放阻塞。
29 countDownLatch.await();
30 } catch (InterruptedException e) {
31 logger.error("myCountDownLatch", e);
32 }
33 logger.info("闭锁 end" + countDownLatch.toString());
34
35 });
36 }
37
38 executorService.shutdown();
39 executorServiceWait.shutdown();
40 }