闭锁
一种可以延迟线程的进度直到其到达终止状态.可以用来确保某些活动直到其他活动都完成后才继续执行
例如:
- 确保某个计算在其需要的所有资源都被初始化了之后才继续执行.
- 确保某个服务在其他依赖的服务都启动了之后才开始执行
- 等待某个操作的所有参与者(如LOL) 都就绪了之后再继续执行.
锁的实现
1.CountDownLatch
CountDownLatch 是一种灵活的闭锁实现. 可以在以上的各种类型情况下使用.它可以使一个或多个线程等待一组事件的发生.
闭锁状态包括一个计数器,该计数器被初始化为一个正数.表示需要被等待的事件的数量. countDown 方法用于递减计数器,表示有一个事件已经发生,而await 方法等待计数器到达零时就会执行, 否则会一直阻塞直到计数器为零,或者等待中的线程中断, 或者等待超时.
import java.util.concurrent.*; public class TestHarness {
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {
}
}
};
t.start();
} long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
} public static void main(String[] args) throws InterruptedException {
new TestHarness().timeTasks(9, new Runnable() {
public void run() {
System.out.println(this);
}
});
}
}
以上程序.它使用两个闭锁,分别表示起始门 "startGate" 和 结束门 "endGate" 来确保所有线程都准备就绪后才继续执行,而每个线程做的最后一件事都是让 "endGate" 减一,这能使主线程高效地等待直到所有工作线程都执行完成,因此可以统计所消耗的时间.
2.FutureTask
futureTask 也可以用作闭锁. futureTask 是通过Callable 来实现的. 相当于一种可用于生产结果的runnable , 并且可以用于以下3钟等待状态
- 等待运行(Waiting to run )
- 正在运行(Running)
- 运行完成(completed)
执行完成 ,表示计算的所有可能结束的方式. 包括 正常结束,由于取消而结束和由于异常而结束等.
Future.get() 的行为取决于任务的状态,如果任务已完成,那么get 会立即返回结果,get 将阻塞直到这个任务进去完成状态.然后返回结果或者抛出异常. FutureTask 将计算结果从执行计算的线程传递到获取这个结果的线程, 而 FutureTask 的规范确保了这种传递的过程能实现结果的安全发布.
public class Preloader {
ProductInfo loadProductInfo() throws DataLoadException {
return null;
} private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(
new Callable<ProductInfo>() {
public ProductInfo call() throws DataLoadException {
return loadProductInfo();
}
});
private final Thread thread = new Thread(future); public void start() {
thread.start();
} public ProductInfo get() throws DataLoadException, InterruptedException {
try {
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException)
throw (DataLoadException) cause;
else
throw LaunderThrowable.launderThrowable(cause);
}
} interface ProductInfo {
}
} class DataLoadException extends Exception {
}
3. Semaphore
计数信号量(Counting Semaphore) 用来控制同是访问某个特定资源的操作数量,或者同时执行某个指定操作的数量. 计数信号量还可以用来实现某种资源池,或者对容器施加边界(如:blockingQueue)
Semaphore 中管理着一组虚拟许可(permit), 许可的初始化数量可以通过构造函数来指定,在执行操作前先获取许可(只要还有剩余许可),并在使用后释放,如果没有许可,那么acquire 将阻塞到有许可(或者直到被中断或者操作超时). release 方法将返回一个许可给信号量(许可与线程无关,一个许可可以在一个线程获取,在另一个线程释放,且不具备重入性)
public class BoundedHashSet <T> {
private final Set<T> set;
private final Semaphore sem; public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
} public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded)
sem.release();
}
} public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved)
sem.release();
return wasRemoved;
}
}
4. Barrier
栅栏 类似于闭锁,它能阻塞一组线程直到某个事件发生,栅栏与闭锁的区别关键在于:所有线程必须同时到达栅栏的位置,才能继续,闭锁用于等待某件事情,而栅栏用于实现一些协议.例如: 几个人决定在某个地方集合:'所有人6:00 在 麦当劳碰头,到了以后要等其他人,之后再讨论下一步要做的事.'
CyclicBarrier 可以使一定数量的参与方法反复在栅栏位置会聚,它在并行迭代算法中非常有用,这种算法通常将一个问题拆分成一系列相互独立的子问题,当线程到达栅栏位置时将调用await 方法, 这个方法将阻塞直到所有线程都到达栅栏位置,如果所有线程都到达栅栏位置,那么栅栏将打开, 此时所有线程都被释放,而栅栏将被重置以便下次使用, 如果对await的调用超时,或者await 阻塞的线程被中断, 那么栅栏将被认为是打破了, 所有阻塞的await 调用都将终止并抛出 BrokenBarrierException . 如果成功通过栅栏,那么await 将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来"选举" 产生一个领导线程.并在下一次迭代中由该领导线程执行一些特殊的工作.CyclicBarrier 还可以是你将一个栅栏操作传递给构造函数,这是一个Runnable ,当成功通过栅栏时会(在一个子任务线程中) 执行它.但在阻塞线程被释放钱是不能执行的.
在模拟程序中经常使用栅栏.
public class CellularAutomata {
private final Board mainBoard;
private final CyclicBarrier barrier;
private final Worker[] workers; public CellularAutomata(Board board) {
this.mainBoard = board;
int count = Runtime.getRuntime().availableProcessors();
this.barrier = new CyclicBarrier(count,
new Runnable() {
public void run() {
mainBoard.commitNewValues();
}});
this.workers = new Worker[count];
for (int i = 0; i < count; i++)
workers[i] = new Worker(mainBoard.getSubBoard(count, i));
} private class Worker implements Runnable {
private final Board board; public Worker(Board board) { this.board = board; }
public void run() {
while (!board.hasConverged()) {
for (int x = 0; x < board.getMaxX(); x++)
for (int y = 0; y < board.getMaxY(); y++)
board.setNewValue(x, y, computeValue(x, y));
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
} private int computeValue(int x, int y) {
// Compute the new value that goes in (x,y)
return 0;
}
} public void start() {
for (int i = 0; i < workers.length; i++)
new Thread(workers[i]).start();
mainBoard.waitForConvergence();
} interface Board {
int getMaxX();
int getMaxY();
int getValue(int x, int y);
int setNewValue(int x, int y, int value);
void commitNewValues();
boolean hasConverged();
void waitForConvergence();
Board getSubBoard(int numPartitions, int index);
}
}
张孝祥的案例:
public class CyclicBarrierTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CyclicBarrier cb = new CyclicBarrier(3); // 三个线程同时到达
for (int i = 0; i < 3; i++) {
Runnable runnable = new Runnable() {
public void run() {
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("线程"
+ Thread.currentThread().getName()
+ "即将到达集合地点1,当前已有"
+ (cb.getNumberWaiting() + 1)
+ "个已到达"
+ (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
: "正在等候"));
try {
cb.await();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Thread.sleep((long) (Math.random() * 10000));
System.out.println("线程"
+ Thread.currentThread().getName()
+ "即将到达集合地点2,当前已有"
+ (cb.getNumberWaiting() + 1)
+ "个已到达"
+ (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
: "正在等候"));
try {
cb.await();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Thread.sleep((long) (Math.random() * 10000));
System.out.println("线程"
+ Thread.currentThread().getName()
+ "即将到达集合地点3,当前已有"
+ (cb.getNumberWaiting() + 1)
+ "个已到达"
+ (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊"
: "正在等候"));
try {
cb.await();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
service.execute(runnable);
}
service.shutdown();
}
}
另一种形式的栅栏是 Exchanger ,它是一种两方(Two-Party)栅栏 , 各方在栅栏位置上交换数据,当两方执行不对称的操作时, Exchanger 会非常有用.
例如: 当一个线程想缓冲区写入数据, 而另一个线程用缓冲区中读取数据.这些线程可以使用Exchanger来汇聚,并将满的缓冲区与空的缓冲区交换.当两个线程通过Exchanger交换对象时.这种交换就把两个对象安全地发布给另一方.
数据交换的实际取决于应用程序的相应需求. 最简单的方案是. 当缓冲区被填满时,由填充任务进行交换. 当缓冲区为空时,由清空任务进行交换. 这样会把需要交换的次数降至最低, 但如果新数据的到达不可预测,那么一些数据的处理过程就将延迟.另一个方法是,不仅当缓冲区被填满时进行交换. 并且当缓冲区被充到一定程度,并保持一段时间后.也进行交换.
/**
*
*/
package mjorcen.nio.test2; import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Exchanger; /**
*
*
* @author mjorcen
* @email [email protected]
* @dateTime Jan 19, 2015 6:57:56 PM
* @version 1
*/
public class ExchangerTest {
final Exchanger<List<String>> exchanger; public ExchangerTest(Exchanger<List<String>> exchanger) {
super();
this.exchanger = exchanger;
} public static void main(String[] args) {
Exchanger<List<String>> exchanger = new Exchanger<List<String>>(); new Thread(new ExchangerThread01(exchanger)).start();
new Thread(new ExchangerThread02(exchanger)).start(); }
} class ExchangerThread01 implements Runnable {
final Exchanger<List<String>> exchanger; public ExchangerThread01(Exchanger<List<String>> exchanger) {
super();
this.exchanger = exchanger;
} /*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
public void run() {
System.out.println("ExchangerThread01 begin ... ");
try {
List<String> list = new LinkedList<String>();
for (int i = 0; i < 20; i++) {
list.add("str_01_" + i);
}
list = exchanger.exchange(list);
for (String string : list) {
System.out.println("Thread01 is " + string);
}
System.out.println("ExchangerThread01 end ... ");
} catch (InterruptedException e) {
e.printStackTrace();
}
} } class ExchangerThread02 implements Runnable {
final Exchanger<List<String>> exchanger; public ExchangerThread02(Exchanger<List<String>> exchanger) {
super();
this.exchanger = exchanger;
} /*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
public void run() {
System.out.println("ExchangerThread02 begin... ");
List<String> list = new LinkedList<String>();
for (int i = 0; i < 10; i++) {
list.add("str_02_" + i);
}
try {
Thread.sleep(1000);
list = exchanger.exchange(list);
for (String string : list) {
System.out.println("Thread02 is " + string);
}
System.out.println("ExchangerThread02 end ... ");
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
6. 构建高效且可伸缩的结果缓存
public class Memoizer <A, V> implements Computable<A, V> {
private final ConcurrentMap<A, Future<V>> cache
= new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c; public Memoizer(Computable<A, V> c) {
this.c = c;
} public V compute(final A arg) throws InterruptedException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = cache.putIfAbsent(arg, ft);
if (f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(arg, f);
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e.getCause());
}
}
}
}
以上内容出自: <<java并发编程实践>>