<img alt="" src="https://oscimg.oschina.net/oscnet/up-1ae86da8e9b92a776ab39ff78a02e22579f.JPEG">-LMLPHP

1 文章概述

在多线程编程实践中,我们肯定会面临线程间数据交互的问题。在处理这类问题时需要使用一些设计模式,从而保证程序的正确性和健壮性。

保护性暂停设计模式就是解决多线程间数据交互问题的一种模式。本文先从基础案例介绍保护性暂停基本概念和实践,再由浅入深,最终分析DUBBO源码中保护性暂停设计模式使用场景。

2 什么是保护性暂停

我们设想这样一种场景:线程A生产数据,线程B读取数据这个数据。

但是有一种情况:线程B准备读取数据时,此时线程A还没有生产出数据。

在这种情况下线程B不能一直空转,也不能立即退出,线程B要等到生产数据完成并拿到数据之后才退出。

那么在数据没有生产出这段时间,线程B需要执行一种等待机制,这样可以达到对系统保护目的,这就是保护性暂停。

保护性暂停有多种实现方式,本文我们用synchronized/wait/notify的方式实现。

@Getter
@Setter
public class MyData implements Serializable {
    private static final long serialVersionUID = 1L;
    private String message;

    public MyData(String message) {
        this.message = message;
    }
}

class Resource1 {
    private MyData data;
    private Object lock = new Object();

    public MyData getData() {
        synchronized (lock) {
            while (data == null) {
                try {
                    // 没有数据则释放锁并暂停等待被唤醒
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return data;
        }
    }

    public void sendData(MyData data) {
        synchronized (lock) {
            // 生产数据后唤醒消费线程
            this.data = data;
            lock.notifyAll();
        }
    }
}

/**
 * 保护性暂停实例一
 *
 * @author 今日头条号「JAVA前线」
 */
public class ProtectDesignTest1 {

    public static void main(String[] args) {
        Resource1 resource = new Resource1();
        new Thread(() -> {
            try {
                MyData data = new MyData("hello");
                System.out.println(Thread.currentThread().getName() + "生产数据=" + data);
                // 模拟发送耗时
                TimeUnit.SECONDS.sleep(3);
                resource.sendData(data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1").start();

        new Thread(() -> {
            MyData data = resource.getData();
            System.out.println(Thread.currentThread().getName() + "接收到数据=" + data);
        }, "t2").start();
    }
}

在上述实例中线程1生产数据,线程2消费数据。Resource1类中通过wait/notify实现了保护性暂停设计模式。

3 加一个超时时间

上述实例中如果线程2没有获取到数据,那么线程2直到拿到数据才会退出。现在我们给获取数据指定一个超时时间,如果在这个时间内没有获取到数据则抛出超时异常。虽然只是加一个参数,但是其中有很多细节需要注意。

3.1 一段有问题的代码

我们分析下面这段代码

class Resource2 {
    private MyData data;
    private Object lock = new Object();

    public MyData getData(int timeOut) {
        synchronized (lock) {
            while (data == null) {
                try {
                    // 代码1
                    lock.wait(timeOut);
                    break;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (data == null) {
                throw new RuntimeException("超时未获取到结果");
            }
            return data;
        }
    }

    public void sendData(MyData data) {
        synchronized (lock) {
            this.data = data;
            lock.notifyAll();
        }
    }
}


/**
 * 保护性暂停实例二
 *
 * @author 今日头条号「JAVA前线」
 */
public class ProtectDesignTest2 {

    public static void main(String[] args) {
        Resource2 resource = new Resource2();
        new Thread(() -> {
            try {
                MyData data = new MyData("hello");
                System.out.println(Thread.currentThread().getName() + "生产数据=" + data);
                // 模拟发送耗时
                TimeUnit.SECONDS.sleep(3);
                resource.sendData(data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1").start();

        new Thread(() -> {
            MyData data = resource.getData(1000);
            System.out.println(Thread.currentThread().getName() + "接收到数据=" + data);
        }, "t2").start();
    }
}

这段代码看似没有问题,使用的也是wait带有超时时间的参数,那么问题可能出在哪里呢?

问题是线程虚假唤醒带来的。如果还没有到超时时间代码1就被虚假唤醒,此时data还没有值就会直接跳出循环,这样没有达到我们预期的超时时间才跳出循环的预期。

关于虚假唤醒这个概念,我们看看JDK官方文档相关介绍。

官方文档告诉我们一个线程可能会在没有被notify时虚假唤醒,所以判断是否继续wait时必须用while循环。我们在写代码时一定也要注意线程虚假唤醒问题。

3.2 正确实例

上面我们明白了虚假唤醒问题,现在我们对代码进行修改,说明参看代码注释。

class Resource3 {
    private MyData data;
    private Object lock = new Object();

    public MyData getData(int timeOut) {
        synchronized (lock) {
            // 运行时长
            long timePassed = 0;
            // 开始时间
            long begin = System.currentTimeMillis();
            // 如果结果为空
            while (data == null) {
                try {
                    // 如果运行时长大于超时时间退出循环
                    if (timePassed > timeOut) {
                        break;
                    }
                    // 如果运行时长小于超时时间表示虚假唤醒 -> 只需再等待时间差值
                    long waitTime = timeOut - timePassed;

                    // 等待时间差值
                    lock.wait(waitTime);

                    // 结果不为空直接返回
                    if (data != null) {
                        break;
                    }
                    // 被唤醒后计算运行时长
                    timePassed = System.currentTimeMillis() - begin;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (data == null) {
                throw new RuntimeException("超时未获取到结果");
            }
            return data;
        }
    }

    public void sendData(MyData data) {
        synchronized (lock) {
            this.data = data;
            lock.notifyAll();
        }
    }
}

/**
 * 保护性暂停实例三
 *
 * @author 今日头条号「JAVA前线」
 */
public class ProtectDesignTest3 {

    public static void main(String[] args) {
        Resource3 resource = new Resource3();
        new Thread(() -> {
            try {
                MyData data = new MyData("hello");
                System.out.println(Thread.currentThread().getName() + "生产数据=" + data);
                // 模拟发送耗时
                TimeUnit.SECONDS.sleep(3);
                resource.sendData(data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1").start();

        new Thread(() -> {
            MyData data = resource.getData(1000);
            System.out.println(Thread.currentThread().getName() + "接收到数据=" + data);
        }, "t2").start();
    }
}

4 加一个编号

现在再来设想一个场景:现在有三个生产数据的线程1、2、3,三个获取数据的线程4、5、6,我们希望每个获取数据线程都只拿到其中一个生产线程的数据,不能多拿也不能少拿。

这里引入一个Futures模型,这个模型为每个资源进行编号并存储在容器中,例如线程1生产的数据被拿走则从容器中删除,一直到容器为空结束。

@Getter
@Setter
public class MyNewData implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final AtomicLong ID = new AtomicLong(0);
    private Long id;
    private String message;

    public MyNewData(String message) {
        this.id = newId();
        this.message = message;
    }

    /**
     * 自增到最大值会回到最小值(负值可以作为识别ID)
     */
    private static long newId() {
        return ID.getAndIncrement();
    }

    public Long getId() {
        return this.id;
    }
}

class MyResource {
    private MyNewData data;
    private Object lock = new Object();

    public MyNewData getData(int timeOut) {
        synchronized (lock) {
            long timePassed = 0;
            long begin = System.currentTimeMillis();
            while (data == null) {
                try {
                    if (timePassed > timeOut) {
                        break;
                    }
                    long waitTime = timeOut - timePassed;
                    lock.wait(waitTime);
                    if (data != null) {
                        break;
                    }
                    timePassed = System.currentTimeMillis() - begin;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            if (data == null) {
                throw new RuntimeException("超时未获取到结果");
            }
            return data;
        }
    }

    public void sendData(MyNewData data) {
        synchronized (lock) {
            this.data = data;
            lock.notifyAll();
        }
    }
}

class MyFutures {
    private static final Map<Long, MyResource> FUTURES = new ConcurrentHashMap<>();

    public static MyResource newResource(MyNewData data) {
        final MyResource future = new MyResource();
        FUTURES.put(data.getId(), future);
        return future;
    }

    public static MyResource getResource(Long id) {
        return FUTURES.remove(id);
    }

    public static Set<Long> getIds() {
        return FUTURES.keySet();
    }
}


/**
 * 保护性暂停实例四
 *
 * @author 今日头条号「JAVA前线」
 */
public class ProtectDesignTest4 {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 3; i++) {
            final int index = i;
            new Thread(() -> {
                try {
                    MyNewData data = new MyNewData("hello_" + index);
                    MyResource resource = MyFutures.newResource(data);
                    // 模拟发送耗时
                    TimeUnit.SECONDS.sleep(1);
                    resource.sendData(data);
                    System.out.println("生产数据data=" + data);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }).start();
        }
        TimeUnit.SECONDS.sleep(1);

        for (Long i : MyFutures.getIds()) {
            final long index = i;
            new Thread(() -> {
                MyResource resource = MyFutures.getResource(index);
                int timeOut = 3000;
                System.out.println("接收数据data=" + resource.getData(timeOut));
            }).start();
        }
    }
}

5 DUBBO应用实例

我们顺着这一个链路跟踪代码:消费者发送请求 > 提供者接收请求并执行,并且将运行结果发送给消费者 >消费者接收结果。

(1) 消费者发送请求

消费者发送的数据包含请求ID,并且将关系维护进FUTURES容器

final class HeaderExchangeChannel implements ExchangeChannel {

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        // 代码1
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
}

class DefaultFuture implements ResponseFuture {

    // FUTURES容器
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();

    private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        // 请求ID
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
}

(2) 提供者接收请求并执行,并且将运行结果发送给消费者

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        // response与请求ID对应
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            Object data = req.getData();
            String msg;
            if (data == null) {
                msg = null;
            } else if (data instanceof Throwable) {
                msg = StringUtils.toString((Throwable) data);
            } else {
                msg = data.toString();
            }
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);
            channel.send(res);
            return;
        }
        // message = RpcInvocation包含方法名、参数名、参数值等
        Object msg = req.getData();
        try {

            // DubboProtocol.reply执行实际业务方法
            CompletableFuture<Object> future = handler.reply(channel, msg);

            // 如果请求已经完成则发送结果
            if (future.isDone()) {
                res.setStatus(Response.OK);
                res.setResult(future.get());
                channel.send(res);
                return;
            }
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }
}

(3) 消费者接收结果

以下DUBBO源码很好体现了保护性暂停这个设计模式,说明参看注释

class DefaultFuture implements ResponseFuture {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();

    public static void received(Channel channel, Response response) {
        try {
            // 取出对应的请求对象
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                            + ", response " + response
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                               + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }


    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {

                    // 放弃锁并使当前线程阻塞,直到发出信号中断它或者达到超时时间
                    done.await(timeout, TimeUnit.MILLISECONDS);

                    // 阻塞结束后再判断是否完成
                    if (isDone()) {
                        break;
                    }

                    // 阻塞结束后判断是否超时
                    if(System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // response对象仍然为空则抛出超时异常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

    private void doReceived(Response res) {
        lock.lock();
        try {
            // 接收到服务器响应赋值response
            response = res;
            if (done != null) {
                // 唤醒get方法中处于等待的代码块
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
}

6 文章总结

本文我们从基础案例介绍保护性暂停基本概念和实践,最终分析DUBBO源码中保护性暂停设计模式使用场景。我们在设计并发框架时要注意虚假唤醒问题,以及请求和响应关系对应问题,希望本文对大家有所帮助。

05-24 11:07