java并发之Future与Callable使用

这篇文章需要大家知道线程、线程池的知识,尤其是线程池。 有的时候我们要获取线程的执行结果,这个时候就需要用到Callable、Future、FutureTask了 先看下Future、Callable接口、RunnableFuture、FutureTask。

Future

Future是一个接口,能够取消任务、获取任务取消状态、获取结果。

package java.util.concurrent;
public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    /**
     * Returns {@code true} if this task completed.
     *
     * Completion may be due to normal termination, an exception, or
     * cancellation -- in all of these cases, this method will return
     * {@code true}.
     *
     * @return {@code true} if this task completed
     */
    boolean isDone();

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;


    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Callable

Callable只有一个功能就是获取结果的V call()。

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

FutureTask

FutureTask是一个实现类,它继承了RunnableFuture,而接口RunnableFuture继承了接口Runable,Future。 因此FutureTask可以通过new Thread(FutureTask task)这样的方式来创建线程。

public class FutureTask<V> implements RunnableFuture<V>

RunnableFuture

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

通过Future创建线程

获取线程的执行结果,最重要就是FutureTask调用。下面的例子是通过线程池来执行线程,并获取结果 。 1.创建实现Callable接口的类Task,就是返回数字和 2.创建线程池ThreadPoolExecutor,新建Task实例 3.线程池ThreadPoolExecutor执行pool.submit(task) 4.返回了Future<Integer> result 5.调用Future的get()获取结果 难点是在pool.submit(task)。

package com.java.javabase.thread.future;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author
 */
@Slf4j
public class FutureTest {
    public static void main(String[] args) {
        //ExecutorService pool = Executors.newCachedThreadPool();
        ThreadPoolExecutor pool =new ThreadPoolExecutor(1,1,0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        Task task =new Task();
        Future<Integer> result=pool.submit(task);
        /*
         public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
         */
        pool.shutdown();
        try {
            log.info("sum {}",result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }

    static class Task implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            int sum = 0;
            for (int i = 0; i < 100; i++) {
                sum += i;
            }
            return sum;
        }
    }


}

线程池获取结果流程说明

ThreadPoolExecutor.submit方法中执行void execute(Runnable command); 如果了解线程池就应该知道这个方法是能够让command的run方法执行的。实现接口 RunnableFuture的 FutureTask的run方法最后返回线程结果的关键

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

ThreadPoolExecutor.execute方法

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

FutureTask构造器

 public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

FutureTask的run方法 我们创建FutureTask的时候,传入了实现Callable接口的实现类Task,而run()方法中调用了result = c.call(); 也就是说FutureTask调用了Callable接口的实现类Task的call方法获取结果,并保存,然后通过对外的get()方法返回。

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

通过FutureTask创建线程

值得注意的就是获取结果的方式,创建FutureTask,也只能通过FutureTask对象get到结果。 pool.submit(futureTask)返回的Future调用get()返回的是null!

package com.java.javabase.thread.future;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author
 */
@Slf4j
public class FutureTaskTest {
    public static void main(String[] args) {
        //ExecutorService pool = Executors.newCachedThreadPool();
        ThreadPoolExecutor pool =new ThreadPoolExecutor(1,1,0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        Task task =new Task();
        FutureTask<Integer> futureTask=new FutureTask<Integer>(task);
        //Future<?> result =pool.submit(futureTask);
        pool.submit(futureTask);

        pool.shutdown();
        try {
            log.info("sum {}",futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }


        Task t2=new Task();
        FutureTask<Integer> futureTask2=new FutureTask<Integer>(t2);
        new Thread(futureTask2).start();
        try {
            log.info("sum 2 {}",futureTask2.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    static class Task implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            int sum = 0;
            for (int i = 0; i < 100; i++) {
                sum += i;
            }
            return sum;
        }
    }
}

只能通过FutureTask对象get到结果的原因

我就简单强调一点下面的:ftask与task的区别就是获取结果不一致的原因 ftask的run方法调用了对象task的run方法,ftask的get()返回的是null,而实际我们看到自己创建的task的run方法才能返回结果

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
03-24 07:03