并发程序设计之Future模式

一)、使用Future模式的原因

当某一段程序提交了一个请求,期待得到一个答复,但服务程序对这个请求的处理可能很慢,在单线程的环境中,调用函数是同步的,必须等到服务程序返回结果后才能进行其他处理,在这段时间里,客户端一直处于等待状态。

二)、Future模式

使用并发的设计思想,解决客户端发送请求到应用程序,等待响应数据时间过长的问题.

三)、Future模式的核心结构

1.main: 系统启动类

作用:调用Client,发送请求。

2.client: 发送请求类

 作用:返回Data对象,即FutureData对象,并开启一个线程给返回的FutureData对象装配RealData。

3.Data:数据接口

 作用:FutureData和RealData共同实现了这个接口。

4.FutureData: 虚假数据类

 作用:Future数据,构造快,虚假数据,装配RealData对象,相当于RealData的一个代理。

5.RealData: 真实数据类

   作用:返回服务程序处理的真实数据。

四)、Future模式的实现流程

1).用户发送client请求服务到应用程序,先返回一个虚假的数据FutureData类,并启一个新的线程,用于装配真正的数据返回对象。

2).提供一个接口类Data,类中提供一个接口方法,虚假类FutureData和真实类RealData分别实现对应的接口方法。

3) .将新的线程装配成功的RealData对象加入FutureData中,FutureData相当于RealData的代理对象,在接口方法中调用RealData,的到应用程序真实的响应数据。

五)、Future模式的简单复现

模拟用户在淘宝购买商品的场景:

购买香水,用户购买商品后返回一个商品订单(相当于FutureData),并没有马上返回商品,用户需等待物流才能获得商品在等待物流派送的这段时间用户可以做其他事情(用户执行其他的业务逻辑),当货物到达再取货。当用户想要取货时,可以通过物流信息来监控货物是否到达(状态值isReady),若物流状态为未到达,用户不可取货,只能等待,当物流到达,用户取货(result())。

系统启动类:ClientService

/**
 * 系统启动类
 */
public class ClientService {
    public static void main(String[] args) {
        //调用Client,发送请求
        Client client = new Client();
        //得到一个FutureData对象
        Data data = client.request("channel香水 x 1:购买");
        System.out.println("我在执行其他的业务");
        //模拟客户端发送请请求后,继续执行其他的业务逻辑
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //用户的到真实的数据对象
        System.out.println(data.result());
    }
}

客户端请求类:Client

public class Client {
    /**
     * 客户端发送请求后,先返回一个FutureData,并开启一个线程,装配一个RealData到返回的FutureData中
     *
     */
    public Data request(String requestStr){
        //创建一个Future对象
        FutureData future = new FutureData();
        //开启一个线程,创建RealData对象,并将该对象装配到future中
        new Thread(){
            @Override
            public void run(){
                //创建RealData对象
                RealData realData = new RealData(requestStr);
                //装配realData对象到future中
                future.setRealData(realData);
            }
        }.start();
        //用户请求后立即返回一个FutureData对象,执行其它的业务逻辑
        return future;
    }
}

接口类:Data

/**
 * FutureData和RealData的共同接口
 */
public interface Data {
    String result();
}

虚假数据类:FutureData

该类需要使用到线程的唤醒机制

1).使用isReady来监控对象是否被注入,isReady = false,对象未注入,线程请求

    request(),使用wait()让线程进入等待状态。

2) .isReady = true,注入对象,使用notifyAll()唤醒等待线程。

原因: RealData的方法构造很慢,当调用FutureData的result()来获取真实数据

          时,RealData对象未被注入,因为FutureData的result()是调用了RealData

          的的result()方法,此时,对象未注入,抛出NullPointException。

注:wait()和notify()/notifyAll要配合synchonized使用。

/**
 * 该类需要使用到线程的唤醒机制:
 *       使用isReady来监控对象是否被注入,isReady = false,对象未注  入,线程请求request(),使用wait()让线程进入等待状态
 *       isReady = true,注入对象,使用notifyAll()唤醒等待线程。
 *
 *       原因: RealData的方法构造很慢,当调用FutureData的result()来获取真实数据时,RealData对象未被注入,因为FutureData的result()
 *             是调用了RealData的result()方法,此时,对象未注入,抛出NullPointException。
 */
public class FutureData implements Data {
    /**
     * 装配对象
     */
    RealData realData = null;

    /**
     * 装配标识符,监控装配对象是否成功注入
     */
    Boolean isReady = false;

    /**
     * 注入RealData
     * @return
     */
    public synchronized void setRealData(RealData realData){
        if(isReady){
            return;
        }
        this.realData = realData;
        //注入对象后,将isReady设置为true
        isReady = true;
        //唤醒等待的线程
        notifyAll();
    }

    /**
     * 实际上使用RealData获取真实的数据并返回结果
     * @return
     */
    @Override
    public synchronized String result() {
        //当对象未被注入时,等待对象的注入
        if(!isReady){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return realData.result();
    }
}

真实数据类:RealData

/**
 * 真正的数据返回类
 */
public class RealData implements Data{
    private String requestStr;

    /**
     * 模拟在构造RealData对象时要花费很多时间
     */
    public RealData(String requestStr) {
        StringBuffer sb = new StringBuffer();
        for(int i = 0; i < 100; i++){
            sb.append(requestStr);
        }
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String result() {
        return "亲~ ,您的货物已经到件了哦!";
    }
}

结果:

我在执行其他的业务
亲~ ,您的货物已经到件了哦!

六)、JDK内置实现

JDK提供了一个FutureTask和Callable接口来实现程序的异步操作。

1)、FutureTask类:

定义: 是一个线程类,能做为一个单独的线程运行。

1.实现RunableFuture接口

2.RunableFuture 继承了Runable、Future两个接口

2)、主要属性:

private Callable<V> callable;
//call()方法的返回值
private Object outcome;
//call()方法的执行状态
private volatile int state;
//新创建状态
private static final int NEW          = 0;
//完成状态
private static final int COMPLETING   = 1;
//标准状态
private static final int NORMAL       = 2;
//异常状态
private static final int EXCEPTIONAL  = 3;
//取消状态
private static final int CANCELLED    = 4;
//正在中断状态
private static final int INTERRUPTING = 5;
//中断状态
private static final int INTERRUPTED  = 6;

3)、构造方法new FutureTask(Callable callable) :

接收一个Callable类型的对象

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

4)、Callable接口:接口提供了一个call()方法。

在FutureTask的run()方法中会调用这个方法。call()方法中实现并发线程的主要逻辑。

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

5)、FutureTask的run()方法

实现逻辑:

1.调用Callable的call()方法,并获取其返回值。

2.将返回值赋值给outcome对象。

3.设置state状态值 state = NORMAL = 2。

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //调用call()方法,并获取返回值
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

//将返回值赋值给outcome变量
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

6)、FutureTask的get()方法

1).判断call()方法的状态。

2).若sate <=COMPLETING = 2,线程会进入等待状态。

3).相反,则返回outcome值,即call()方法的返回值。

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //判断call()方法的执行状态,判断是否进入等待状态
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

//返回outcome的值
private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

7)、使用JDK自带的Future模式复现商品购买场景:

系统启动类:

/**
*  模拟淘宝下单:
*   购买香水,用户购买商品后返回一个商品订单,并没有马上返回商品,用户需  *   等待物流才能获得商品在等待物流派送的这段时间用户可以做其他事情,当货  *   物到达再取货。当用户想要取货时,可以通过物流信息来监控货物是否到达,  *   若物流状态为未到达,用户不可取货,只能等待。
*/
public class JDK_ClientService {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建一个FutureTask对象,相当于上文FutureData,该对象是一个线程类
        FutureTask future = new FutureTask(new RealData("chanta(nel香水 x 1: 购买!"));
        //创将线程池执行器,返回一个ThreadPool对象
        ExecutorService executor = Executors.newFixedThreadPool(1);
        //开启线程,发送请求,执行future线程的run()方法,执行请求的具体逻辑
        executor.submit(future);
        System.out.println("我要去执行其他业务了");
        //模拟程序执行其它逻辑
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //得到请求的真实返回数据,get方法会判断监控call()的状态,当call()的状态未执行或未执行完毕,线程进入等待状态。
        System.out.println(future.get());
    }

}

真实类:实现Callabel接口,复写call()方法,定义Run方法的具体逻辑

public class RealData implements Callable {
    private String requestStr;
    /**
     *  轻量级的构造方法
     */
    public RealData(String requestStr) {
        this.requestStr = requestStr;
    }

    /**
     * 将复杂的业务逻辑都放在call方法中
     * @return
     * @throws Exception
     */
    @Override
    public Object call(){
        StringBuffer sb = new StringBuffer();
        for(int i = 0; i < 10; i++){
            sb.append(requestStr);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //指定FutureTask的返回对象
        return "亲~, 您的香水已到货!";
    }
}

结果:

我要去执行其他业务了
亲~, 您的香水已到货!

结论:

Jdk中Future模式的实现主要是依赖Callable接口,FutureTask类。

首先,在创建FutureTask对象的时候传入一个Callable的子类对象,获取线程池执行器,通过线程池执行器来开启FutureTask的线程,执行run()方法,在run()方法中调用Callable的子类对象的call()方法,并使用一个outcome变量来存储call()方法的返回值,使用一个int类型的变量state来监控call()方法的执行情况,用户可以通过get()方法来获取call()方法的返回值,get()方法根据state值来决定程序是否进入等待状态,若state = normal = 2 = 正常值,返回call()的返回结果。

02-01 01:56
查看更多