FutureTask

FutureTask是Future的实现,用来异步任务的获取结果,可以启动和取消异步任务,查询异步任务是否计算结束以及获取最终的异步任务的结果。通过get()方法来获取异步任务的结果,但是会阻塞当前线程直至异步任务执行结束。一旦任务执行结束,任务不能重新启动或取消,除非调用runAndReset()方法。

代码示例:

public class ThreadTest {

  public static void main(String[] args) throws Exception {

    Callable<String> myCallable = new MyCallableThread();
    FutureTask<String> futureTask = new FutureTask<>(myCallable);
    Thread myCallableThread = new Thread(futureTask);
    myCallableThread.setName("MyThread-implements-Callable-test");
    myCallableThread.start();
    System.out.println("Run by Thread:" + futureTask.get());

    //通过线程池执行
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.submit(futureTask);
    executorService.shutdown();
    System.out.println("Run by ExecutorService:" + futureTask.get());
  }
}

class MyCallableThread implements Callable<String> {

  @Override
  public String call() throws Exception {
    return Thread.currentThread().getName();
  }
}

实现一个自己的FutureTask

根据FutureTask核心原理,要实现一个FutureTask必须满足以下方面:

  • 需要泛型定义用以返回结果类型
  • 需要一个callable对象,在构造方法中传入
  • 需要实现runnable接口,在run方法中实现具体结果计算
  • 需要一个公开的get方法来获取结果
  • 如果线程没有执行完,则调用get方法的线程需要进入等待队列
  • 需要一个字段记录线程执行的状态
  • 需要一个等待队列存储等待结果的线程

代码示例:

/**
 * 1. 泛型定义
 * 2. 构造方法 callable
 * 3. 实现了runnable
 * 4. get方法返回callable执行结果
 * 5. get方法有阻塞的效果(未执行结束的话)
 */
public class MyFutureTask<T> implements Runnable {

  // 程序执行的结果
  private T result;

  // 要执行的任务
  private Callable<T> callable;

  // 任务运行的状态
  private volatile int state = NEW;

  // 任务运行的状态值
  private static final int NEW = 0;
  private static final int RUNNING = 1;
  private static final int FINISHED = 2;

  // 获取结果的线程等待队列
  LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>(100);

  // 执行当前FutureTask的线程,用CAS进行争抢
  AtomicReference<Thread> runner = new AtomicReference<>();

  public MyFutureTask(Callable<T> task) {
    this.callable = task;
  }

  @Override
  public void run() {
    // 判断当前对象的状态,如果是New且抢锁成功就执行
    if (state != NEW || !runner.compareAndSet(null, Thread.currentThread())) return;
    state = RUNNING;
    try {
      result = callable.call();
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      state = FINISHED;
    }

    // 方法执行完,唤醒所有线程
    while (true) {
      Thread waiterThread = waiters.poll();
      if (waiterThread == null) break;
      LockSupport.unpark(waiterThread);
    }
  }

  public T get() {
    // 如果状态不是FINISHED,则进入等待队列
    if (state != FINISHED) {
      waiters.offer(Thread.currentThread());
    }
    while (state != FINISHED) {
      LockSupport.park();
    }
    return result;
  }
}

// MyFutureTask 测试
public class FutureTaskTest {
  public static void main(String[] args) {

    Callable<String> myCallable = new MyCallableThread();
    MyFutureTask<String> futureTask = new MyFutureTask<>(myCallable);
    Thread myCallableThread = new Thread(futureTask);
    myCallableThread.setName("MyFutureTask-test");
    myCallableThread.start();
    System.out.println("Run by Thread:" + futureTask.get());
  }
}

class MyCallableThread implements Callable<String> {

  @Override
  public String call() throws Exception {
    return Thread.currentThread().getName();
  }
}

自定义FutureTask实现-LMLPHP

08-12 06:59