Semaphore
Semaphore 是基于同步器实现的计数信号量。
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
公平的信号量可以保证不会出现线程饥饿,而非公平的信号量可以提供更高的吞吐量。
创建实例
private final Sync sync;
/**
* 信号量的同步器实现
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
// 写入许可数
setState(permits);
}
final int getPermits() {
// 获取可用许可数
return getState();
}
/**
* 非公平地获取 acquires 个许可
* created by ZXD at 15 Dec 2018 T 11:43:17
* @param acquires
* @return
*/
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 读取可用许可数
final int available = getState();
// 计算剩余许可数
final int remaining = available - acquires;
/**
* 1)剩余许可数 < 0,则直接返回,不更新可用许可数
* 2)更新可用许可书
*/
if (remaining < 0 ||
compareAndSetState(available, remaining)) {
return remaining;
}
}
}
/**
* 尝试释放 releases 个许可
* created by ZXD at 15 Dec 2018 T 11:44:56
* @param releases
* @return
*/
@Override
protected final boolean tryReleaseShared(int releases) {
for (;;) {
final int current = getState();
// 计算可用许可数
final int next = current + releases;
if (next < current) {
throw new Error("Maximum permit count exceeded");
}
// 更新许可数
if (compareAndSetState(current, next)) {
return true;
}
}
}
/**
* 递减 reductions 个许可
* created by ZXD at 15 Dec 2018 T 11:46:19
* @param reductions
*/
final void reducePermits(int reductions) {
for (;;) {
final int current = getState();
final int next = current - reductions;
if (next > current) {
throw new Error("Permit count underflow");
}
if (compareAndSetState(current, next)) {
return;
}
}
}
/**
* 一次性获取全部许可
* created by ZXD at 15 Dec 2018 T 11:46:41
* @return
*/
final int drainPermits() {
for (;;) {
// 读取当前许可数
final int current = getState();
// 如果不是 0,则将其置为 0
if (current == 0 || compareAndSetState(current, 0)) {
// 返回读取到的许可数
return current;
}
}
}
}
/**
* 非公平版本
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
@Override
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* 公平版本
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
@Override
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果已经有线程在阻塞等待获取许可,则不允许获取
if (hasQueuedPredecessors()) {
return -1;
}
final int available = getState();
final int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}
/**
* 创建一个持有 permits 个许可的非公平信号量
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* 创建一个持有 permits 个许可的
* true:公平信号量
* false:公平信号量
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
获取许可
- 尝试获取一个许可,如果无许可可用,则阻塞等待,支持中断
/**
* 尝试获取一个许可,如果无许可可用,则阻塞等待
* 1)获取到一个许可
* 2)线程被中断
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
- 尝试获取 permits 个许可,如果无许可可用,则阻塞等待,支持中断
/**
* 尝试获取 permits 个许可,如果无许可可用,则阻塞等待
* 1)获取到一个许可
* 2)线程被中断
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) {
throw new IllegalArgumentException();
}
sync.acquireSharedInterruptibly(permits);
}
- 尝试获取一个许可,如果无许可可用,则阻塞等待,不支持线程中断
/**
* 尝试获取一个许可,如果无许可可用,则阻塞等待,不支持线程中断
* 1)获取到一个许可
*/
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
- 尝试获取 permits 个许可,如果无许可可用,则阻塞等待,不支持中断
/**
* 尝试获取 permits 个许可,如果无许可可用,则阻塞等待,不支持中断
* 1)获取到一个许可
*/
public void acquireUninterruptibly(int permits) {
if (permits < 0) {
throw new IllegalArgumentException();
}
sync.acquireShared(permits);
}
- 如果有许可可用,则一次性获取所有的许可,并返回许可数,否则返回 0
/**
* 如果有许可可用,则一次性获取所有的许可,并返回许可数,否则返回 0
*/
public int drainPermits() {
return sync.drainPermits();
}
释放许可
/**
* 将一个许可归还给信号量
*/
public void release() {
sync.releaseShared(1);
}
/**
* 将 permits 个许可归还给信号量
*/
public void release(int permits) {
if (permits < 0) {
throw new IllegalArgumentException();
}
sync.releaseShared(permits);
}