第二十一章:并发
基本的线程机制
并发编程使我们可以将程序划分为多个分离的、独立运行的任务。通过使用多线程机制,这些独立任务(也被称为子任务)中的每一个都将由执行线程来驱动。一个线程就是在进程中的一个单一的顺序控制流,因此,单个进程可以拥有多个并发执行的任务,但是你的程序使得每个任务都好像有其自己的CPU一样。其底层机制是切分CPU时间,但通常你不需要考虑它。
线程模型为编程带来了便利,它简化了在单一程序中同时交织在一起的多个操作的处理。在使用线程时,CPU将轮流给每个任务分配其占用时间“。每个任务都觉得自己在一直占用CPU,但事实上CPU时间是划分成片段分配给了所有的任务(例外情况是程序确实运行在多个CPU之上)。线程的一大好处是可以使你从这个层次抽身出来,即代码不必知道它是运行在具有一个还是多个CPU的机器上。所以,使用线程机制,是一种建立透明的、可扩展的程序的方法,如果程序运行的太慢,为机器增添一个CPU就很容易的加快程序的运行速度。多任务和多线程往往是使用多处理器系统的最合理方式。
定义任务
线程可以驱动任务,因此你需要一种描述任务的方式。
- 使用Runnable接口:要想定义任务,只需实现Runable接口并编写run()方法,使得该任务可以执行你的命令,例如,下面的LiftOff任务将显示发射之前的倒计时:
package concurrency;
/**
* @author Mr.Sun
* @date 2022年09月03日 10:14
*
* 演示通过实现Runnable接口定义任务
*
* <p>
* 显示发射之前的倒计时
* </p>
*/
public class LiftOff implements Runnable {
protected int countDown = 10;
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff() {
}
public LiftOff(int countDown) {
this.countDown = countDown;
}
public String status() {
return "#" + id + "(" + (countDown > 0 ? countDown : "LiftOff!") + "). ";
}
@Override
public void run() {
while (countDown-- > 0) {
System.out.print(status());
Thread.yield();
}
}
} ///:~
标识符id可以用来区分任务的多个实例,它是final的,因为它一旦被初始化之后就不希望被修改。任务的run()方法通常总会有某种形式的循环,使得任务一直运行下去直到不再需要,所以要设定跳出循环的条件(有一种选择是直接从run)返回)。通常,run()被写成无限循环的形式,这就意味着,除非有某个条件使得run()终止,否则它将永远运行下去(在本章后面将会看到如何安全地终止线程)。
在run()中对静态方法Thread.yield()的调用是对线程调度器(Java线程机制的一部分,可以将CPU从一个线程转移给另一个线程)的一种建议,它在声明:"我已经执行完生命周期中最重要的部分了,此刻正是切换给其他任务执行一段时间的大好时机。”这完全是选择性的,但是这里使用它是因为它会在这些示例中产生更加有趣的输出∶你更有可能会看到任务换进换出的证据。
在下面的实例中,这个任务的run()不是由单独的线程驱动的,它是在main()中直接调用的(实际上,这里仍旧使用了线程,即总是分配给main()的那个线程)∶
package concurrency;
/**
* @author Mr.Sun
* @date 2022年09月03日 10:23
*/
public class MainThread {
public static void main(String[] args) {
LiftOff launch = new LiftOff();
launch.run();
}
} /* Output:
#0(9). #0(8). #0(7). #0(6). #0(5). #0(4). #0(3). #0(2). #0(1). #0(LiftOff!).
*///:~
- Thread类:将Runnable对象转变为工作任务的传统方式是把它交给一个Thread构造器,下面的示例展示了如何使用Thread来驱动LiftOff对象:
package concurrency;
/**
* @author Mr.Sun
* @date 2022年09月03日 10:29
*
* 将Runnable对象转变为工作任务的传统方式是把它交给一个Thread构造器
*/
public class BasicThreads {
public static void main(String[] args) {
Thread t = new Thread(new LiftOff());
t.start();
System.out.println("Waiting for LiftOff");
}
} /* Output:
Waiting for LiftOff
#0(9). #0(8). #0(7). #0(6). #0(5). #0(4). #0(3). #0(2). #0(1). #0(LiftOff!).
*///:~
- 使用Executor
Java SE5的java.util.concurrent包中的执行器(Executor)将为你管理Thread对象,从而简化了并发编程。Executor在客户端和任务执行之间提供了一个间接层;与客户端直接执行任务不同,这个中介对象将执行任务。Executor允许你管理异步任务的执行,而无须显式地管理线程的生命周期。Executor在Java SE5/6中是启动任务的优选方法。
我们可以使用Executor来代替在BasicThreads.java中显示地创建Thread对象。LiftOff 对象知道如何运行具体的任务,与命令设计模式一样,它暴露了要执行的单一方法。ExecutorService(具有服务生命周期的Executor,例如关闭)知道如何构建恰当的上下文来执行Runnable对象。在下面的示例中,CachedThreadPool将为每个任务都创建一个线程。注意,ExecutorService对象是使用静态的Executor方法创建的,这个方法可以确定其Executor类型∶
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Mr.Sun
* @date 2022年09月03日 16:04
*
* 使用Executor
*/
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new LiftOff());
}
exec.shutdown();
}
}/* Output:
#0(9). #1(9). #2(9). #3(9). #0(8). #4(9). #1(8). #2(8). #3(8). #0(7). #4(8). #1(7). #2(7). #3(7). #0(6). #4(7). #1(6). #2(6). #3(6). #0(5). #1(5). #4(6). #3(5). #1(4). #2(5). #4(5). #0(4). #1(3). #2(4). #3(4). #0(3). #4(4). #1(2). #2(3). #3(3). #0(2). #4(3). #1(1). #2(2). #3(2). #0(1). #4(2). #1(LiftOff!). #2(1). #3(1). #0(LiftOff!). #4(1). #2(LiftOff!). #3(LiftOff!). #4(LiftOff!).
*///:~
你可以很容易地将上面示例中的CachedThreadPool替换为不同类型的Executor。
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Mr.Sun
* @date 2022年09月03日 16:10
*/
public class FixedThreadPool {
public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
exec.execute(new LiftOff());
}
exec.shutdown();
}
}
有了FixedThreadPool,你就可以一次性预先执行代价高昂的线程分配,因而也就可以限制线程的数量了。这可以节省时间,因为你不用为每个任务都固定地付出创建线程的开销。在事件驱动的系统中,需要线程的事件处理器,通过直接从池中获取线程,也可以如你所愿地尽快得到服务。你不会滥用可获得的资源,因为FixedThreadPool使用的Thread对象的数量是有界的。
注意,在任何线程池中,现有线程在可能的情况下,都会被自动复用。
尽管本书将使用CachedThreadPool,但是也应该考虑在产生线程的代码中使用FixedThreadPool。CachedThreadPool在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收旧线程时停止创建新线程,因此它是合理的Executor的首选。只有当这种方式会引发问题时,你才需要切换到FixedThreadPool。
SingleThreadExecutor就像是线程数量为1的FixedThreadPool。这对于你希望在另一个线程中连续运行的任何事物(长期存活的任务)来说,都是很有用的,例如监听进入的套接字连接的任务。它对于希望在线程中运行的短任务也同样很方便,例如,更新本地或远程日志的小任务,或者是事件分发线程。
如果向SingleThreadExecutor提交了多个任务,那么这些任务将排队,每个任务都会在下一个任务开始之前运行结束,所有的任务将使用相同的线程。在下面的示例中,你可以看到每个任务都是按照它们被提交的顺序,并且是在下一个任务开始之前完成的。因此,SingleThread-Executor会序列化所有提交给它的任务,并会维护它自己(隐藏)的悬挂任务队列。
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Mr.Sun
* @date 2022年09月03日 16:17
*/
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
exec.execute(new LiftOff());
}
exec.shutdown();
}
} /* Output:
#0(9). #0(8). #0(7). #0(6). #0(5). #0(4). #0(3). #0(2). #0(1). #0(LiftOff!). #1(9). #1(8). #1(7). #1(6). #1(5). #1(4). #1(3). #1(2). #1(1). #1(LiftOff!). #2(9). #2(8). #2(7). #2(6). #2(5). #2(4). #2(3). #2(2). #2(1). #2(LiftOff!). #3(9). #3(8). #3(7). #3(6). #3(5). #3(4). #3(3). #3(2). #3(1). #3(LiftOff!). #4(9). #4(8). #4(7). #4(6). #4(5). #4(4). #4(3). #4(2). #4(1). #4(LiftOff!).
*///:~
作为另一个示例,假设你有大量的线程,那它们运行的任务将使用文件系统。你可以用SingleThreadExecutor来运行这些线程,以确保任意时刻在任何线程中都只有唯一的任务在运行。在这种方式中,你不需要在共享资源上处理同步(同时不会过度使用文件系统)。有时更好的解决方案是在资源上同步(你将在本章稍后学习),但是SingleThreadExecutor可以让你省去只是为了维持某些事物的原型而进行的各种协调努力。通过序列化任务,你可以消除对序列化对象的需求。
- 从任务种产生返回值:Runnable是执行工作的独立任务,但是它不返回任何值。如果你希望在任务完成时能够返回一个值,那么可以实现Callable接口而不是Runnable接口。在Java SE5中引入的Callable是一种具有类型参数的泛型,它的类型参数表示的是从方法call()中返回的值,并且必须使用ExecutorService.submit()方法调用它,下面是一个简单示例:
package concurrency;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* @author Mr.Sun
* @date 2022年09月03日 16:23
*
* 从任务中产生返回值
*/
public class CallableDemo {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
List<Future<String>> futureList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
futureList.add(exec.submit(new TaskWithResult(i)));
}
for (Future<String> result : futureList) {
try {
// get() 会阻塞知道任务完成
System.out.println(result.get());
} catch (InterruptedException e) {
System.out.println(e);
return;
} catch (ExecutionException e) {
System.out.println(e);
} finally {
exec.shutdown();
}
}
}
}
class TaskWithResult implements Callable<String> {
private int id;
public TaskWithResult(int id) {
this.id = id;
}
@Override
public String call() throws Exception {
return "result of TaskWithResult " + id;
}
} /* Output:
result of TaskWithResult 0
result of TaskWithResult 1
result of TaskWithResult 2
result of TaskWithResult 3
result of TaskWithResult 4
result of TaskWithResult 5
result of TaskWithResult 6
result of TaskWithResult 7
result of TaskWithResult 8
result of TaskWithResult 9
*///:~
submit()方法会产生Future对象,它用Callable返回结果的特定类型进行了参数化。你可以用isDone()方法来查询Future是否已经完成。当任务完成时,它具有一个结果,你可以调用get()方法来获取该结果。你也可以不用isDone()进行检查就直接调用get(),在这种情况下,get()将阻塞,直至结果准备就绪。你还可以在试图调用get()来获取结果之前,先调用具有超时的get(),或者调用isDone()来查看任务是否完成。
捕获异常
由于线程的本质特性,使得你不能捕获从线程中逃逸的异常。下面的任务总是抛出一个异常,该异常会传播到run()方法的外部,并且main()展示了当你运行它时所发生的事情:
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Mr.Sun
* @date 2022年09月03日 17:00
*
* 线程抛出异常
*/
public class ExceptionThread implements Runnable {
@Override
public void run() {
throw new RuntimeException();
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
exec.shutdown();
}
}
运行结果如下图:
将main()的主体放到try-catch子句中是没有作用的,如下:
我们发现使用try-catch后,异常仍未被捕获。
为了解决这个问题,我们要修改Executor产生线程的方式。Thread.UncaughtExceptionHandler是Java SE5中的新接口,它允许你在每个Thread对象上都附着一个异常处理器。Thread.UncaughtExceptionHandler.uncaughtException()会在线程因未捕获的异常而临近死亡时被调用。为了使用它,我们创建了一个新类型的ThreadFactory,它将在每个新创建的Thread对象上附着一个Thread.UncaughtExceptionHandler。我们将这个工厂传递给Executors创建新的ExecutorService的方法:
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* @author Mr.Sun
* @date 2022年09月03日 17:10
*
* 为每个Thread对象附着一个异常处理器
*/
public class CaptureUncaughtException {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory());
exec.execute(new ExceptionThread2());
}
}
class ExceptionThread2 implements Runnable {
@Override
public void run() {
Thread t = Thread.currentThread();
System.out.println("run() by " + t);
System.out.println("eh = " + t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught " + e);
}
}
class HandlerThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
System.out.println(this + " creating new Thread");
Thread t = new Thread(r);
System.out.println("created " + t);
t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
System.out.println("eh = " + t.getUncaughtExceptionHandler());
return t;
}
} /* Output:
concurrency.HandlerThreadFactory@1540e19d creating new Thread
created Thread[Thread-0,5,main]
eh = concurrency.MyUncaughtExceptionHandler@677327b6
run() by Thread[Thread-0,5,main]
eh = concurrency.MyUncaughtExceptionHandler@677327b6
caught java.lang.RuntimeException
*///:~
在程序中添加了额外的跟踪机制,用来验证工厂创建的线程会传递给UncaughtExceptionHandler。你可以看到,未捕获的异常是通过uncaughtException来捕获的。
共享受限资源
1. 不正确访问资源
考虑下面的例子,其中一个任务产生产生偶数,而其他任务消费这些数字。而这些消费者任务的唯一工作就是校验偶数的有效性
package concurrency;
/**
* @author Mr.Sun
* @date 2022年09月03日 21:49
*
* <p>
* 创建一个名为IntGenerator的抽象类,它包含EventChecker必须了解的必不可少的方法,即一个next()方法,和一个可以执行撤销的方法
* </p>
*/
public abstract class IntGenerator {
private volatile boolean canceled = false;
public abstract int next();
/**
* 修改canceled标志的状态
*/
public void cancel() {
canceled = true;
}
/**
* 查看该对象是否已被撤销
*/
public boolean isCanceled() {
return canceled;
}
}
任何IntGenerator 都可以使用下面的EventChecker类来测试:
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Mr.Sun
* @date 2022年09月03日 21:52
*
* 校验偶数的有效性
*/
public class EventChecker implements Runnable {
private IntGenerator generator;
private final int id;
public EventChecker(IntGenerator generator, int id) {
this.generator = generator;
this.id = id;
}
@Override
public void run() {
while (!generator.isCanceled()) {
int val = generator.next();
if (val % 2 != 0) {
System.out.println(val + " not event");
// 取消 事件检查
generator.cancel();
}
}
}
public static void test(IntGenerator gp, int count) {
System.out.println("按 Ctrl + C 退出");
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < count; i++) {
exec.execute(new EventChecker(gp, i));
}
exec.shutdown();
}
public static void test(IntGenerator gp) {
// count 默认值
test(gp, 10);
}
}
注意,在本例中可以被撤销的类不是Runnable,而所有依赖于IntGenerator对象的EvenChecker任务将测试它,以查看它是否已经被撤销,正如你在run()中所见。通过这种方式,共享公共资源(IntGenerator)的任务可以观察该资源的终止信号。这可以消除所谓竞争条件,即两个或更多的任务竞争响应某个条件,因此产生冲突或不一致结果的情况。你必须仔细考虑并防范并发系统失败的所有可能途径,例如,一个任务不能依赖于另一个任务,因为任务关闭的顺序无法得到保证。这里,通过使任务依赖于非任务对象,我们可以消除潜在的竞争条件。
test()方法通过启动大量使用相同的IntGenerator的EvenChecker,设置并执行对任何类型的IntGenerator的测试。如果IntGenerator引发失败,那么test()将报告它并返回,否则,你必须按下Ctrl-C来终止它。
我们看到的第一个IntGenerator有一个可以产生一系列数值的next():
package concurrency;
/**
* @author Mr.Sun
* @date 2022年09月03日 22:02
*/
public class EventGenerator extends IntGenerator {
private int currentEventVal = 0;
@Override
public int next() {
++currentEventVal;
// 可能出现资源竞争问题的地方
++currentEventVal;
return currentEventVal;
}
public static void main(String[] args) {
EventChecker.test(new EventGenerator());
}
}
一个任务有可能在另一个任务执行第一个对currentEvenValue的递增操作之后,但是没有执行第二个操作之前,调用next()方法。这将使这个值处于“不恰当”的状态。为了证明这是可能发生的,EvenChecker.test()创建了一组EvenChecker对象,以连续地读取并输出同一个EvenGenerator,并测试检查每个数值是否都是偶数。如果不是,就会报告错误,而程序也将关闭。
有一点很重要,那就是要注意到递增程序自身也需要多个步骤,并且在递增过程中任务可能会被线程机制挂起——也就是说,在Java中,递增不是原子性的操作。因此,如果不保护任务,即使单一的递增也不是安全的。
2. 解决共享资源竞争
前面的示例展示了使用线程时的一个基本问题∶你永远都不知道一个线程何时在运行。想象一下,你坐在桌边手拿叉子,正要去叉盘子中的最后一片食物,当你的叉子就要够着它时,这片食物突然消失了,因为你的线程被挂起了,而另一个餐者进入并吃掉了它。这正是在你编写并发程序时需要处理的问题。对于并发工作,你需
某种方式来防止两个任务访问相同的资源,至少在关键阶段不能出现这种情况。
防止这种冲突的方法就是当资源被一个任务使用时,在其上加锁。第一个访问某项资源的任务必须锁定这项资源,使其他任务在其被解锁之前,就无法访问它了,而在其被解锁之时,另一个任务就可以锁定并使用它,以此类推。
基本上所有的并发模式在解决线程冲突问题的时候,都是采用序列化访问共享资源的方案。这意味着在给定时刻只允许一个任务访问共享资源。通常这是通过在代码前面加上一条锁语句来实现的,这就使得在一段时间内只有一个任务可以运行这段代码。因为锁语句产生了一种互相排斥的效果,所以这种机制常常称为互斥量(mutex)。
考虑一下屋子里的浴室多个人(即多个由线程驱动的任务)都希望能单独使用浴室(即共享资源)。为了使用浴室,一个人先敲门,看看是否能使用。如果没人的话,他就进入浴室并锁上门。这时其他人要使用浴室的话,就会被“阻挡”,所以他们要在浴室门口等待,直到浴室可以使用。当浴室使用完毕,就该把浴室给其他人使用了(别的任务就可以访问资源了),这个比喻就有点不太准确了。事实上,人们并没有排队,我们也不能确定谁将是下一个使用浴室的人,因为线程调度机制并不是确定性的。实际情况是∶等待使用浴室的人们簇拥在浴室门口,当锁住浴室门的那个人打开锁准备离开的时候,离门最近的那个人可能进入浴室。如前所述,可以通过yield()和setPriority()来给线程调度器提供建议,但这些建议未必会有多大效果,这取决于你的具体平台和JVM实现。
1. 使用synchronized关键字
Java以提供关键字synchronized的形式,为防止资源冲突提供了内置支持。当任务要执行被synchronized关键字保护的代码片段的时候,它将检查锁是否可用,然后获取锁,执行代码,释放锁。共享资源一般是以对象形式存在的内存片段,但也可以是文件、输入/输出端口,或者是打印机。要控制对共享资源的访问,得先把它包装进一个对象。然后把所有要访问这个资源的方法标记为synchronized。如果某个任务处于一个对标记为synchronized的方法的调用中,那么在这个线程从该方法返回之前,其他所有要调用类中任何标记为synchronized方法的线程都会被阻塞。
在生成偶数的代码中,你已经看到了,你应该将类的数据成员都声明为private的,而且只能通过方法来访问这些数据;所以可以把方法标记为synchronized来防止资源冲突。下面是声明synchronized方法的方式∶
synchronized void f() { /* ... */ }
synchronized void g() { /* ... */ }
所有对象都自动含有单一的锁(也称为监视器)。当在对象上调用其任意synchronized方法的时候,此对象都被加锁,这时该对象上的其他synchronized方法只有等到前一个方法调用完毕并释放了锁之后才能被调用。对于前面的方法,如果某个任务对对象调用了f(),对于同一个对象而言,就只能等到f()调用结束并释放了锁之后,其他任务才能调用f()和g()。所以,对于某个特定对象来说,其所有synchronized方法共享同一个锁,这可以被用来防止多个任务同时访问被编码为对象内存。
注意,在使用并发时,将域设置为private是非常重要的,否则,synchronized关键字就不能防止其他任务直接访问域,这样就会产生冲突。
一个任务可以多次获得对象的锁。如果一个方法在同一个对象上调用了第二个方法,后者又调用了同一对象上的另一个方法,就会发生这种情况。JVM负责跟踪对象被加锁的次数。如果一个对象被解锁(即锁被完全释放),其计数变为0。在任务第一次给对象加锁的时候,计数变为1。每当这个相同的任务在这个对象上获得锁时,计数都会递增。显然,只有首先获得了锁的任务才能允许继续获取多个锁。每当任务离开一个synchronized方法,计数递减,当计数为零的时候,锁被完全释放,此时别的任务就可以使用此资源。
针对每个类,也有一个锁(作为类的Class对象的一部分);所以synchronized static方法可以在类的范围内防止对static数据的并发访问。
同步控制EventGenerator,通过在EventGenerator.java中加入synchronized 关键字,可以防止不希望的线程访问:
package concurrency;
/**
* @author Mr.Sun
* @date 2022年09月04日 10:49
*
* 同步控制EventGenerator
*/
public class SyncEventGenerator extends IntGenerator {
private int currentCountVal = 0;
@Override
public synchronized int next() {
++currentCountVal;
Thread.yield();
++currentCountVal;
return currentCountVal;
}
public static void main(String[] args) {
EventChecker.test(new SyncEventGenerator());
}
}
对Thread.yield()的调用被插入到了两个递增操作之间,以提高在currentEvenValue是奇数状态时上下文切换的可能性。因为互斥可以防止多个任务同时进入临界区,所以这不会产生任何失败。但是如果失败将会发生,调用yield()是一种促使其发生的有效方式。
第一个进入next()的任务将获得锁,任何其他试图获取锁的任务都将从其开始尝试之时被阻塞,直至第一个任务释放锁。通过这种方式,任何时刻只有一个任务可以通过由互斥量看护的代码。
2. 使用显示的Lock对象
Java SE5的java.util.concurrent类库还包含有定义在java.util.concurrent.locks中的显式的互斥机制。Lock对象必须被显式地创建、锁定和释放。因此,它与内建的锁形式相比,代码缺乏优雅性。但是,对于解决某些类型的问题来说,它更加灵活。下面用显式的Lock重写的是SyncEventGenerator.java:
package concurrency;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Mr.Sun
* @date 2022年09月04日 11:13
*
* 使用显示的Lock对象重写SyncEventGenerator
*/
public class MutexEventGenerator extends IntGenerator{
private int currentCountVal = 0;
private Lock lock = new ReentrantLock();
@Override
public int next() {
lock.lock();
try {
++currentCountVal;
Thread.yield();
++currentCountVal;
return currentCountVal;
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
EventChecker.test(new MutexEventGenerator());
}
}
MutexEvenGenerator添加了一个被互斥调用的锁,并使用lock()和unlock()方法在next()内部创建了临界资源。当你在使用Lock对象时,将这里所示的惯用法内部化是很重要的:紧接着的对lock()的调用,你必须放置在finally子句中带有unlock()的try-finally语句中。注意,return 语句必须在try子句中出现,以确保unlock()不会过早发生,从而将数据暴露给了第二个任务。
尽管try-finally所需的代码比synchronized关键字要多,但是这也代表了显式的Lock对象的优点之一。如果在使用synchronized关键字时,某些事物失败了,那么就会抛出一个异常。但是你没有机会去做任何清理工作,以维护系统使其处于良好状态。有了显式的Lock对象,你就可以使用finally子句将系统维护在正确的状态了。
大体上,当你使用synchronized关键字时,需要写的代码量更少,并且用户错误出现的可能性也会降低,因此通常只有在解决特殊问题时,才使用显式的Lock对象。例如,用synchronized 关键字不能尝试着获取锁且最终获取锁会失败,或者尝试着获取锁一段时间,然后放弃它,要实现这些,你必须使用concurrent类库∶
package concurrency;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Mr.Sun
* @date 2022年09月04日 11:20
*
* 演示使用Lock对象尝试获取锁
*/
public class AttemptLocking {
private ReentrantLock lock = new ReentrantLock();
public void unTimed () {
boolean captured = lock.tryLock();
try {
System.out.println("tryLock(): " + captured);
} finally {
// 获取到锁才能执行释放锁操作,否则会出现IllegalMonitorStateException异常
if (captured) {
lock.unlock();
}
}
}
public void timed () {
boolean captured = false;
try {
captured = lock.tryLock(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
System.out.println("lock.tryLock(2, TimeUnit.MINUTES): " + captured);
} finally {
if (captured) {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
final AttemptLocking al = new AttemptLocking();
al.unTimed(); // True -- 锁是可用的
al.timed(); // True -- 锁是可用的
// 现在创建一个单独的任务来获取锁
new Thread() {
{setDaemon(true);}
@Override
public void run() {
al.lock.lock();
System.out.println("acquired");
}
}.start();
// 给第二个任务一个机会
Thread.sleep(100);
al.unTimed(); // false -- lock grabbed by task
al.timed(); // false -- lock grabbed by task
}
} /* Output:
tryLock(): true
lock.tryLock(2, TimeUnit.MINUTES): true
acquired
tryLock(): false
lock.tryLock(2, TimeUnit.MINUTES): false
*///:~
ReentrantLock允许你尝试着获取但最终未获取锁,这样如果其他人已经获取了这个锁,那你就可以决定离开去执行其他一些事情,而不是等待直至这个锁被释放,就像在untimed()方法中所看到的。在timed()中,做出了尝试去获取锁,该尝试可以在2秒之后失败(注意,使用了Java SE5的TimeUnit类来指定时间单位)。在main()中,作为匿名类而创建了一个单独的Thread,它将获取锁,这使得untimed()和timed()方法对某些事物将产生竞争。
显式的Lock对象在加锁和释放锁方面,相对于内建的synchronized锁来说,还赋予了你更细粒度的控制力。这对于实现专有同步结构是很有用的,例如用于遍历链接列表中的节点的节节传递的加锁机制(也称为锁耦合),这种遍历代码必须在释放当前节点的锁之前捕获下一个节点的锁。
3. 关于volatile
volatile关键字确保了应用中的可视性。如果你将一个域声明为volatile的,那么只要对这个域产生了写操作,那么所有的读操作就都可以看到这个修改。即便使用了本地缓存,情况也确实如此,volatile域会立即被写入到主存中,而读取操作就发生在主存中。
理解原子性和易变性是不同的概念这一点很重要。在非volatile域上的原子操作不必刷新到主存中去,因此其他读取该域的任务也不必看到这个新值。如果多个任务在同时访问某个域,那么这个域就应该是volatile的,否则,这个域就应该只能经由同步来访问。同步也会导致向主存中刷新,因此如果一个域完全由synchronized方法或语句块来防护,那就不必将其设置为是volatile的。
使用volatile而不是synchronized的唯一安全的情况是类中只有一个可变的域。再次提醒,你的第一选择应该是使用synchronized关键字,这是最安全的方式,而尝试其他任何方式都是有风险的。
原子类
Java SE5引入了诸如AtomicInteger、AtomicLong、AtomicReference等特殊的原子性变量类,它们提供下面形式的原子性条件更新操作∶
boolean compareAndSet(expectedValue, updateValue);
这些类被调整为可以使用在某些现代处理器上的可获得的,并且是在机器级别上的原子性,因此在使用它们时,通常不需要担心。对于常规编程来说,它们很少会派上用场,但是在涉及性能调优时,它们就大有用武之地了。例如,我们可以使用AtomicInteger来重写MutexEventGenerator.java∶
package concurrency;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Mr.Sun
* @date 2022年09月04日 15:17
*
* 使用AtomicInteger来重写MutexEventGenerator
*/
public class AtomicEventGenerator extends IntGenerator {
private AtomicInteger currentEventVal = new AtomicInteger(0);
@Override
public int next() {
return currentEventVal.addAndGet(2);
}
public static void main(String[] args) {
EventChecker.test(new AtomicEventGenerator());
}
}
所有其他形式的同步通过使用AtomicInteger得到了根除。
应该强调的是,Atomic类被设计用来构建javautil.concurrent中的类,因此只有在特殊情况下才在自己的代码中使用它们,即便使用了也需要确保不存在其他可能出现的问题。通常依赖于锁要更安全一些(要么是synchronized关键字,要么是显式的Lock对象)。
临界区
有时,你只是希望防止多个线程同时访问方法内部的部分代码而不是防止访问整个方法。通过这种方式分离出来的代码段被称为临界区(critical section),它也使用synchronized关键字建立。这里,synchronized被用来指定某个对象,此对象的锁被用来对花括号内的代码进行同步控制∶
synchronized(syncObject) {
// This code can be accessed
// by only one task at a time 1
}
这也被称为同步拉制块;在进入此段代码前,必须得到syncObject对象的锁。如果其他线程已经得到这个锁,那么就得等到锁被释放以后,才能进入临界区。
通过使用同步控制块,而不是对整个方法进行同步控制,可以使多个任务访问对象的时间性能得到显著提高,下面的例子比较了这两种同步控制方法。此外,它也演示了如何把一个非保护类型的类,在其他类的保护和控制之下,应用于多线程的环境∶
package concurrency;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Mr.Sun
* @date 2022年09月04日 15:27
*
* 临界区 测试
* <p>
* 演示了如何把一个非线程安全的类,在其他类的保护和控制之下,应用于多线程的环境
* </p>
*/
public class CriticalSelection {
public static void main(String[] args) {
PairManager pman1 = new PairManager1(),
pman2 = new PairManager2();
testApproaches(pman1, pman2);
}
// 测试两种不同的方法
static void testApproaches(PairManager pman1, PairManager pman2) {
ExecutorService exec = Executors.newCachedThreadPool();
PairManipulator
pm1 = new PairManipulator(pman1),
pm2 = new PairManipulator(pman2);
PairChecker
pcheck1 = new PairChecker(pman1),
pcheck2 = new PairChecker(pman2);
exec.execute(pm1);
exec.execute(pm2);
exec.execute(pcheck1);
exec.execute(pcheck2);
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch(InterruptedException e) {
System.out.println("Sleep interrupted");
} finally {
exec.shutdown();
}
System.out.println("pm1: " + pm1 + "\npm2: " + pm2);
System.exit(0);
}
}
class PairChecker implements Runnable {
private PairManager pm;
public PairChecker(PairManager pm) {
this.pm = pm;
}
@Override
public void run() {
while(true) {
pm.checkCounter.incrementAndGet();
pm.getPair().checkState();
}
}
}
class PairManipulator implements Runnable {
private PairManager pm;
public PairManipulator(PairManager pm) {
this.pm = pm;
}
@Override
public void run() {
while(true) {
pm.increment();
}
}
public String toString() {
return "Pair: " + pm.getPair() + " checkCounter = " + pm.checkCounter.get();
}
}
// 线程安全的Pair
abstract class PairManager {
AtomicInteger checkCounter = new AtomicInteger(0);
protected Pair p = new Pair();
private List<Pair> storage = Collections.synchronizedList(new ArrayList<>());
public synchronized Pair getPair() {
// 复制一份以确保原件的安全:
return new Pair(p.getX(), p.getY());
}
// 假设这是一个耗时的操作
protected void store(Pair p) {
storage.add(p);
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException ignore) {}
}
public abstract void increment();
}
// 同步整个方法
class PairManager1 extends PairManager {
@Override
public synchronized void increment() {
p.incrementX();
p.incrementY();
store(getPair());
}
}
// 同步临界区代码块
class PairManager2 extends PairManager {
@Override
public void increment() {
Pair temp;
synchronized(this) {
p.incrementX();
p.incrementY();
temp = getPair();
}
store(temp);
}
}
// 非线程安全
class Pair {
private int x, y;
public Pair(int x, int y) {
this.x = x;
this.y = y;
}
public Pair() {
this(0, 0);
}
public int getX() {
return x;
}
public int getY() {
return y;
}
public void incrementX () {
// 自增操作不是线程安全的
x++;
}
public void incrementY () {
// 自增操作不是线程安全的
y++;
}
@Override
public String toString() {
return "x: " + x + ", y: " + y ;
}
public class PairValueNotEqualException extends RuntimeException {
public PairValueNotEqualException() {
super("一对不相等的值:" + Pair.this);
}
}
// 任意不变量 —— 两个变量必须相等
public void checkState() {
if(x != y) {
throw new PairValueNotEqualException();
}
}
} /* Output: (Sample)
pm1: Pair: x: 15, y: 15 checkCounter = 272565
pm2: Pair: x: 16, y: 16 checkCounter = 3956974
*///:~
正如注释中注明的,Pair不是线程安全的,因为它的约束条件(虽然是任意的)需要两个变量要维护成相同的值。此外,如本章前面所述,自增加操作不是线程安全的,并且因为Pair类没有任何方法被标记为synchronized,所以不能保证一个Pair对象在多线程程序中不会被破坏。
你可以想象一下这种情况∶某人交给你一个非线程安全的Pair类,而你需要在一个线程环境中使用它。通过创建PairManager类就可以实现这一点,PairManager类持有一个Pair对象并控制对它的一切访问。注意唯一的public方法是getPair(),它是synchronized的。对于抽象方法increment(),对increment()的同步控制将在实现的时候进行处理。
至于PairManager类的结构,它的一些功能在基类中实现,并且其一个或多个抽象方法在派生类中定义,这种结构在设计模式中称为模板方法。设计模式使你得以把变化封装在代码里;在此,发生变化的部分是模板方法increment()。在PairManager1中,整个increment()方法是被同步控制的;但在PairManager2中,increment()方法使用同步控制块进行同步。注意,synchronized关键字不属于方法特征签名的组成部分,所以可以在覆盖方法的时候加上去。
store()方法将一个Pair对象添加到了synchronized ArrayList中,所以这个操作是线程安全的。因此,该方法不必进行防护,可以放置在PairManager2的synchronized语句块的外部。
PairManipulator被创建用来测试两种不同类型的PairManager,其方法是在某个任务中调用increment(),而PairChecker则在另一个任务中执行。为了跟踪可以运行测试的频度,PairChecker在每次成功时都递增checkCounter。在main()中创建了两个PairManipulator对象,并允许它们运行一段时间,之后每个PairManipulator的结果会得到展示。
尽管每次运行的结果可能会非常不同,但一般来说,对于PairChecker的检查频率,PairManagerl.increment()不允许有PairManager2.increment()那样多。后者采用同步控制块进行同步,所以对象不加锁的时间更长。这也是宁愿使用同步控制块而不是对整个方法进行同步控制的典型原因∶使得其他线程能更多地访问(在安全的情况下尽可能多)。
在其它对象上同步
synchronized块必须给定一个在其上进行同步的对象,并且最合理的方式是,使用其方法正在被调用的当前对象∶synchronized(this),这正是PairManager2所使用的方式。在这种方式中,如果获得了synchronized块上的锁,那么该对象其他的synchronized方法和临界区就不能被调用了。因此,如果在this上同步,临界区的效果就会直接缩小在同步的范围内。
有时必须在另一个对象上同步,但是如果你要这么做,就必须确保所有相关的任务都是在同一个对象上同步的。下面的示例演示了两个任务可以同时进入同一个对象,只要这个对象上的方法是在不同的锁上同步的即可∶
package concurrency;
import java.util.Objects;
/**
* @author Mr.Sun
* @date 2022年09月04日 16:19
*
* 在其它对象上同步
*/
public class SyncObject {
public static void main(String[] args) {
final DualSync ds = new DualSync();
new Thread() {
@Override
public void run() {
ds.f();
}
}.start();
ds.g();
}
}
class DualSync {
private Object obj = new Object();
public synchronized void f() {
for (int i = 0; i < 5; i++) {
System.out.println("f()");
Thread.yield();
}
}
public void g() {
synchronized (obj) {
for (int i = 0; i < 5; i++) {
System.out.println("g()");
Thread.yield();
}
}
}
} /* Output: (Sample)
g()
g()
f()
g()
f()
g()
f()
g()
f()
f()
*///:~
DualSync.f()(通过同步整个方法)在this同步,而g()有一个在syncObject上同步的synchronized块。因此,这两个同步是互相独立的。通过在main()中创建调用f()的Thread对这一点进行了演示,因为main()线程是被用来调用g()的。从输出中可以看到,这两个方式在同时运行,因此任何一个方法都没有因为对另一个方法的同步而被阻塞。
线程本地存储
防止任务在共享资源上产生冲突的第二种方式是根除对变量的共享。线程本地存储是一种自动化机制,可以为使用相同变量的每个不同的线程都创建不同的存储。因此,如果你有5个线程都要使用变量x所表示的对象,那线程本地存储就会生成5个用于x的不同的存储块。主要是,它们使得你可以将状态与线程关联起来。
创建和管理线程本地存储可以由java.lang.ThreadLocal类来实现,如下所示∶
package concurrency;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月04日 16:29
*
* 创建和管理线程本地存储可以使用ThreadLocal来实现
*/
public class ThreadLocalVariableHolder {
private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {
private Random random = new Random(47);
@Override
protected Integer initialValue() {
return random.nextInt(10000);
}
};
public static void increment () {
value.set(value.get() + 1);
}
public static int get() {
return value.get();
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Accessor(i));
}
TimeUnit.MILLISECONDS.sleep(1000);
exec.shutdown();
System.exit(0);
}
}
class Accessor implements Runnable {
private final int id;
public Accessor(int id) {
this.id = id;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
ThreadLocalVariableHolder.increment();
System.out.println(this);
Thread.yield();
}
}
@Override
public String toString() {
return "#" + id + ": " + ThreadLocalVariableHolder.get();
}
} /* Output: (Sample)
#0: 9259
#1: 556
#2: 6694
#3: 1862
#4: 962
#0: 9260
#1: 557
#2: 6695
#3: 1863
#4: 963
...
*///:~
ThreadLocal对象通常当作静态域存储。在创建ThreadLocal时,你只能通过get()和set()方法来访问该对象的内容,其中,get()方法将返回与其线程相关联的对象的副本,而set()会将参数插入到为其线程存储的对象中,并返回存储中原有的对象。increment()和get()方法在ThreadLocalVariableHolder中演示了这一点。注意,increment()和get()方法都不是synchronized 的,因为ThreadLocal保证不会出现竞争条件。
当运行这个程序时,你可以看到每个单独的线程都被分配了自己的存储,因为它们每个都需要跟踪自己的计数值,即便只有一个ThreadLocalVariableHolder对象。
终结任务
在某些情况下,任务必须突然地终止,首先,让我们观察一个示例,它不仅演示了终止问题,而且还是一个资源共享的示例:
1. 装饰性花园
在这个仿真程序中,花园委员会希望了解每天通过多个大门进入公园的总人数。每个大门都有一个十字转门或者某种其他形式的计数器,并且任何一个十字转门的计数值递增时,就表示公园中的总人数的共享计数值也会递增。
package concurrency;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月04日 20:11
*
* 装饰花园:演示终结任务
* <p>
* 在这个仿真程序中,花园委员会希望了解每天通过多个大门进入公园的总人数。
* 每个大门都有一个十字转门或者某种其他形式的计数器,并且任何一个十字转门的计数值递增时,就表示公园中的总人数的共享计数值也会递增。
* </p>
*/
public class OrnamentalGarden {
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Entrance(i));
}
TimeUnit.SECONDS.sleep(3);
Entrance.cancel();
exec.shutdown();
if (!exec.awaitTermination(250, TimeUnit.MILLISECONDS)) {
System.out.println("某些任务未被终止!");
}
System.out.println("Total: " + Entrance.getTotalCount());
System.out.println("Sum of Entrances: " + Entrance.sumEntrances());
}
}
class Count {
private int count = 0;
private Random ran = new Random(47);
// 删除synchronized关键字以查看计数失败
public synchronized int increment() {
int temp = count;
if (ran.nextBoolean()) {
Thread.yield();
}
return count = ++temp;
}
public synchronized int value () {
return count;
}
}
class Entrance implements Runnable {
private static Count count = new Count();
private static List<Entrance> entranceList = new ArrayList<>();
private int number = 0;
private final int id;
private static volatile boolean canceled = false;
/**
* 原子操作
*/
public static void cancel() {
canceled = true;
}
public Entrance(int id) {
this.id = id;
// 将此任务保留在列表中, 也防止无用任务被垃圾回收
entranceList.add(this);
}
@Override
public void run() {
while (! canceled) {
synchronized (this) {
++ number;
}
System.out.println(this + " Total: " + count.increment());
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
System.out.println("sleep interrupted");
}
}
System.out.println("Stopping " + this);
}
public synchronized int getValue() { return number; }
@Override
public String toString() {
return "Entrance " + id + ": " + getValue();
}
public static int getTotalCount() {
return count.value();
}
public static int sumEntrances() {
int sum = 0;
for(Entrance entrance : entranceList)
sum += entrance.getValue();
return sum;
}
} /* Output: (Sample)
Entrance 0: 1 Total: 1
Entrance 2: 1 Total: 3
Entrance 1: 1 Total: 2
Entrance 4: 1 Total: 5
Entrance 3: 1 Total: 4
Entrance 2: 2 Total: 6
Entrance 4: 2 Total: 7
Entrance 0: 2 Total: 8
...
Entrance 3: 29 Total: 143
Entrance 0: 29 Total: 144
Entrance 4: 29 Total: 145
Entrance 2: 30 Total: 147
Entrance 1: 30 Total: 146
Entrance 0: 30 Total: 149
Entrance 3: 30 Total: 148
Entrance 4: 30 Total: 150
Stopping Entrance 2: 30
Stopping Entrance 1: 30
Stopping Entrance 0: 30
Stopping Entrance 3: 30
Stopping Entrance 4: 30
Total: 150
Sum of Entrances: 150
*///:~
这里使用单个的Count对象来跟踪花园参观者的主计数值,并且将其当作Entrance类中的一个静态域进行存储。Count.increment()和Count.value()都是synchronized的,用来控制对count域的访问。increment()方法使用了Random对象,目的是在从把count读取到temp中,到递增temp并将其存储回count的这段时间里,有大约一半的时间产生让步。如果你将increment()上的synchronized关键字注释掉,那么这个程序就会崩溃,因为多个任务将同时访问并修改count(yield()会使问题更快地发生)。
每个Entrance任务都维护着一个本地值number,它包含通过某个特定入口进入的参观者的数量。这提供了对count对象的双重检査,以确保其记录的参观者数量是正确的。Entrance.run()只是递增number和count对象,然后休眠100毫秒。
因为Entrance.canceled是一个volatile布尔标志,而它只会被读取和赋值(不会与其他域组合在一起被读取),所以不需要同步对其的访问,就可以安全地操作它。如果你对诸如此类的情况有任何疑虑,那么最好总是使用synchronized。这个程序在以稳定的方式关闭所有事物方面还有一些小麻烦,其部分原因是为了说明在终止多线程程序时你必须相当小心,而另一部分原因是为了演示interrupt()的值,稍后你将学习有关这个值的知识。
在3秒钟之后,main()向Entrance发送static cancel()消息,然后调用exec对象的shutdown。方法,之后调用exec上的awaitTermination()方法。ExecutorService.awaitTermination()等待每个任务结束,如果所有的任务在超时时间达到之前全部结束,则返回true,否则返回false,表示不是所有的任务都已经结束了。尽管这会导致每个任务都退出其run()方法,并因此作为任务而终止,但是Entrance对象仍旧是有效的,因为在构造器中,每个Entrance对象都存储在称为entranceList的静态List
2.在阻塞时终结
前面示例中的Entrance.run()在其循环中包含对sleep()的调用。我们知道,sleep()最终将唤醒,而任务也将返回循环的开始部分,去检查canceled标志,从而决定是否跳出循环。但是,sleep()一种情况,它使任务从执行状态变为被阻塞状态,而有时你必须终止被阻塞的任务。
线程状态
一个线程可以处于以下四种状态之一:
1)新建(new)∶当线程被创建时,它只会短暂地处于这种状态。此时它已经分配了必需的系统资源,并执行了初始化。此刻线程已经有资格获得CPU时间了,之后调度器将把这个线程转变为可运行状态或阻塞状态。
2)就绪(Runnable)∶在这种状态下,只要调度器把时间片分配给线程,线程就可以运行。也就是说,在任意时刻,线程可以运行也可以不运行。只要调度器能分配时间片给线程,它就可以运行;这不同于死亡和阻塞状态。
3)阻塞(Blocked)∶线程能够运行,但有某个条件阻止它的运行。当线程处于阻塞状态时,调度器将忽略线程,不会分配给线程任何CPU时间。直到线程重新进入了就绪状态,它才有可能执行操作。
4)死亡(Dead)∶处于死亡或终止状态的线程将不再是可调度的,并且再也不会得到CPU时间,它的任务已结束,或不再是可运行的。任务死亡的通常方式是从run()方法返回,但是任务的线程还可以被中断,你将要看到这一点。
进入阻塞状态
一个任务进入阻塞状态,可能有如下原因∶
1)通过调用sleep(milliseconds)使任务进入休眠状态,在这种情况下,任务在指定的时间内不会运行。
2)你通过调用wait()使线程挂起。直到线程得到了notify()或notifyAll()消息(或者在Java SE5的java.util.concurrent类库中等价的signal()或signalAll()消息),线程才会进入就绪状态。我们将在稍后的小节中验证这一点。
3)任务在等待某个输入/输出完成。
4)任务试图在某个对象上调用其同步控制方法,但是对象锁不可用,因为另一个任务已经获取了这个锁。
中断
Thread类包含interrupt()方法,因此你可以终止被阻塞的任务,这个方法将设置线程的中断状态。如果一个线程已经被阻塞,或者试图执行一个阻塞操作,那么设置这个线程的中断状态将抛出InterruptedException。当抛出该异常或者该任务调用Thread.interrupted()时,中断状态将被复位。正如你将看到的,Thread.interrupted()提供了离开run()循环而不抛出异常的第二种方式。
为了调用interrupt(),你必须持有Thread对象。你可能已经注意到了,新的concurrent类库似乎在避免对Thread对象的直接操作,转而尽量通过Executor来执行所有操作。如果你在Executor上调用shutdownNow(),那么它将发送一个interrupt()调用给它启动的所有线程。这么做是有意义的,因为当你完成工程中的某个部分或者整个程序时,通常会希望同时关闭某个特定Executor的所有任务。然而,你有时也会希望只中断某个单一任务。如果使用Executor,那么通过调用submit()而不是executor()来启动任务,就可以持有该任务的上下文。submit()将返回一个泛型Future<?>,其中有一个未修饰的参数,因为你永远都不会在其上调用get()——持有这种Future的关键在于你可以在其上调用cancel(),并因此可以使用它来中断某个特定任务。如果你将true传递给cancel(),那么它就会拥有在该线程上调用interrupt()以停止这个线程的权限。因此,cancel()是一种中断由Executor启动的单个线程的方式。
package concurrency;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月05日 10:33
*
* 演示interrupt()的使用
*/
public class Interrupting {
private static ExecutorService exex = Executors.newCachedThreadPool();
static void test(Runnable r) throws InterruptedException {
Future<?> future = exex.submit(r);
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("Interrupting " + r.getClass().getName());
future.cancel(true);
System.out.println("Interrupting sent to " + r.getClass().getName());
}
public static void main(String[] args) throws Exception {
test(new SleepBlocked());
test(new IOBlocked(System.in));
test(new SynchronizedBlocked());
TimeUnit.SECONDS.sleep(3);
System.out.println("Aborting with System.exit(0)");
System.exit(0);
}
}
class SleepBlocked implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
System.out.println("InterruptedException");
}
System.out.println("Exiting SleepBlocked.run()");
}
}
class IOBlocked implements Runnable {
private InputStream in;
public IOBlocked(InputStream in) {
this.in = in;
}
@Override
public void run() {
System.out.println("Waiting for read(): ");
try {
in.read();
} catch (IOException e) {
System.out.println("Interrupted from blocked I/O");
}
System.out.println("Exiting IOBlocked.run()");
}
}
class SynchronizedBlocked implements Runnable {
public synchronized void f() {
// 永不释放锁,一直占有
while (true) {
// Thread.yield();
}
}
public SynchronizedBlocked() {
new Thread() {
@Override
public void run() {
f(); // 锁会一直被当前线程占用
}
}.start();
}
@Override
public void run() {
System.out.println("Trying to call f()");
f();
System.out.println("Exiting SynchronizedBlocked.run()");
}
} /* Output: (95% match)
Interrupting concurrency.SleepBlocked
InterruptedException
Exiting SleepBlocked.run()
Interrupting sent to concurrency.SleepBlocked
Waiting for read():
Interrupting concurrency.IOBlocked
Interrupting sent to concurrency.IOBlocked
Trying to call f()
Interrupting concurrency.SynchronizedBlocked
Interrupting sent to concurrency.SynchronizedBlocked
Aborting with System.exit(0)
*///:~
上面的每个任务都表示了一种不同类型的阻塞。SleepBlock是可中断的阻塞示例,而IOBlocked和SynchronizedBlocked是不可中断的阻塞示例。这个程序证明I/O和在synchronized 块上的等待是不可中断的,但是通过浏览代码,你也可以预见到这一点——无论是I/O还是尝试调用synchronized方法,都不需要任何InterruptedException处理器。
前两个类很简单直观∶在第一个类中run()方法调用了sleep(),而在第二个类中调用了read()。但是,为了演示SynchronizedBlock,我们必须首先获取锁。这是通过在构造器中创建匿名的Thread类的实例来实现的,这个匿名Thread类的对象通过调用f()获取了对象锁(这个线程必须有别于为SynchronizedBlock驱动run()的线程,因为一个线程可以多次获得某个对象锁)。由于f()永远都不返回,因此这个锁永远不会释放,而SynchronizedBlock.run()在试图调用f(),并阻塞以等待这个锁被释放。
从输出中可以看到,你能够中断对sleep()的调用(或者任何要求抛出InterruptedException 的调用)。但是,你不能中断正在试图获取synchronized锁或者试图执行I/O操作的线程。这有点令人烦恼,特别是在创建执行I/O的任务时,因为这意味着I/O具有锁住你的多线程程序的潜在可能。特别是对于基于Web的程序,这更是关乎利害。
对于这类问题,有一个略显笨拙但是有时确实行之有效的解决方案,即关闭任务在其上发生阻塞的底层资源∶
package concurrency;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月05日 11:01
*
* 中断I/O操作的线程:关闭任务在其上发生阻塞的底层资源
*/
public class CloseResource {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
InputStream in = new Socket("localhost", 8080).getInputStream();
exec.execute(new IOBlocked(in));
exec.execute(new IOBlocked(System.in));
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("Shutting down all threads");
exec.shutdownNow();
TimeUnit.SECONDS.sleep(1);
System.out.println("Closing " + in.getClass().getName());
in.close();
TimeUnit.SECONDS.sleep(1);
System.out.println("Closing " + System.in.getClass().getName());
System.in.close();
}
} /* Output: (85% match)
Waiting for read():
Waiting for read():
Shutting down all threads
Closing java.net.SocketInputStream
Interrupted from blocked I/O
Exiting IOBlocked.run()
Closing java.io.BufferedInputStream
Exiting IOBlocked.run()
*///:~
线程之间的协作
wait()和notifyAll()
有两种形式的wait()。第一种版本接受毫秒数作为参数,含义与sleep()方法里参数的意思相同,都是指“在此期间暂停”。但是与sleep()不同的是,对于wait()而言∶
1)在wait()期间对象锁是释放的。而调用sleep()的时候锁并没有被释放,调用yield()也属于这种情况。
2)可以通过notify()、notifyAll(),或者令时间到期,从wait()中恢复执行。
第二种,也是更常用形式的wait()不接受任何参数。这种wait()将无限等待下去,直到线程接收到notify()或者notifyAll()消息。
wait()、notify()以及notifyAll()有一个比较特殊的方面,那就是这些方法是基类Object的一部分,而不是属于Thread的一部分。尽管开始看起来有点奇怪——仅仅针对线程的功能却作为通用基类的一部分而实现,不过这是有道理的,因为这些方法操作的锁也是所有对象的一部分。所以,你可以把wait()放进任何同步控制方法里,而不用考虑这个类是继承自Thread还是实现了Runnable接口。实际上,只能在同步控制方法或同步控制块里调用wait()、notify()和notifyAll() (因为不用操作锁,所以sleep()可以在非同步控制方法里调用)。如果在非同步控制方法里调用这些方法,程序能通过编译,但运行的时候,将得到IlegalMonitorStateException异常,并伴随着一些含糊的消息,比如“当前线程不是拥有者”。消息的意思是,调用wait()、notify()和notifyAll()的任务在调用这些方法前必须"拥有"(获取)对象的锁。
可以让另一个对象执行某种操作以维护其自己的锁。要这么做的话,必须首先得到对象的锁。比如,如果要向对象x发送notifyAll(),那么就必须在能够取得x的锁的同步控制块中这么做∶
synchronized(x) {
x.notifyAll();
}
让我们看一个简单的示例,WaxOMatic.java有两个过程∶一个是将蜡涂到Car上,一个是抛光它。抛光任务在涂蜡任务完成之前,是不能执行其工作的,而涂蜡任务在涂另一层蜡之前,必须等待抛光任务完成。WaxOn和WaxOff都使用了Car对象,该对象在这些任务等待条件变化的时候,使用wait()和notifyAll()来挂起和重新启动这些任务∶
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月05日 15:25
*/
public class WaxOMatic {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow(); // 中断所有任务
}
}
class Car {
private boolean waxOn = false;
// 涂蜡
public synchronized void waxed() {
waxOn = true; // 准备抛光
notifyAll();
}
// 抛光
public synchronized void buffed() {
waxOn = false; // 为涂另外一层蜡做准备
notifyAll();
}
// 等待涂蜡
public synchronized void waitForWaxing() throws InterruptedException {
while (!waxOn) {
wait();
}
}
// 等待抛光
public synchronized void waitForBuffing() throws InterruptedException {
while (waxOn) {
wait();
}
}
}
class WaxOn implements Runnable {
private Car car;
public WaxOn(Car car) {
this.car = car;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
System.out.print("Wax on! ");
TimeUnit.MILLISECONDS.sleep(200);
// 给车涂蜡
car.waxed();
// 涂蜡完成,准备抛光
car.waitForBuffing();
}
} catch (InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax on task");
}
}
class WaxOff implements Runnable {
private Car car;
public WaxOff(Car car) {
this.car = car;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
car.waitForWaxing();
System.out.print("Wax off! ");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch (InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax off task");
}
} /* Output:
Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Wax off! Wax on! Exiting via interrupt
Ending Wax off task
Exiting via interrupt
Ending Wax on task
*///:~
这里,Car有一个单一的布尔属性waxOn,表示涂蜡-抛光处理的状态。
在waitForWaxing()中将检查waxOn标志,如果它为false,那么这个调用任务将通过调用wait()而被挂起。这个行为发生在synchronized方法中这一点很重要,因为在这样的方法中,任务已经获得了锁。当你调用wait()时,线程被挂起,而锁被释放。锁被释放这一点是本质所在,因为为了安全地改变对象的状态(例如,将waxOn改变为true,如果被挂起的任务要继续执行,就必须执行该动作),其他某个任务就必须能够获得这个锁。在本例中,如果另一个任务调用waxed()来表示“是时候该干点什么了”,那么就必须获得这个锁,从而将waxOn改变为true。之后,waxed()调用notifyAll(),这将唤醒在对wait()的调用中被挂起的任务。为了使该任务从wait()中唤醒,它必须首先重新获得当它进入wait()时释放的锁。在这个锁变得可用之前,这个任务是不会被唤醒的。
WaxOn.run()表示给汽车打蜡过程的第一个步骤,因此它将执行它的操作∶调用sleep()以模拟需要涂蜡的时间,然后告知汽车涂蜡结束,并调用waitForBuffing(),这个方法会用一个wait()调用来挂起这个任务,直至WaxOff任务调用这辆车的buffed(),.从而改变状态并调用notifyAll() 为止。另一方面,WaxOff.run()立即进入waitForWaxing(),并因此而被挂起,直至WaxOn涂完蜡并且waxed()被调用。在运行这个程序时,你可以看到当控制权在两个任务之间来回互相传递时,这个两步骤过程在不断地重复。在5秒钟之后,interrupt()会中止这两个线程;当你调用某个ExecutorService的shutdownNow()时,它会调用所有由它控制的线程的interrupt()。
notify()和notifyAll()
使用notify()而不是notifyAll()是一种优化。使用notify()时,在众多等待同一个锁的任务中只有一个会被唤醒,因此如果你希望使用notify(),就必须保证被唤醒的是恰当的任务。另外,为了使用notify(),所有任务必须等待相同的条件,因为如果你有多个任务在等待不同的条件,那么你就不会知道是否唤醒了恰当的任务。如果使用notify(),当条件发生变化时,必须只有一个任务能够从中受益。最后,这些限制对所有可能存在的子类都必须总是起作用的。如果这些规则中有任何一条不满足,那么你就必须使用notifyAll()而不是notify()。
在有关Java的线程机制的讨论中,有一个令人困惑的描述∶notifyAll()将唤醒“所有正在等待的任务”。这是否意味着在程序中任何地方,任何处于wait()状态中的任务都将被任何对notifyAll()的调用唤醒呢?在下面的示例中,与Task2相关的代码说明了情况并非如此——事实上,当notifyAll()因某个特定锁而被调用时,只有等待这个锁的任务才会被唤醒∶
package concurrency;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月05日 16:00
*
* notifyAll()并非唤醒所有“正在等待”的任务,而是只会唤醒等待 调用notifyAll()某个特定锁的任务
*/
public class NotifyVsNotifyAll {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Task());
}
exec.execute(new Task2());
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
boolean prod = true;
@Override
public void run() {
if (prod) {
System.out.print("\nnotify() ");
Task.blocker.prod();
prod = false;
} else {
System.out.print("\nnotifyAll() ");
Task.blocker.prodAll();
prod = true;
}
}
}, 400, 400);
TimeUnit.SECONDS.sleep(5);
timer.cancel();
System.out.println("\nTimer canceled");
TimeUnit.MILLISECONDS.sleep(500);
System.out.print("Task2.blocker.prodAll() ");
Task2.blocker.prodAll();
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("\nShutting down");
exec.shutdownNow(); // 中断所有任务
}
}
class Blocker {
synchronized void waitingCall() {
try {
while (!Thread.interrupted()) {
wait();
System.out.println(Thread.currentThread() + " ");
}
} catch (InterruptedException e) {
}
}
synchronized void prod() {
notify();
}
synchronized void prodAll() {
notifyAll();
}
}
class Task implements Runnable {
static Blocker blocker = new Blocker();
@Override
public void run() {
blocker.waitingCall();
}
}
class Task2 implements Runnable {
// 一个单独的blocker对象
static Blocker blocker = new Blocker();
@Override
public void run() {
blocker.waitingCall();
}
} /* Output:
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-5,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-2,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-2,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-5,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-5,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-2,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-2,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-5,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-5,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-2,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-2,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-5,5,main]
Timer canceled
Task2.blocker.prodAll() Thread[pool-1-thread-6,5,main]
Shutting down
*///:~
Task和Task2每个都有其自己的Blocker对象,因此每个Task对象都会在Task.blocker上阻塞,而每个Task2都会在Task2.blocker上阻塞。在main()中,java.util.Timer对象被设置为每4/10秒执行一次run()方法,而这个run()方法将经由“激励”方法交替地在Task.blocker上调用notify()和notifyAll()。
从输出中你可以看到,即使存在Task2.blocker上阻塞的Task2对象,也没有任何在Task.blocker上的notify()或notifyAll()调用会导致Task2对象被唤醒。与此类似,在main()的结尾,调用了timer的cancel(),即使计时器被撤销了,前5个任务也依然在运行,并仍旧在它们对Task.blocker.waitingCall()的调用中被阻塞。对Task2.blocker.prodAll()的调用所产生的输出不包括任何在Task.blocker中的锁上等待的任务。
生产者和消费者
请考虑这样一个饭店,它有一个厨师和一个服务员。这个服务员必须等待厨师准备好膳食。当厨师准备好时,他会通知服务员,之后服务员上菜,然后返回继续等待。这是一个任务协作的示例∶厨师代表生产者,而服务员代表消费者。两个任务必须在膳食被生产和消费时进行握手,而系统必须以有序的方式关闭。下面是对这个叙述建模的代码∶
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月05日 16:34
*
* 生产者消费者示例
*/
public class Restaurant {
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
final WaitPerson waitPerson = new WaitPerson(this);
final Chef chef = new Chef(this);
public Restaurant() {
exec.execute(chef);
exec.execute(waitPerson);
}
public static void main(String[] args) {
new Restaurant();
}
}
class Meal {
private final int orderNum;
public Meal(int orderNum) {
this.orderNum = orderNum;
}
@Override
public String toString() {
return "Meal " + orderNum;
}
}
// 服务员 - 消费者
class WaitPerson implements Runnable {
private Restaurant restaurant;
public WaitPerson(Restaurant restaurant) {
this.restaurant = restaurant;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal == null) {
wait(); // 等待厨师制作食物
}
}
System.out.println("Waitperson got " + restaurant.meal);
synchronized(restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll(); // Ready for another
}
}
} catch (InterruptedException e) {
System.out.println("WaitPerson interrupted");
}
}
}
// 厨师 - 生产者
class Chef implements Runnable {
private final Restaurant restaurant;
private int count = 0;
public Chef(Restaurant restaurant) {
this.restaurant = restaurant;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
if (restaurant.meal != null) {
wait(); // 等待被消费
}
}
if (++count == 10) {
System.out.println("Out of food, closing");
restaurant.exec.shutdownNow();
}
System.out.print("order up! ");
synchronized (restaurant.waitPerson) {
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException e) {
System.out.println("Chef interrupted");
}
}
} /* Output:
order up! Waitperson got Meal 1
order up! Waitperson got Meal 2
order up! Waitperson got Meal 3
order up! Waitperson got Meal 4
order up! Waitperson got Meal 5
order up! Waitperson got Meal 6
order up! Waitperson got Meal 7
order up! Waitperson got Meal 8
order up! Waitperson got Meal 9
Out of food, closing
WaitPerson interrupted
order up! Chef interrupted
*///:~
Restaurant是WaitPerson和Chef的焦点,他们都必须知道在为哪个Restaurant工作,因为他们必须和这家饭店的“餐窗”打交道,以便放置或拿取膳食restaurant.meal。在run()中,WaitPerson进入wait()模式,停止其任务,直至被Chef的notifyAll()唤醒。由于这是一个非常简单的程序,因此我们知道只有一个任务将在WaitPerson的锁上等待∶即WaitPerson任务自身。出于这个原因,理论上可以调用notify()而不是notifyAll()。但是,在更复杂的情况下,可能会有多个任务在某个特定对象锁上等待,因此你不知道哪个任务应该被唤醒。因此,调用notifyAll() 要更安全一些,这样可以唤醒等待这个锁的所有任务,而每个任务都必须决定这个通知是否与自己相关。
一旦Chef送上Meal并通知WaitPerson,这个Chef就将等待,直至WaitPerson收集到订单并通知Chef,之后Chef就可以烧下一份Meal了。
生产者-消费者与队列
wait()和notifyAll()方法以一种非常低级的方式解决了任务互操作问题,即每次交互时都握手。在许多情况下,你可以瞄向更高的抽象级别,使用同步队列来解决任务协作问题,同步队列在任何时刻都只允许一个任务插入或移除元素。在java.util.concurrent.BlockingQueue接口中提供了这个队列,这个接口有大量的标准实现。你通常可以使用LinkedBlockingQueue,它是一个无界队列,还可以使用ArrayBlockingQueue,它具有固定的尺寸,因此你可以在它被阻塞之前,向其中放置有限数量的元素。
如果消费者任务试图从队列中获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务,并且当有更多的元素可用时恢复消费者任务。阻塞队列可以解决非常大量的问题,而其方式与wait()和notifyAll()相比,则简单并可靠得多。
考虑下面这个使用BlockingQueue的示例,有一台机器具有三个任务∶一个制作吐司、一个给吐司抹黄油,另一个在抹过黄油的吐司上涂果酱。我们可以通过各个处理过程之间的BlockingQueue来运行这个吐司制作程序∶
package concurrency.juc;
import container.Stack;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月06日 9:53
*
* 制作吐司 -> 涂黄油 -> 涂果酱 -> 咀嚼消费
*/
public class ToastOMatic {
public static void main(String[] args) throws Exception {
ToastQueue dryQueue = new ToastQueue(),
butterQueue = new ToastQueue(),
finishedQueue = new ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
// 制作吐司
exec.execute(new Toaster(dryQueue));
// 为制作好的吐司涂上黄油
exec.execute(new Butterer(dryQueue, butterQueue));
// 为涂上黄油的吐司涂抹果酱
exec.execute(new Jammer(butterQueue, finishedQueue));
// 将涂抹好果酱的吐司给用户咀嚼消费
exec.execute(new Eater(finishedQueue));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}
class Toast {
public enum Status {DRY, BUFFERED, JAMMED}
private Status status = Status.DRY;
private final int id;
public Toast(int id) {
this.id = id;
}
// 涂抹黄油
public void butter() {
status = Status.BUFFERED;
}
// 涂果酱
public void jam() {
status = Status.JAMMED;
}
public Status getStatus() {
return status;
}
public int getId() {
return id;
}
@Override
public String toString() {
return "Toast " + id + ": " + status;
}
}
class ToastQueue extends LinkedBlockingQueue<Toast> { }
class Toaster implements Runnable {
private final ToastQueue toastQueue;
private int count = 0;
private final Random random = new Random(47);
public Toaster(ToastQueue toastQueue) {
this.toastQueue = toastQueue;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(100 + random.nextInt(500));
// 制作吐司
Toast t = new Toast(count++);
System.out.println(t);
// 插入吐司队列
toastQueue.put(t);
}
} catch (InterruptedException e) {
System.out.println("Toaster Interrupted");
}
System.out.println("Toaster off");
}
}
// 给吐司涂黄油
class Butterer implements Runnable {
private final ToastQueue dryQueue, butteredQueue;
public Butterer(ToastQueue dry, ToastQueue buttered) {
dryQueue = dry;
butteredQueue = buttered;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
// 若刚制作的吐司队列为空,则阻塞等待
Toast t = dryQueue.take();
// 不为空,则为该吐司涂上黄油
t.butter();
System.out.println(t);
// 将涂好黄油的吐司放进黄油吐司队列中
butteredQueue.put(t);
}
} catch(InterruptedException e) {
System.out.println("Butterer interrupted");
}
System.out.println("Butterer off");
}
}
// 给涂黄油过后的吐司涂上果酱
class Jammer implements Runnable {
private final ToastQueue butteredQueue, finishedQueue;
public Jammer(ToastQueue buttered, ToastQueue finished) {
butteredQueue = buttered;
finishedQueue = finished;
}
public void run() {
try {
while(!Thread.interrupted()) {
// 若已经涂黄油的吐司队列为空,则阻塞等待
Toast t = butteredQueue.take();
// 获取到涂抹黄油的吐司后,在涂上果酱
t.jam();
System.out.println(t);
// 放入已制作完成的土司队列 中
finishedQueue.put(t);
}
} catch(InterruptedException e) {
System.out.println("Jammer interrupted");
}
System.out.println("Jammer off");
}
}
// 消费吐司
class Eater implements Runnable {
private final ToastQueue finishedQueue;
private int counter = 0;
public Eater(ToastQueue finished) {
finishedQueue = finished;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
// 若已完成的吐司队列为空,则阻塞等待
Toast t = finishedQueue.take();
// 验证吐司是否按顺序进行,所有吐司是否已经加了果酱
if(t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) {
System.out.println(">>>> Error: " + t);
System.exit(1);
} else
System.out.println("咀嚼! " + t);
}
} catch(InterruptedException e) {
System.out.println("Eater interrupted");
}
System.out.println("Eater off");
}
}
Toast是一个使用enum值的优秀示例。注意,这个示例中没有任何显式的同步(即使用Lock 对象或synchronized关键字的同步),因为同步由队列(其内部是同步的)和系统的设计隐式地管理了——每片Toast在任何时刻都只由一个任务在操作。因为队列的阻塞,使得处理过程将被自动地挂起和恢复。你可以看到由BlockingQueue产生的简化十分明显。在使用显式的wait()和notifyAll()时存在的类和类之间的耦合被消除了,因为每个类都只和它的BlockingQuene通信。
死锁
我们已经知道,任务可以变成阻塞状态,所以就可能出现这种情况:某个任务在等待另一个任务,而后者又在等待别的任务,这样一直下去,直到这个链条上的任务又在等待第一个任务释放锁。这得到了一个任务之间相互等待的连续循环,没有哪个线程能继续,这被称之为死锁。
一个经典的死锁例证就是哲学将就餐问题:给问题的基本描述是指定五个哲学家,这些哲学将花部分时间思考,花部分时间就餐。当他们思考的时候,不需要任何共享资源;当他们就餐的时候,将使用有限数量的餐具。问题中引入的难点是:作为哲学家,他们很穷,他们每人只买得起一根筷子。他们围坐在桌子周围,每人之间放一根筷子。当一个哲学家要就餐的时候,这个哲学家就必须同时得到左边和右边的筷子,如果一个哲学家左边或右边已经有人在使用筷子了,那么这个哲学家就必须等待,直至可得到必需的筷子。
public class Chopstick {
// 筷子是否被拿走
private boolean taken = false;
// 拿筷子
public synchronized void take() throws InterruptedException {
while (taken) {
wait();
}
taken = true;
}
// 归还筷子
public synchronized void drop() {
taken = false;
notifyAll();
}
}
任何两个Philosopher(哲学家)都不能成功take()同一根筷子。另外,如果一根Chopstick(筷子)已经被某个Philosopher获得,那么另一个Philosopher可以wait(),直至这根Chopstick的当前持有者调用drop()使其可用为止。
当一个Philosopher任务调用take()时,这个Philosopher将等待,直至taken标志变为false(直至当前持有Chopstick的Philosopher释放它)。然后这个任务会将taken标志设置为true,以表示现在由新的Philosopher持有这根Chopstick。当这个Philosopher使用完这根Chopstick时,它会调用drop()来修改标志的状态,并notifyAll()所有其他的Philosopher,这些Philosopher中有些可能就在wait()这根Chopstick。
public class Philosopher implements Runnable {
// 左边的筷子
private final Chopstick left;
// 右边的筷子
private final Chopstick right;
private final int id;
// 权重因子
private final int ponderFactor;
private Random random = new Random(47);
public Philosopher(Chopstick left, Chopstick right, int id, int ponderFactor) {
this.left = left;
this.right = right;
this.id = id;
this.ponderFactor = ponderFactor;
}
private void pause() throws InterruptedException {
if (ponderFactor == 0) {
return;
}
TimeUnit.MILLISECONDS.sleep(random.nextInt(ponderFactor * 250));
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
System.out.println(this + " thinking");
pause();
// 哲学家饿了
System.out.println(this + " grabbing right");
right.take();
System.out.println(this + " grabbing left");
left.take();
System.out.println(this + " eating");
pause();
right.drop();
left.drop();
}
} catch (InterruptedException e) {
System.out.println(this + " exiting via interrupt");
}
}
@Override
public String toString() {
return "Philosopher " + id;
}
}
在Philosopher.run()中,每个Philosopher只是不断地思考和吃饭。如果PonderFactor不为0,则pause()方法会休眠(sleeps)一段随机的时间。通过使用这种方式,你将看到Philosopher会在思考上花掉一段随机化的时间,然后尝试着获取(take())右边和左边的Chopstick,随后在吃饭上再花掉一段随机化的时间,之后重复此过程。
现在我们可以建立这个程序的将会产生死锁的版本了∶
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月06日 11:28
*
* 死锁示例: 哲学家进餐问题
*
* {args: 0 5 timeout}
*/
public class DeadLockDiningPhilosopher {
public static void main(String[] args) throws Exception {
int ponder = 5;
if (args.length > 0 ) {
ponder = Integer.parseInt(args[0]);
}
int size = 5;
if (args.length > 1) {
size = Integer.parseInt(args[1]);
}
ExecutorService exec = Executors.newCachedThreadPool();
Chopstick[] sticks = new Chopstick[size];
for (int i = 0; i < size; i++) {
sticks[i] = new Chopstick();
}
for (int i = 0; i < size; i++) {
exec.execute(new Philosopher(sticks[i], sticks[(i+1) % size], i, ponder));
}
if (args.length == 3 && args[2].equals("timeout")) {
TimeUnit.SECONDS.sleep(10);
} else {
System.out.println("Press 'Enter' to quit");
System.in.read();
}
exec.shutdownNow();
}
}
你会发现,如果Philosopher花在思考上的时间非常少,那么当他们想要进餐时,全都会在Chopstick上产生竞争,而死锁也就会更快地发生。
第一个命令行参数可以调整ponder因子,从而影响每个Philosopher花费在思考上的时间长度。如果有许多Philosopher,或者他们花费很多时间去思考,那么尽管存在死锁的可能,但你可能永远也看不到死锁。值为0的命令行参数倾向于使死锁尽快发生。
注意,Chopstick对象不需要内部标识符,它们是由在数组sticks中的位置来标识的。每个Philosopher构造器都会得到一个对左边和右边Chopstick对象的引用。除了最后一个Philosopher,其他所有的Philosopher都是通过将这个Philosopher定位于下一对Chopstick对象之间而被初始化的,而最后一个Philosopher右边的Chopstick是第0个Chopstick,这样这个循环表也就结束了。因为最后一个Philosopher坐在第一个Philosopher的右边,所以他们会共享第0个Chopstick。现在,所有的Philosopher都有可能希望进餐,从而等待其临近的Philosopher放下它们的Chopstick。这将使程序死锁。
如果Philosopher花费更多的时间去思考而不是进餐(使用非0的ponder值,或者大量的Philosopher),那么他们请求共享资源(Chopstick)的可能性就会小许多,这样你就会确信该程序不会死锁,尽管它们并非如此。这个示例相当有趣,因为它演示了看起来可以正确运行,但实际上会死锁的程序。
要修正死锁问题,你必须明白,当以下四个条件同时满足时,就会发生死锁∶
1)互斥条件。任务使用的资源中至少有一个是不能共享的。这里,一根Chopstick一次就只能被一个Philosopher使用。
2)至少有一个任务它必须持有一个资源且正在等待获取一个当前被别的任务持有的资源。也就是说,要发生死锁,Philosopher必须拿着一根Chopstick并且等待另一根。
3)资源不能被任务抢占,任务必须把资源释放当作普通事件。Philosopher很有礼貌,他们不会从其他Philosopher那里抢Chopstick。
4)必须有循环等待,这时,一个任务等待其他任务所持有的资源,后者又在等待另一个任务所持有的资源,这样一直下去,直到有一个任务在等待第一个任务所持有的资源,使得大家都被锁住。在DeadlockDiningPhilosophers.java中,因为每个Philosopher都试图先得到右边的Chopstick,然后得到左边的Chopstick,所以发生了循环等待。
因为要发生死锁的话,所有这些条件必须全部满足;所以要防止死锁的话,只需破坏其中一个即可。在程序中,防止死锁最容易的方法是破坏第4个条件。有这个条件的原因是每个Philosopher都试图用特定的顺序拿Chopstick∶先右后左。正因为如此,就可能会发生“每个人都拿着右边的Chopstick,并等待左边的Chopstick”的情况,这就是循环等待条件。然而,如果最后一个Philosopher被初始化成先拿左边的Chopstick,后拿右边的Chopstick,那么这个Philosopher将永远不会阻止其右边的Philosopher拿起他们的Chopstick。在本例中,这就可以防止循环等待。这只是问题的解决方法之一,也可以通过破坏其他条件来防止死锁(具体细节请参考更高级的讨论线程的书籍)∶
package concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月06日 11:52
*
* 解决哲学家就餐的死锁问题:通过修改循环等待条件,即修改最后一个哲学家先拿走左手边的筷子,再拿走右手边的筷子
* {args: 5 5 timeout}
*/
public class FixedDiningPhilosopher {
public static void main(String[] args) throws Exception {
int ponder = 5;
if (args.length > 0 ) {
ponder = Integer.parseInt(args[0]);
}
int size = 5;
if (args.length > 1) {
size = Integer.parseInt(args[1]);
}
ExecutorService exec = Executors.newCachedThreadPool();
Chopstick[] sticks = new Chopstick[size];
for (int i = 0; i < size; i++) {
sticks[i] = new Chopstick();
}
for (int i = 0; i < size; i++) {
// 若不是最后一个哲学家,则都先拿走右手边的筷子
if (i < (size - 1)) {
exec.execute(new Philosopher(sticks[i], sticks[(i+1)], i, ponder));
} else {
// 最后一个哲学家先拿起左手边的筷子
exec.execute(new Philosopher(sticks[0], sticks[(i)], i, ponder));
}
}
if (args.length == 3 && args[2].equals("timeout")) {
TimeUnit.SECONDS.sleep(10);
} else {
System.out.println("Press 'Enter' to quit");
System.in.read();
}
exec.shutdownNow();
}
}
通过确保最后一个Philosopher先拿起和放下左边的Chopstick,我们可以移除死锁,从而使这个程序平滑地运行。
新类库中的构件
Java SE5的java.util.concurrent引入了大量设计用来解决并发问题的新类,学习使用它们将有助于你编写出更加简单而健壮的并发程序。
CountDownLatch
它被用来同步一个或多个任务,强制它们等待由其他任务执行的一组操作完成。
你可以向CountDownLatch对象设置一个初始计数值,任何在这个对象上调用wait()的方法都将阻塞,直至这个计数值到达0。其他任务在结束其工作时,可以在该对象上调用countDown()来减小这个计数值。CountDownLatch被设计为只触发一次,计数值不能被重置。如果你需要能够重置计数值的版本,则可以使用CyclicBarrier。
调用countDown()的任务在产生这个调用时并没有被阻塞,只有对await()的调用会被阻塞,直至计数值到达0。
CountDownLatch的典型用法是将一个程序分为n个互相独立的可解决任务,并创建值为0的CountDownLatch。当每个任务完成时,都会在这个锁存器上调用countDown()。等待问题被解决的任务在这个锁存器上调用await(),将它们自己拦住,直至锁存器计数结束。下面是演示这种技术的一个框架示例∶
package concurrency;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月06日 15:02
*
* 倒计时锁存器 使用示例
*/
public class CountDownLatchDemo {
static final int SIZE = 100;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
// 所有任务都必须共享一个CountDownLatch对象
CountDownLatch latch = new CountDownLatch(SIZE);
for (int i = 0; i < 10; i++) {
exec.execute(new WaitingTask(latch));
}
for (int i = 0; i < SIZE; i++) {
exec.execute(new TaskPortion(latch));
}
System.out.println("latch all tasks");
// 当所有任务都完成的时候退出程序
exec.shutdown();
}
}
class TaskPortion implements Runnable {
private static int counter = 0;
private final int id = counter++;
private static Random random = new Random(47);
private final CountDownLatch latch;
public TaskPortion(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
doWork();
latch.countDown();
} catch (InterruptedException e) {
}
}
public void doWork() throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));
System.out.println(this + "completed");
}
@Override
public String toString() {
return String.format("%1$-3d", id);
}
}
class WaitingTask implements Runnable {
private static int counter = 0;
private final int id = counter++;
private final CountDownLatch latch;
public WaitingTask(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
latch.await();
System.out.println("Latch barrier passed for " + this);
} catch (InterruptedException e) {
System.out.println(this + " interrupted");
}
}
@Override
public String toString() {
return String.format("WaitingTask %1$-3d", id);
}
}
TaskPortion将随机地休眠一段时间,以模拟这部分工作的完成,而WaitingTask表示系统中必须等待的部分,它要等待问题的初始化部分完成为止。所有任务都是用在main()中定义的同一个单一的CountDownLatch。
CyclicBarrier
CyclicBarrier适用于这样的情况∶你希望创建一组任务,它们并行地执行工作,然后在进行下一个步骤之前等待,直至所有任务都完成(看起来有些像join0)。它使得所有的并行任务都将在栅栏处列队,因此可以一致地向前移动。这非常像CountDownLatch,只是CountDownLatch是只触发一次的事件,而CyclicBarrier可以多次重用。
下面以赛马游戏为示例,演示CyclicBarrier的使用:
package concurrency;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
/**
* @author Mr.Sun
* @date 2022年09月06日 15:22
*
* 演示CyclicBarrier的使用
*/
public class HorseRace {
static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<>();
private ExecutorService exec = Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nhorses, final int pause) {
barrier = new CyclicBarrier(nhorses, new Runnable() {
@Override
public void run() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < FINISH_LINE; i++) {
s.append("="); // 跑道上的栅栏
}
System.out.println(s);
for (Horse horse : horses) {
System.out.println(horse.tracks());
}
for (Horse horse : horses) {
if (horse.getStrides() >= FINISH_LINE) {
System.out.println(horse + "won!");
exec.shutdownNow();
return;
}
try {
TimeUnit.MILLISECONDS.sleep(pause);
} catch (InterruptedException e) {
System.out.println("barrier-action sleep interrupted");
}
}
}
});
for (int i = 0; i < nhorses; i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
public static void main(String[] args) {
int nHorses = 7;
int pause = 200;
if (args.length > 0) {
int n = new Integer(args[0]);
nHorses = n > 0 ? n : nHorses;
}
if (args.length > 1) {
int p = new Integer(args[1]);
pause = p > -1 ? p : pause;
}
new HorseRace(nHorses, pause);
}
}
class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private Random random = new Random(47);
private CyclicBarrier barrier;
public Horse(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
strides += random.nextInt(3);
}
barrier.await();
}
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
public synchronized int getStrides() {
return strides;
}
@Override
public String toString() {
return "Horse " + id + " ";
}
public String tracks() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < getStrides(); i++) {
s.append("*");
}
s.append(id);
return s.toString();
}
}
可以向CyclicBarrier提供一个"栅栏动作",它是一个Runnable,当计数值到达0时自动执行————这是CyclicBarrier和CountDownLatch之间的另一个区别。这里,栅栏动作是作为匿名内部类创建的,它被提交给了CyclicBarrier的构造器。
我试图让每匹马都打印自己,但是之后的显示顺序取决于任务管理器。CyclicBarrier使得每匹马都要执行为了向前移动所必需执行的所有工作,然后必须在栅栏处等待其他所有的马都准备完毕。当所有的马都向前移动时,CyclicBarrier将自动调用Runnable栅栏动作任务,按顺序显示马和终点线的位置。
一旦所有的任务都越过了栅栏,它就会自动地为下一回合比赛做好准备。
为了展示这个非常简单的动画效果,你需要将控制台视窗的尺寸调整为小到只有马时,才会展示出来。
DelayQueue
这是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队中取走。这种队列是有序的,即队头对象的延迟到期的时间最长。如果没有任何延迟到期,那么就不会有任何头元素,并且poll()将返回mull(正因为这样,你不能将mull放置到这种队列中)。
下面是一个示例,其中的Delayed对象自身就是任务,而DelayedTaskConsumer将最“紧急”的任务(到期时间最长的任务)从队列中取出,然后运行它。注意,这样DelayQueue就成为了优先级队列的一种变体∶
package concurrency.juc;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
* @author Mr.Sun
* @date 2022年09月06日 16:08
*
* 延迟队列演示
*/
public class DelayQueueDemo {
public static void main(String[] args) {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
DelayQueue<DelayedTask> queue = new DelayQueue<>();
// Fill with tasks that have random delays:
for(int i = 0; i < 20; i++) {
queue.put(new DelayedTask(rand.nextInt(5000)));
}
// Set the stopping point
queue.add(new DelayedTask.EndSentinel(5000, exec));
exec.execute(new DelayedTaskConsumer(queue));
}
}
class DelayedTask implements Runnable, Delayed {
private static int counter = 0;
private final int id = counter++;
private final int delta;
private final long trigger;
protected static List<DelayedTask> sequence = new ArrayList<>();
public DelayedTask(int delayMillisecond) {
this.delta = delayMillisecond;
this.trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);
sequence.add(this);
}
@Override
public int compareTo(Delayed arg) {
DelayedTask that = (DelayedTask) arg;
if (trigger < that.trigger) {
return -1;
}
if (trigger > that.trigger) {
return 1;
}
return 0;
}
@Override
public void run() {
System.out.print(this + " ");
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
}
@Override
public String toString() {
return String.format("[%1$-4d]", delta) + " Task " + id;
}
public String summary() {
return "(" + id + ":" + delta + ")";
}
public static class EndSentinel extends DelayedTask {
private ExecutorService exec;
public EndSentinel(int delay, ExecutorService e) {
super(delay);
exec = e;
}
@Override
public void run() {
for(DelayedTask pt : sequence) {
System.out.print(pt.summary() + " ");
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class DelayedTaskConsumer implements Runnable {
private DelayQueue<DelayedTask> q;
public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
this.q = q;
}
@Override
public void run() {
try {
while(!Thread.interrupted())
q.take().run(); // Run task with the current thread
} catch(InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished DelayedTaskConsumer");
}
} /* Output:
[128 ] Task 11 [200 ] Task 7 [429 ] Task 5 [520 ] Task 18 [555 ] Task 1 [961 ] Task 4 [998 ] Task 16 [1207] Task 9 [1693] Task 2 [1809] Task 14 [1861] Task 3 [2278] Task 15 [3288] Task 10 [3551] Task 12 [4258] Task 0 [4258] Task 19 [4522] Task 8 [4589] Task 13 [4861] Task 17 [4868] Task 6 (0:4258) (1:555) (2:1693) (3:1861) (4:961) (5:429) (6:4868) (7:200) (8:4522) (9:1207) (10:3288) (11:128) (12:3551) (13:4589) (14:1809) (15:2278) (16:998) (17:4861) (18:520) (19:4258) (20:5000)
[5000] Task 20 Calling shutdownNow()
Finished DelayedTaskConsumer
*///:~
DelayedTask包含一个称为sequence的List
NANOSECONDS.convert(delta, MILLISECONDS);
在getDelay()中,希望使用的单位是作为unit参数传递进来的,你使用它将当前时间与触发时间之间的差转换为调用者要求的单位,而无需知道这些单位是什么(这是策略设计模式的一个简单示例,在这种模式中,算法的一部分是作为参数传递进来的)。
为了排序,Delayed接口还继承了Comparable接口,因此必须实现compareTo(),使其可以产生合理的比较。toString()和summary()提供了输出格式化,而嵌套的EndSentinel类提供了一种关闭所有事物的途径,具体做法是将其放置为队列的最后一个元素。
注意,因为DelayedTaskConsumer自身是一个任务,所以它有自己的Thread,它可以使用这个线程来运行从队中获取的所有任务。由于任务是按照队列优先级的顺序执行的,因此在本例中不需要启动任何单独的线程来运行DelayedTask。
从输出中可以看到,任务创建的顺序对执行顺序没有任何影响,任务是按照所期望的延迟顺序执行的。
PriorityBlockingQueue
这是一个很基础的优先级队列,它具有可阻塞的读取操作。下面是一个示例,其中在优先级队列中的对象是按照优先级顺序从队列中出现的任务。PrioritizedTask被赋予了一个优先级数字,以此来提供这种顺序∶
package concurrency.juc;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月06日 16:35
*
* 演示优先级阻塞队列
*/
public class PriorityBlockingQueueDemo {
public static void main(String[] args) {
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
exec.execute(new PrioritizedTaskProducer(queue, exec));
exec.execute(new PrioritizedTaskConsumer(queue));
}
}
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
private Random random = new Random(47);
private static int counter = 0;
private final int id = counter++;
private final int priority;
protected static List<PrioritizedTask> sequence = new ArrayList<>();
public PrioritizedTask(int priority) {
this.priority = priority;
sequence.add(this);
}
@Override
public int compareTo(PrioritizedTask arg) {
return priority < arg.priority ? 1 : (priority > arg.priority ? -1 :0);
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(250));
} catch(InterruptedException e) {
// Acceptable way to exit
}
System.out.println(this);
}
@Override
public String toString() {
return String.format("[%1$-3d]", priority) + " Task " + id;
}
public String summary() {
return "(" + id + ":" + priority + ")";
}
public static class EndSentinel extends PrioritizedTask {
private ExecutorService exec;
public EndSentinel(ExecutorService e) {
super(-1); // 此程序中的最低优先级
exec = e;
}
@Override
public void run() {
int count = 0;
for(PrioritizedTask pt : sequence) {
System.out.print(pt.summary());
if(++count % 5 == 0) {
System.out.println();
}
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class PrioritizedTaskProducer implements Runnable {
private Random rand = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec;
public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) {
queue = q;
exec = e; // Used for EndSentinel
}
@Override
public void run() {
// 无界队列,不会被阻塞
// 用随机优先级快速填充
for(int i = 0; i < 20; i++) {
queue.add(new PrioritizedTask(rand.nextInt(10)));
Thread.yield();
}
// 最高优先级工作的任务
try {
for(int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PrioritizedTask(10));
}
// 添加作业时,优先级最低的优先:
for(int i = 0; i < 10; i++) {
queue.add(new PrioritizedTask(i));
}
// 停止所有任务的哨兵:
queue.add(new PrioritizedTask.EndSentinel(exec));
} catch(InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished PrioritizedTaskProducer");
}
}
class PrioritizedTaskConsumer implements Runnable {
private PriorityBlockingQueue<Runnable> q;
public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q) {
this.q = q;
}
@Override
public void run() {
try {
while(!Thread.interrupted())
// 使用当前线程运行任务
q.take().run();
} catch(InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished PrioritizedTaskConsumer");
}
}
与前一个示例相同,PrioritizedTask对象的创建序列被记录在sequence List中,用于和实际的执行顺序比较。run()方法将休眠一小段随机的时间,然后打印对象信息,而EndSentinel提供了和前面相同的功能,要确保它是队列中最后一个对象。
PrioritizedTaskProducer和PrioritizedTaskComsumer通过PriorityBlockinQueue彼此连接。因为这种队列的阻塞特性提供了所有必需的同步,所以你应该注意到了,这里不需要任何显式的同步——不必考虑当你从这种队列中读取时,其中是否有元素,因为这个队列在没有元素时,将直接阻塞读取者。
Semaphore
正常的锁(来自concurrent.locks或内建的synchronized锁)在任何时刻都只允许一个任务访问一项资源,而计数信号量允许n个任务同时访问这个资源。你还可以将信号量看作是在向外分发使用资源的“许可证”,尽管实际上没有使用任何许可证对象。
作为一个示例,请考虑对象池的概念,它管理着数量有限的对象,当要使用对象时可以签出它们,而在用户使用完毕时,可以将它们签回。这种功能可以被封装到一个泛型类中∶
package concurrency.juc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
/**
* @author Mr.Sun
* @date 2022年09月06日 17:13
*
* 演示 Semaphore
*/
public class Pool<T> {
private int size;
private List<T> items = new ArrayList<>();
private volatile boolean[] checkedOut;
private Semaphore available;
public Pool(Class<T> classObject, int size) {
this.size = size;
checkedOut = new boolean[size];
// 加载具有可检出对象的池
available = new Semaphore(size, true);
for (int i = 0; i < size; i++) {
try {
// Assumes a default constructor:
items.add(classObject.newInstance());
} catch(Exception e) {
throw new RuntimeException(e);
}
}
}
public T checkOut() throws InterruptedException {
available.acquire();
return getItem();
}
public void checkIn(T x) {
if(releaseItem(x)) {
available.release();
}
}
private synchronized T getItem() {
for(int i = 0; i < size; ++i)
if(!checkedOut[i]) {
checkedOut[i] = true;
return items.get(i);
}
return null; // 信号量阻止到达这里
}
private synchronized boolean releaseItem(T item) {
int index = items.indexOf(item);
if(index == -1) {
return false; // Not in the list
}
if(checkedOut[index]) {
checkedOut[index] = false;
return true;
}
return false; // Wasn't checked out
}
}
在这个简化的形式中,构造器使用newInstance0来把对象加载到池中。如果你需要一个新对象,那么可以调用checkOut(),并且在使用完之后,将其递交给checkIn()。
boolean类型的数组checkedOut可以跟踪被签出的对象,并且可以通过getItem()和releaseItem()方法来管理。而这些都将由Semaphore类型的available来加以确保,因此,在checkOut()中,如果没有任何信号量许可证可用(这意味着在池中没有更多的对象了),available 将阻塞调用过程。在checkIn()中,如果被签入的对象有效,则会向信号量返回一个许可证。
为了创建一个示例,我们可以使用Fat,这是一种创建代价高昂的对象类型,因为它的构造器运行起来很耗时∶
package concurrency.juc;
/**
* @author Mr.Sun
* @date 2022年09月06日 17:22
*/
public class Fat {
private volatile double d; // 防止优化
private static int counter = 0;
private final int id = counter++;
public Fat() {
// 耗时,可中断的操作
for(int i = 1; i < 10000; i++) {
d += (Math.PI + Math.E) / (double)i;
}
}
public void operation() {
System.out.println(this);
}
@Override
public String toString() {
return "Fat id: " + id;
}
}
我们在池中管理这些对象,以限制这个构造器所造成的影响。我们可以创建一个任务,它将签出Fat对象,持有一段时间之后再将它们签入,以此来测试Pool这个类∶
package concurrency.juc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* @author Mr.Sun
* @date 2022年09月06日 17:25
*/
public class SemaphoreDemo {
final static int SIZE = 25;
public static void main(String[] args) throws Exception {
final Pool<Fat> pool = new Pool<>(Fat.class, SIZE);
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < SIZE; i++) {
exec.execute(new CheckoutTask<>(pool));
}
System.out.println("All CheckoutTasks created");
List<Fat> list = new ArrayList<>();
for(int i = 0; i < SIZE; i++) {
Fat f = pool.checkOut();
System.out.print(i + ": main() thread checked out ");
f.operation();
list.add(f);
}
Future<?> blocked = exec.submit(new Runnable() {
@Override
public void run() {
try {
// Semaphore prevents additional checkout,
// so call is blocked:
pool.checkOut();
} catch(InterruptedException e) {
System.out.println("checkOut() Interrupted");
}
}
});
TimeUnit.SECONDS.sleep(2);
blocked.cancel(true); // Break out of blocked call
System.out.println("Checking in objects in " + list);
for(Fat f : list) {
pool.checkIn(f);
}
for(Fat f : list) {
pool.checkIn(f); // Second checkIn ignored
}
exec.shutdown();
}
}
// 从池中检出资源的任务:
class CheckoutTask<T> implements Runnable {
private static int counter = 0;
private final int id = counter++;
private Pool<T> pool;
public CheckoutTask(Pool<T> pool) {
this.pool = pool;
}
@Override
public void run() {
try {
T item = pool.checkOut();
System.out.println(this + "checked out " + item);
TimeUnit.SECONDS.sleep(1);
System.out.println(this +"checking in " + item);
pool.checkIn(item);
} catch(InterruptedException e) {
// Acceptable way to terminate
}
}
@Override
public String toString() {
return "CheckoutTask " + id + " ";
}
}
在main()中,创建了一个持有Fat对象的Pool,而一组CheckoutTask则开始操练这个Pool。然后,main()线程签出池中的Fat对象,但是并不签入它们。一旦池中所有的对象都被签出,Semaphore将不再允许执行任何签出操作。blocked的run()方法因此会被阻塞,2秒钟之后,cancel()方法被调用,以此来挣脱Future的束缚。注意,冗余的签入将被Pool忽略。
Exchanger
Exchanger是在两个任务之间交换对象的栅栏。当这些任务进入栅栏时,它们各自拥有一个对象,当它们离开时,它们都拥有之前由对象持有的对象。Exchanger的典型应用场景是∶一个任务在创建对象,这些对象的生产代价很高昂,而另一个任务在消费这些对象。通过这种方式,可以有更多的对象在被创建的同时被消费。