1 文章概述
在多线程编程实践中,我们肯定会面临线程间数据交互的问题。在处理这类问题时需要使用一些设计模式,从而保证程序的正确性和健壮性。
保护性暂停设计模式就是解决多线程间数据交互问题的一种模式。本文先从基础案例介绍保护性暂停基本概念和实践,再由浅入深,最终分析DUBBO源码中保护性暂停设计模式使用场景。
2 什么是保护性暂停
我们设想这样一种场景:线程A生产数据,线程B读取数据这个数据。
但是有一种情况:线程B准备读取数据时,此时线程A还没有生产出数据。
在这种情况下线程B不能一直空转,也不能立即退出,线程B要等到生产数据完成并拿到数据之后才退出。
那么在数据没有生产出这段时间,线程B需要执行一种等待机制,这样可以达到对系统保护目的,这就是保护性暂停。
保护性暂停有多种实现方式,本文我们用synchronized/wait/notify的方式实现。
@Getter
@Setter
public class MyData implements Serializable {
private static final long serialVersionUID = 1L;
private String message;
public MyData(String message) {
this.message = message;
}
}
class Resource1 {
private MyData data;
private Object lock = new Object();
public MyData getData() {
synchronized (lock) {
while (data == null) {
try {
// 没有数据则释放锁并暂停等待被唤醒
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return data;
}
}
public void sendData(MyData data) {
synchronized (lock) {
// 生产数据后唤醒消费线程
this.data = data;
lock.notifyAll();
}
}
}
/**
* 保护性暂停实例一
*
* @author 今日头条号「JAVA前线」
*/
public class ProtectDesignTest1 {
public static void main(String[] args) {
Resource1 resource = new Resource1();
new Thread(() -> {
try {
MyData data = new MyData("hello");
System.out.println(Thread.currentThread().getName() + "生产数据=" + data);
// 模拟发送耗时
TimeUnit.SECONDS.sleep(3);
resource.sendData(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
MyData data = resource.getData();
System.out.println(Thread.currentThread().getName() + "接收到数据=" + data);
}, "t2").start();
}
}
在上述实例中线程1生产数据,线程2消费数据。Resource1类中通过wait/notify实现了保护性暂停设计模式。
3 加一个超时时间
上述实例中如果线程2没有获取到数据,那么线程2直到拿到数据才会退出。现在我们给获取数据指定一个超时时间,如果在这个时间内没有获取到数据则抛出超时异常。虽然只是加一个参数,但是其中有很多细节需要注意。
3.1 一段有问题的代码
我们分析下面这段代码
class Resource2 {
private MyData data;
private Object lock = new Object();
public MyData getData(int timeOut) {
synchronized (lock) {
while (data == null) {
try {
// 代码1
lock.wait(timeOut);
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (data == null) {
throw new RuntimeException("超时未获取到结果");
}
return data;
}
}
public void sendData(MyData data) {
synchronized (lock) {
this.data = data;
lock.notifyAll();
}
}
}
/**
* 保护性暂停实例二
*
* @author 今日头条号「JAVA前线」
*/
public class ProtectDesignTest2 {
public static void main(String[] args) {
Resource2 resource = new Resource2();
new Thread(() -> {
try {
MyData data = new MyData("hello");
System.out.println(Thread.currentThread().getName() + "生产数据=" + data);
// 模拟发送耗时
TimeUnit.SECONDS.sleep(3);
resource.sendData(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
MyData data = resource.getData(1000);
System.out.println(Thread.currentThread().getName() + "接收到数据=" + data);
}, "t2").start();
}
}
这段代码看似没有问题,使用的也是wait带有超时时间的参数,那么问题可能出在哪里呢?
问题是线程虚假唤醒带来的。如果还没有到超时时间代码1就被虚假唤醒,此时data还没有值就会直接跳出循环,这样没有达到我们预期的超时时间才跳出循环的预期。
关于虚假唤醒这个概念,我们看看JDK官方文档相关介绍。
官方文档告诉我们一个线程可能会在没有被notify时虚假唤醒,所以判断是否继续wait时必须用while循环。我们在写代码时一定也要注意线程虚假唤醒问题。
3.2 正确实例
上面我们明白了虚假唤醒问题,现在我们对代码进行修改,说明参看代码注释。
class Resource3 {
private MyData data;
private Object lock = new Object();
public MyData getData(int timeOut) {
synchronized (lock) {
// 运行时长
long timePassed = 0;
// 开始时间
long begin = System.currentTimeMillis();
// 如果结果为空
while (data == null) {
try {
// 如果运行时长大于超时时间退出循环
if (timePassed > timeOut) {
break;
}
// 如果运行时长小于超时时间表示虚假唤醒 -> 只需再等待时间差值
long waitTime = timeOut - timePassed;
// 等待时间差值
lock.wait(waitTime);
// 结果不为空直接返回
if (data != null) {
break;
}
// 被唤醒后计算运行时长
timePassed = System.currentTimeMillis() - begin;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (data == null) {
throw new RuntimeException("超时未获取到结果");
}
return data;
}
}
public void sendData(MyData data) {
synchronized (lock) {
this.data = data;
lock.notifyAll();
}
}
}
/**
* 保护性暂停实例三
*
* @author 今日头条号「JAVA前线」
*/
public class ProtectDesignTest3 {
public static void main(String[] args) {
Resource3 resource = new Resource3();
new Thread(() -> {
try {
MyData data = new MyData("hello");
System.out.println(Thread.currentThread().getName() + "生产数据=" + data);
// 模拟发送耗时
TimeUnit.SECONDS.sleep(3);
resource.sendData(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
MyData data = resource.getData(1000);
System.out.println(Thread.currentThread().getName() + "接收到数据=" + data);
}, "t2").start();
}
}
4 加一个编号
现在再来设想一个场景:现在有三个生产数据的线程1、2、3,三个获取数据的线程4、5、6,我们希望每个获取数据线程都只拿到其中一个生产线程的数据,不能多拿也不能少拿。
这里引入一个Futures模型,这个模型为每个资源进行编号并存储在容器中,例如线程1生产的数据被拿走则从容器中删除,一直到容器为空结束。
@Getter
@Setter
public class MyNewData implements Serializable {
private static final long serialVersionUID = 1L;
private static final AtomicLong ID = new AtomicLong(0);
private Long id;
private String message;
public MyNewData(String message) {
this.id = newId();
this.message = message;
}
/**
* 自增到最大值会回到最小值(负值可以作为识别ID)
*/
private static long newId() {
return ID.getAndIncrement();
}
public Long getId() {
return this.id;
}
}
class MyResource {
private MyNewData data;
private Object lock = new Object();
public MyNewData getData(int timeOut) {
synchronized (lock) {
long timePassed = 0;
long begin = System.currentTimeMillis();
while (data == null) {
try {
if (timePassed > timeOut) {
break;
}
long waitTime = timeOut - timePassed;
lock.wait(waitTime);
if (data != null) {
break;
}
timePassed = System.currentTimeMillis() - begin;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (data == null) {
throw new RuntimeException("超时未获取到结果");
}
return data;
}
}
public void sendData(MyNewData data) {
synchronized (lock) {
this.data = data;
lock.notifyAll();
}
}
}
class MyFutures {
private static final Map<Long, MyResource> FUTURES = new ConcurrentHashMap<>();
public static MyResource newResource(MyNewData data) {
final MyResource future = new MyResource();
FUTURES.put(data.getId(), future);
return future;
}
public static MyResource getResource(Long id) {
return FUTURES.remove(id);
}
public static Set<Long> getIds() {
return FUTURES.keySet();
}
}
/**
* 保护性暂停实例四
*
* @author 今日头条号「JAVA前线」
*/
public class ProtectDesignTest4 {
public static void main(String[] args) throws Exception {
for (int i = 0; i < 3; i++) {
final int index = i;
new Thread(() -> {
try {
MyNewData data = new MyNewData("hello_" + index);
MyResource resource = MyFutures.newResource(data);
// 模拟发送耗时
TimeUnit.SECONDS.sleep(1);
resource.sendData(data);
System.out.println("生产数据data=" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
TimeUnit.SECONDS.sleep(1);
for (Long i : MyFutures.getIds()) {
final long index = i;
new Thread(() -> {
MyResource resource = MyFutures.getResource(index);
int timeOut = 3000;
System.out.println("接收数据data=" + resource.getData(timeOut));
}).start();
}
}
}
5 DUBBO应用实例
我们顺着这一个链路跟踪代码:消费者发送请求 > 提供者接收请求并执行,并且将运行结果发送给消费者 >消费者接收结果。
(1) 消费者发送请求
消费者发送的数据包含请求ID,并且将关系维护进FUTURES容器
final class HeaderExchangeChannel implements ExchangeChannel {
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
// 代码1
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
}
class DefaultFuture implements ResponseFuture {
// FUTURES容器
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
// 请求ID
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
}
(2) 提供者接收请求并执行,并且将运行结果发送给消费者
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
// response与请求ID对应
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
// message = RpcInvocation包含方法名、参数名、参数值等
Object msg = req.getData();
try {
// DubboProtocol.reply执行实际业务方法
CompletableFuture<Object> future = handler.reply(channel, msg);
// 如果请求已经完成则发送结果
if (future.isDone()) {
res.setStatus(Response.OK);
res.setResult(future.get());
channel.send(res);
return;
}
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
}
(3) 消费者接收结果
以下DUBBO源码很好体现了保护性暂停这个设计模式,说明参看注释
class DefaultFuture implements ResponseFuture {
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
public static void received(Channel channel, Response response) {
try {
// 取出对应的请求对象
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
@Override
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
// 放弃锁并使当前线程阻塞,直到发出信号中断它或者达到超时时间
done.await(timeout, TimeUnit.MILLISECONDS);
// 阻塞结束后再判断是否完成
if (isDone()) {
break;
}
// 阻塞结束后判断是否超时
if(System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// response对象仍然为空则抛出超时异常
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
private void doReceived(Response res) {
lock.lock();
try {
// 接收到服务器响应赋值response
response = res;
if (done != null) {
// 唤醒get方法中处于等待的代码块
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
}
6 文章总结
本文我们从基础案例介绍保护性暂停基本概念和实践,最终分析DUBBO源码中保护性暂停设计模式使用场景。我们在设计并发框架时要注意虚假唤醒问题,以及请求和响应关系对应问题,希望本文对大家有所帮助。