Java中的Future

Future相关概念

定义

官放时这样描述Future接口的:

Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the cancel method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task.

简单来说就是一个用于异步计算的任务接口,可以返回结果或者抛出异常。

我们写多线程程序的时候,任务进程是实现了Runnable接口,或者直接写个类继承Thread,但是如果我们需要任务执行完成后,给出个结果,以上两种方式只能通过写共享对象或文件来实现目的,无法直接返回

还有个缺点,就是Runnable接口中的run方法无法抛出异常,我们可能想任务在处理过程中抛出异常,但是Runnable接口无法抛出异常,就算抛出异常了,我们在哪里接收那run接口不是通过主动调用的。

为解决以上两个问题,java中定义了Callable接口,接口的具体定义如下:

1
2
3
public interface Callable<V> {
V call() throws Exception;
}

直接了返回值和可以抛出异常,但是这里面就有问题了:

  1. 抛出的异常我们如何接收?
  2. 返回值我们如何接收,如果每个都有返回值,都需要等待子线程执行完毕,那这个多线程变成同步的了,也没啥意义了。

这就是Future需要存在的意义了。Future是Java的接口,类似于容器保存了Callable的返回结果。我们把子任务放入线程池之后,直接返回,进行其他处理,然后再调用Future的get方法来获取结果,Future还可以控制子任务的执行。

Future模式的核心思想是能够让主线程将原来需要同步等待的这段时间用来做其他的事情。(因为可以异步获得执行结果,所以不用一直同步等待去获得执行结果)

上图简单描述了不使用Future和使用Future的区别,不使用Future模式,主线程在invoke完一些耗时逻辑之后需要等待,这个耗时逻辑在实际应用中可能是一次RPC调用,可能是一个本地IO操作等。B图表达的是使用Future模式之后,我们主线程在invoke之后可以立即返回,去做其他的事情,回头再来看看刚才提交的invoke有没有结果。

使用

Future接口包含以下方法:

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutExceptio
}
  1. get 方法说明:

    • 如果任务执行完毕了,直接返回执行结果。

    • 如果没有执行完成, 则进行阻塞。

    • 如果抛出了异常,则get也抛出ExecutionException异常。

    • 如果任务被取消了,则抛出CancellationException异常。

    • 如果使用get带了时间,在规定时间返回结果则正常,如果没有在规定的时间返回结果,则抛出TimeoutException异常。

  2. isDone方法说明:

    • 任务执行完毕返回true,程序执行失败仍然会返回true。

    • 未执行完毕返回false。

  3. cancel 方法说明:

    • 任务未被执行情况下,取消返回true,被正常取消。

    • 任务被执行,且执行结束情况下,取消失败,返回false。

    • 如果任务在执行中,如果cancel传入true,则标识中断正在执行的任务 ,传入false标识不中断正在执行的任务,等正在执行的任务执行结束,返回false。

  4. isCancelled方法说明:

    • 判断任务是否被取消。

FutureTask

FutureTask实现了RunnableFuture接口:

1
public class FutureTask<V> implements RunnableFuture<V>

而RunnableFuture接口则继承了Runnable接口和Future接口。

1
2
3
4
5
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

因此FutureTask既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

简单使用

举个例子,假设我们要执行一个算法,算法需要两个输入 input1input2, 但是input1input2需要经过一个非常耗时的运算才能得出。由于算法必须要两个输入都存在,才能给出输出,所以我们必须等待两个输入的产生。接下来就模仿一下这个过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskTest {

public static void main(String[] args) throws InterruptedException, ExecutionException {

long starttime = System.currentTimeMillis();

//input2生成, 需要耗费3秒
FutureTask<Integer> input2_futuretask = new FutureTask<>(new Callable<Integer>() {

@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return 5;
}
});

new Thread(input2_futuretask).start();

//input1生成,需要耗费2秒
FutureTask<Integer> input1_futuretask = new FutureTask<>(new Callable<Integer>() {

@Override
public Integer call() throws Exception {
Thread.sleep(2000);
return 3;
}
});
new Thread(input1_futuretask).start();

Integer integer2 = input2_futuretask.get();
Integer integer1 = input1_futuretask.get();
System.out.println(algorithm(integer1, integer2));
long endtime = System.currentTimeMillis();
System.out.println("用时:" + String.valueOf(endtime - starttime));
}

//这是我们要执行的算法
public static int algorithm(int input, int input2) {
return input + input2;
}
}

底层源码

核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

//内部持有的callable任务,运行完毕后置空
private Callable<V> callable;

//从get()中返回的结果或抛出的异常
private Object outcome; // non-volatile, protected by state reads/writes

//运行callable的线程
private volatile Thread runner;

//使用Treiber栈保存等待线程
private volatile WaitNode waiters;

//任务状态
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

需要注意的是,state属性被volatile修饰,即任意一个线程修改该值,其他线程都能感知到。7中状态含义分别为:

  • NEW:新的还没被执行的任务,为初始状态
  • COMPLETING:任务已完成或者执行过程中发生异常,但是执行结果或者异常原因还没有保存到outcome字段。时间通常较短,属于中间字段。
  • NORMAL:任务执行完成并且执行结果已经保存到outcome字段。
  • EXCEPTIONAL:任务执行过程中发生异常并且异常原因已经保存到outcome字段。
  • CANCELLED:任务还没开始执行或者已经开始执行,用户调用了cancel(false)方法取消任务并且不中断任务执行线程,这时候状态从NEW转化为CANCELLED状态。
  • INTERRUPTING:任务还没开始执行或者已经执行,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING。
  • INTERRUPTED:调用interrupt()中断任务执行线程后状态从INTERRUPTING转换到INTERRUPTED。

需要注意的是,所有值大于COMPLETIING的状态都表示任务已经执行完成(正常执行完成或者执行异常或者被中断)。

创建

FutureTask有两个构造函数:

  • FutureTask(Callable callable)

    1
    2
    3
    4
    5
    6
    public FutureTask(Callable<V> callable) {
    if (callable == null)
    throw new NullPointerException();
    this.callable = callable;
    this.state = NEW; // ensure visibility of callable
    }

    这个构造函数会把传入的Callable变量保存在this.callable字段中,该字段定义为private Callable callable;用来保存底层的调用,在被执行完成以后会指向null,接着会初始化state字段为NEW。

  • FutureTask(Runnable runnable, V result)

    1
    2
    3
    4
    5
    public FutureTask(Runnable runnable, V result) {
    //这一句将rnnable转换成callable,采用适配器模式RunnableAdapter
    this.callable = Executors.callable(runnable, result);
    this.state = NEW; // ensure visibility of callable
    }

    这个构造函数会把传入的Runnable封装成一个Callable对象保存在callable字段中,同时如果任务执行成功的话就会返回传入的result。这种情况下如果不需要返回值的话可以传入一个null。

运行

在创建一个FutureTask对象后,接下来就是在另一个线程中执行这个Task。

无论是通过直接new一个还是通过线程池,执行的都是run()方法,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void run() {
//如果状态不是new,或者runner旧值不为null(已经启动过了),就结束
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();//执行任务并将结果保存在result中
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);//发生异常,保存异常
}
if (ran)
set(result);//设置执行结果
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);//处理中断逻辑
}
}

任务执行完成后,会调用setException或者set方法。在这两个方法中,调用finishcompletion方法去唤醒get方法阻塞的线程,并且调用done(该方法为空方法,可以被子类覆盖,实现回调功能),清空callable。

获取结果

通过get()方法获取任务运行结果,get()方法有两个,一个是有超时时间设置,另一个没有超时时间设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// get(timeout, unit) 也很简单, 主要还是在 awaitDone里面
if(unit == null){
throw new NullPointerException();
}
int s = state;
// 判断state状态是否 <= Completing, 调用awaitDone进行自旋等待
if(s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING){
throw new TimeoutException();
}
// 根据state的值进行返回结果或抛出异常
return report(s);
}

两个方法都调用了awaitDone()方法,在该方法中等待任务执行完成、被中断或者超时。

参考资料

  1. https://www.jianshu.com/p/84f58aad8849
  2. https://zhuanlan.zhihu.com/p/54459770
  3. https://pdai.tech/md/java/thread/java-thread-x-juc-executor-FutureTask.html
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2021-2022 Yin Peng
  • 引擎: Hexo   |  主题:修改自 Ayer
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信