简介
java.util.concurrent包是Java 5的一个重大改进,java.util.concurrent包提供了多种线程间同步和通信的机制,比如Executors, Queues, Timing, Synchronizers和Concurrent Collections等。与synchronized关键字和Object.notify()等方法相比,这些类和方法的抽象层次都较高。Effective Java中提到,其中比较重要的同步和通信机制有Executor框架、Concurrent Collections和Synchronizers三种。
其中Synchronizers包含了五种: Semaphore信号量,CounDownLatch倒计时锁存器,CyclicBarrier循环栅栏,Phaser和Exchanger。 JCIP中提到,Exchanger可以看做一种特殊的Barrier。Effective Java 提到用的比较多的主要是Semaphore信号量和CounDownLatch倒计时锁存器。本文主要讲解我认为比较重要的Semaphore信号量、CounDownLatch计数锁存器和CyclibBarrier。每一种都按照它们的概念、jdk实现、所提供的方法和使用(traveler或者jdk, or sample code)来进行介绍。
1 Semaphore
semaphore,信号量,是众多synchronizer中的一个。在操作系统中就存在互斥量和信号量这样的概念。 semaphore跟锁机制存在一定的相似性,semaphore也是一种锁机制,所不同的是,reentrantLock是只允许一个线程获得锁,而信号量持有多个许可(permits),允许多个线程获得许可并执行。从这个意义上看,重入锁是许可只有1的信号量。它们所提供的方法也非常接近。
1.1 实现
跟ReentrantLock一样,Semaphore也是以AQS为基础来实现的。
1.1.1 构造函数:
非公平版本:
1 public Semaphore(int permits) {
2 sync = new NonfairSync(permits);
3 }
可以选择是否公平的版本:
1 public Semaphore(int permits, boolean fair) {
2 sync = fair ? new FairSync(permits) : new NonfairSync(permits);
3 }
1.1.2 其他方法
跟ReentrantLock不同的是,每种acquire方法都分为有参数的和不带参数的两个版本:
acquire() :
1 public void acquire() throws InterruptedException {
2 sync.acquireSharedInterruptibly(1);
3 }
acquire(int permits)
1 public void acquire(int permits) throws InterruptedException {
2 if (permits < 0) throw new IllegalArgumentException();
3 sync.acquireSharedInterruptibly(permits);
4 }
与此类似的还有:
acquireUninterruptibly()&acquireUninterruptibly(int)
tryAcquire()& tryAcquire(int)
tryAcquire(long,TimeUnit)& tryAcquire(int, long,TimeUnit)
release()& release(int)
1.2 使用示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore; public class TIJ_semaphore {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
final Semaphore semp = new Semaphore(5); // 5 permits for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
// if 1 permit avaliable, thread will get a permits and go; if no permit avaliable, thread will block until 1 avaliable
semp.acquire();
System.out.println("Accessing: " + NO);
Thread.sleep((long) (10000);
semp.release();
} catch (InterruptedException e) {
}
}
};
exec.execute(run);
}
exec.shutdown();
}
程序输出结果为:
1 Accessing: 0
2 Accessing: 2
3 Accessing: 3
4 Accessing: 4
5 Accessing: 1
6 (等待10s)
7 Accessing: 5
8 Accessing: 6
9 Accessing: 14
10 Accessing: 8
11 Accessing: 7
12 (等待10s)
13 Accessing: 10
14 Accessing: 9
15 Accessing: 11
16 Accessing: 15
17 Accessing: 12
18 (等待10s)
19 Accessing: 13
20 Accessing: 16
21 Accessing: 17
22 Accessing: 19
23 Accessing: 18
2 CountDownLatch
2.1 实现
内部使用AQS实现
2.2 方法
await()
等待,无超时,可以被中断
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
boolean await(long,timeUnit):
如果等待超时,则返回false; 如果时间为0或者为负,则立刻返回。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
countDown():
把Latch的计数减1,如果计数到达0,则释放所有正在等待的线程。
public void countDown() {
sync.releaseShared(1);
}
2.3 使用:
绝大多数synchronizer在jdk中没有使用,原因很简单:这些synchronizer是抽象层次较高的,所以一般只有应用程序才会直接使用。
而在nts生产环境中,只有一处admin.rest.api.RestRequestSender使用了CountDownLatch:
public Map<String, List<JSONObject>> doDelete(final String reqUri, final HttpHeaders _headers)
throws RestAPIException
{
setMethod(Method.DELETE);
setUriTemplate(reqUri);
headers = _headers;
return processBroadcast();
}
doDelete调用了processBraodcast:
private Map<String, List<JSONObject>> processBroadcast() throws RestAPIException
{
...
final Map<String, SaaSServerContext> dServers = RestRequestSender.deployedServers;
if (!dServers.isEmpty())
{
final CountDownLatch doneSignal = new CountDownLatch(dServers.size());
...
for (final String key : dServers.keySet())
{
...
executor.submit(new RRSRunnable(doneSignal, context, results, key, totalRecordsCounter));
} try
{
if (!doneSignal.await(Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getInt(), TimeUnit.MINUTES)) // if timeout will retrun false
{
XLog.warning("MDM api broadcast timed out after " + Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getInt()
+ " minutes. Timeout is set via notes.ini key "
+ Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getNotesIniName());
}
}
catch (final InterruptedException ie)
{
throw new RestAPIException("Interrupted", ie);
} } return results; // if doneSingnal.await has been intrerrupted, will this line still execute?
}
RRSRunnable代码如下:
class RRSRunnable implements Runnable
{
private final CountDownLatch doneSignal;
... RRSRunnable(final CountDownLatch doneSignal, final SaaSServerContext context,
final Map<String, List<JSONObject>> results, final String serverKey,
final MaxRecordCounter recordCounter)
{
this.doneSignal = doneSignal;
...
} @Override
public void run()
{
...
try
{
processRequest(responses, context.getHostName(), client, context, recordCounter);
final long elapsedTime = System.currentTimeMillis() - startTime;
XLog.fine("Traveler API request completed. orgid=" + orgId + ";request=" + reqUri + ";pool=" + serverKey
+ ";time=" + elapsedTime + "ms");
}
catch (final RestAPIException rae)
{
exceptionServerKey = serverKey;
exception = rae;
}
finally
{
doneSignal.countDown();
}
}
}
2.4 更加一般的Latch:
CountDownLatch是一种特殊的Latch,jcip第八章用countdownLatch实现了一种valueLatch。
2.4.1 nts Latch简介:
在nts生产代码中也实现了一种Latch,Latch允许多个线程间的协作:在这种Latch中,有working thread和latching thread之分:
workingThread在做一些工作,latchingThread希望当这些工作完成的时候,锁存这些工作,然后得到workingThread的工作结果。workingThread和latchingThread共享一个Latch对象,workingThread会调用start方法,通知它正在开始针对特定Object的工作已经开始了。同时,latchingThread将调用latch方法,并传进它希望等待的Object。 当workingThread完成对某一Object(start方法传入的)的工作后,它将调用finish方法,传入该对象,以及工作的结果对象。当finish方法被调用后,调用latch方法的线程被唤醒,返回工作结果给latch方法的调用者。多个线程可以锁存同一个将要完成某些工作的object。一旦任意一个线程调用了finish方法,他们都将被唤醒并返回结果对象。如果调用latch方法时,针对latch对象的工作还没有开始,线程立刻返回,并不会block. 所以start(Object)应该首先被调用。
workingThread调用start(Object)方法,表明它开始工作。 同时,latchingThread调用latch(Object,long)方法,等待workingThread的执行完成。 workingThread执行finish(Object,Object)方法,表示工作完成,此时,latchingThread醒来。start(Object) finish(Object,Object) --> working thread 第二个参数为结果。 ?
latch(Object,long) --> latching thread
2.4.2 nts Latch 实现:
start:
public boolean start(final Object obj)
{
final long timeStart = System.currentTimeMillis();
boolean rv = false;
Barrier b = null;
synchronized (this)
{
if (!latched.containsKey(obj))
{
b = new Barrier("Latch:" + name + "_Obj:" + obj, 1);
latched.put(obj, b); // latched is a synchronizedHashMap
rv = true;
}
}
XLog.exiting("name=" + name, "obj=" + obj, "barrier=" + b, "rv=" + rv, ("Elapsed time="
+ (System.currentTimeMillis() - timeStart) + "ms"));
return rv;
}
finish:
public void finish(final Object obj, final Object result)
{
final long timeStart = System.currentTimeMillis();
final Barrier b;
synchronized (this)
{
b = latched.remove(obj);
if (null != b)
{
// there are waiters that need the result
b.result = result;
try
{
b.enter(0);
}
catch (final InterruptedException e)
{
// ignored
}
}
}
XLog.exiting("name=" + name, "obj=" + obj, "result=" + result, "barrier=" + b, ("Elapsed time="
+ (System.currentTimeMillis() - timeStart) + "ms"));
}
3 CyclicBarrier
CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它(一个线程)才执行; 而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。
3.1 实现:
使用Lock和Condition实现。不同于AQS。(Condition是基于AQS实现的)
CyclicBarrier包含下面的域:
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
3.1.1 构造函数
当在等待栅栏的线程个数到达预定义的个数时,barrier 发生trip, 但是因为没有预定义的动作,所以不执行任何动作。
1 public CyclicBarrier(int parties) {
2 this(parties, null);
3 }
当barrier发生trip时,会由最后一个进入该barrier的线程执行特定的动作:
1 public CyclicBarrier(int parties, Runnable barrierAction) {
2 if (parties <= 0) throw new IllegalArgumentException();
3 this.parties = parties;
4 this.count = parties;
5 this.barrierCommand = barrierAction;
6 }
CyclicBarrier方法:
await():
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
await(Long time, TimeUtil unit):
1 public int await(long timeout, TimeUnit unit)
2 throws InterruptedException,
3 BrokenBarrierException,
4 TimeoutException {
5 return dowait(true, unit.toNanos(timeout));
6 }
上述两种方法都调用了dowait方法:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
...
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
} int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
} // loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
/* The lock associated with this Condition is atomically
released and the current thread becomes disabled for thread scheduling
purposes */
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
} if (g.broken)
throw new BrokenBarrierException(); if (g != generation)
return index; if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
breakBarrier:
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
3.2 使用:
在jdk和traveler code中没有使用。这符合Effective Java 69中的描述。在实际使用中较少见到。
4 Synchronizere与wait()/wait(long)/wait(long,int)/notify()/notifyAll()的比较:
在effective java 69中提到,wait()/wait(long)/wait(long,int)/notify()/notifyAll()不易使用且容易出错。一般来讲应该优选更高级的concurrent container 或者 synchronizer。 其中synchronizer比较常用的是Semaphore和CountDownLatch,而CyclicBarrier和Exchanger 则使用的比较少。说从易用性上来讲,wait()/notify()/notifyAll()更象是汇编语言,并发容器和synchronizer更像是高级语言。 但我在nts代码中看到了很多wait/notify,而Semaphore和CountDownLatch则用的很少。
Lock/Condition()是使用AQS实现的,Lock/Condition() 组合可以用来替代Object.wait()/notify()。 而高级的synchronizer: CountDownLatch& Semaphore也是基于AQS实现的。所以理论上,可以替代wait/notify。 Semaphore和CountDownLatch也都包含了跟wait(long timeout)相对应的方法。
考虑以下在下面的例子中,是否可以用Semaphore& CountDownLatch来代替wait/notify ?
public Connection getConnection(final boolean highpriority) throws SQLException
{
final long sTime = System.currentTimeMillis();
Connection conn = null;
boolean createConnection = false;
boolean waitConnection = false; try
{
while (true)
{
createConnection = false;
waitConnection = false; synchronized (dbConnections)
{ final List<Connection> dbConnectionsToBeFreedTemp = new ArrayList<Connection>();
synchronized (dbConnectionsToBeFreed)
{
if (!dbConnectionsToBeFreed.isEmpty())
{
dbConnectionsToBeFreedTemp.addAll(dbConnectionsToBeFreed);
dbConnectionsToBeFreed.clear();
if (1 == dbConnectionsToBeFreedTemp.size())
{
// only one, so only notify one
dbConnectionsToBeFreed.notify(); // Semaphore.release()
}
else
{
dbConnectionsToBeFreed.notifyAll(); // CountDownLatch.await()
}
}
if (isThrottDown && connectionCount <= Tier.ONE.value * .20F)
{
isThrottDown = false;
dbConnectionsToBeFreed.notifyAll();
}
} if (!dbConnectionsToBeFreedTemp.isEmpty())
{
for (final Connection connToFree : dbConnectionsToBeFreedTemp)
{
if (!closeConnectionIfAgedOut(connToFree))
{
dbConnections.add(connToFree);
}
}
dbConnectionsToBeFreedTemp.clear();
} if (Configuration.NTS_DB_CONNECTION_THROTTLING.getBoolean())
{
getCurrentTier(); if ((!dbConnections.isEmpty() && !isThrottDown)
&& (highpriority || ((connectionCount - dbConnections.size()) < currentTier.value))) //Prevent non-high priority requests from grabbing freed high priority connections if connection is capped
{
conn = dbConnections.remove(0);
} if (null == conn)
{
// See if we should create a new connection or have to wait
// if none are in the stack, then see if we should make another
if ((connectionCount < currentTier.value || (highpriority && (connectionCount < maxConnectionCount)))
&& !isThrottDown)
{ createConnection = true;
connectionCount++;
}
else if (!isScheduled && conn == null && !highpriority && !createConnection
&& currentTier != Tier.THREE && !isThrottDown)
{
WallClock.getInstance().addAlarm(ALARM_NAME,
Configuration.NTS_DB_POOL_STEP_INTERVAL_TIMER.getInt(), alarmStepUpTier);
isScheduled = true;
waitConnection = true;
}
else if (conn == null && !createConnection && currentTier == Tier.THREE
&& connectionCount - dbConnections.size() >= Tier.THREE.value && !isScheduled)
{
WallClock.getInstance().addAlarm(ALARM_THROTTLE_DOWN,
Configuration.NTS_DB_POOL_STEP_INTERVAL_TIMER.getInt(), alarmThrottleDown);
isScheduled = true;
waitConnection = true;
}
else
{
// Connections are maxed out, so we have to wait
waitConnection = true;
}
}
}
else
{
if (!dbConnections.isEmpty()
&& (highpriority || ((connectionCount - dbConnections.size()) <= maxConnectionCount)))
{
conn = dbConnections.remove(0);
} if (null == conn)
{
// See if we should create a new connection or have to wait
// if none are in the stack, then see if we should make another
if ((connectionCount < maxConnectionCount) || highpriority)
{ createConnection = true;
connectionCount++;
}
else
{
// Connections are maxed out, so we have to wait
waitConnection = true;
}
}
}
} if (null != conn)
{
// we have a Connection, so we are done
break;
}
else if (createConnection)
{
if (!isDerby)
{
// update user and password from Configuration
connProps.put("user", Configuration.NTS_DBUSER.getString());
connProps.put("password", Configuration.NTS_DBPASSWORD.getString());
} try
{
conn = DriverManager.getConnection(url, connProps);
}
catch (final SQLException sqle)
{
connectionCount--; // count must be decremented since a connection was never created
DatabaseStatus.reportException(sqle);
throw sqle;
} if (conn != null)
{
DatabaseStatus.clearException();
if (peakConnections < connectionCount)
{
peakConnections = connectionCount;
Stats.setStat(Stats.DB_POOL_PEAK_CONNECTIONS_COUNT, peakConnections);
Stats.setStat(Stats.DB_POOL_PEAK_CONNECTIONS_TIME, new Date().toString());
}
allConnections.put(Integer.valueOf(conn.hashCode()), PersistentStore.createDeathTimeStamp()); break;
}
else if (Configuration.NTS_DB_CONNECTION_THROTTLING.getBoolean())
{
// Unexpected Database exceptions will result in connection == null.
// Instead of retrying the connection, throw a checked exception.
throw new DBConnectionsAllUsedException("There are no DB Connections available");
}
else
{
// don't break which will do the retry
}
}
else if (waitConnection)
{
try
{
Stats.inc(Stats.DB_THREADS_WAITING_FOR_CONNECTION);
synchronized (dbConnectionsToBeFreed)
{
dbConnectionsToBeFreed.wait(); // Semaphore.acquire() CountDownLatch.countdown()
}
}
finally
{
Stats.dec(Stats.DB_THREADS_WAITING_FOR_CONNECTION);
}
// don't break which will do the retry
}
else
{
// should never hit this case
break;
}
}
}
catch (final InterruptedException ie)
{
// expected
throw new SQLException("Exception waiting for DB connection.", ie);
}
finally
{ } return conn;
}
5 进一步问题:
5.1 CyclicBarrier方法,await(): 为什么要提供这两种方法来包裹wrapper dowait方法?
5.2 Semaphore和CountDownLatch互换
final Semaphore sem = new Semaphore(0);
for (int i = 0; i < num_threads; ++ i)
{
Thread t = new Thread() {
public void run()
{
try
{
doStuff();
}
finally
{
sem.release();
}
}
};
t.start();
} sem.acquire(num_threads);
final CountDownLatch latch = new CountDownLatch(num_threads);
for (int i = 0; i < num_threads; ++ i)
{
Thread t = new Thread() {
public void run()
{
try
{
doStuff();
}
finally
{
latch.countDown();
}
}
};
t.start();
} latch.await();
6 参考文献:
http://www.cnblogs.com/dolphin0520/p/3920397.html
----- 单例 -----
单例与static方法的区别: 一个是实例,可以传递,另外一个不可以。(好像也没什么用,传递单例)。
http://stackoverflow.com/questions/519520/difference-between-static-class-and-singleton-pattern#