一,(等待多线程完成的)CountDownLatch
背景;
- countDownLatch(同步援助)是在java1.5被引入,跟它一起被引入的工具类还有CyclicBarrier(同步援助)、Semaphore(计数信号量)、concurrentHashMap和BlockingQueue(阻塞队列)。
- 存在于java.util.cucurrent包下。
概念理解:
主要方法介绍
源码:
(1),countDownLatch类中只提供了一个构造器:
CountDownLatch(int count)
构造一个 CountDownLatch初始化与给定的数。
(2)类中有三个方法是最重要的:
//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException { };
//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
//将count值减1
public void countDown() { };
使用举例:
public static void CountDownLatchUsed() throws InterruptedException {
CountDownLatch downLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t 下自习走人");
downLatch.countDown();
},String.valueOf(i)).start();
}
downLatch.await();
System.out.println(Thread.currentThread().getName()+"自习室关门走人");
}
运行结果:
了解更多:https://blog.csdn.net/chenssy/article/details/49794141#commentBox
二,(同步屏障)CyclicBarrier
概念理解:
主要方法介绍:
源码:
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
在CyclicBarrier中最重要的方法莫过于await()方法,在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。如下:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
await()方法内部调用dowait(boolean timed, long nanos)方法:
应用场景:
应用实例:
比如开会要等所有人到齐了,才会开会:
public class BlockQueueDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
System.out.println("所有人都到齐了吧,咱们开会。。。");
}
});
for (int i = 1; i <= 4; i++) {
final int tempInt = i;
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t" + tempInt + "已到位");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
运行结果:
了解更多:https://blog.csdn.net/chenssy/article/details/70160595
三,(控制并发线程数的)Semaphore
概念理解:
理论知识:
CountDownLatch和Semephore优缺点对比
生活实例版本解释:
深入理解:
源码分析
(1)Semaphore的结构如下:
/**
* 创建具有给定的许可数和非公平的公平设置的 Semaphore。 (默认是非公平锁)
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
} /**
* 创建具有给定的许可数和给定的公平设置的 Semaphore。
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
(2)信号量的获取:acquire()
在ReentrantLock中已经阐述过,公平锁和非公平锁获取锁机制的差别:对于公平锁而言,如果当前线程不在CLH队列(CLH锁是一个自旋锁。能确保无饥饿性。提供先来先服务的公平性)的头部,则需要排队等候,而非公平锁则不同,它无论当前线程处于CLH队列的何处都会直接获取锁。所以公平信号量和非公平信号量的区别也一样。
关于CLH 自旋队列锁 更多知识:https://blog.csdn.net/aesop_wubo/article/details/7533186
源码:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
对于公平信号量和非公平信号量,他们机制的差异就体现在traAcquireShared()方法中:
公平锁 tryAcquireShared() 尝试获取信号量
源码:
protected int tryAcquireShared(int acquires) {
for (;;) {
//判断该线程是否位于CLH队列的列头,如果是的话返回 -1,调用doAcquireSharedInterruptibly()
if (hasQueuedPredecessors())
return -1;
//获取当前的信号量许可
int available = getState();
//设置“获得acquires个信号量许可之后,剩余的信号量许可数”
int remaining = available - acquires; //如果剩余信号量 > 0 ,则设置“可获取的信号量”为remaining
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
注意 : tryAcquireShared是尝试获取 信号量,remaining表示下次可获取的信号量。
(2)hasQueuedPredecessors() 是否有队列同步器
对于hasQueuedPredecessors、compareAndSetState在ReentrantLock中已经阐述了,hasQueuedPredecessors用于判断该线程是否位于CLH队列列头,compareAndSetState用于设置state的,它是进行原子操作的。代码如下:
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
} protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
(3) doAcquireSharedInterruptibly源代码如下:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
/*
* 创建CLH队列的node节点,Node.SHARED表示该节点为共享锁
*/
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取该节点的前继节点
final Node p = node.predecessor();
//当p为头节点时,基于公平锁机制,线程尝试获取锁
if (p == head) {
//尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//判断当前线程是否需要阻塞,如果阻塞的话,则一直处于阻塞状态知道获取共享锁为止
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
非公平锁
对于非公平锁就简单多了,她没有那些所谓的要判断是不是CLH队列的列头,如下:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
在非公平锁中,tryAcquireShared直接调用AQS的nonfairTryAcquireShared()。通过上面的代码我可看到非公平锁并没有通过if (hasQueuedPredecessors())这样的条件来判断该节点是否为CLH队列的头节点,而是直接判断信号量。
信号量的释放:release()
信号量Semaphore的释放和获取不同,它没有分公平锁和非公平锁。如下:
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//尝试释放共享锁
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
注意:release()释放线索所占有的共享锁,它首先通过tryReleaseShared尝试释放共享锁,如果成功直接返回,如果失败则调用doReleaseShared来释放共享锁。
tryReleaseShared:源码部分:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
//信号量的许可数 = 当前信号许可数 + 待释放的信号许可数
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//设置可获取的信号许可数为next
if (compareAndSetState(current, next))
return true;
}
}
doReleaseShared:源码部分:
private void doReleaseShared() {
for (;;) {
//node 头节点
Node h = head;
//h != null,且h != 尾节点
if (h != null && h != tail) {
//获取h节点对应线程的状态
int ws = h.waitStatus;
//若h节点状态为SIGNAL,表示h节点的下一个节点需要被唤醒
if (ws == Node.SIGNAL) {
//设置h节点状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//唤醒h节点对应的下一个节点
unparkSuccessor(h);
}
//若h节点对应的状态== 0 ,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//h == head时,则退出循环,若h节点发生改变时则循环继续
if (h == head)
break;
}
}
参考博客: