最新要闻

广告

手机

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

票房这么火爆,如何请视障人士“看”一场电影?

票房这么火爆,如何请视障人士“看”一场电影?

家电

CompletableFuture源码解析

来源:博客园

前言

JDK8 为我们带来了 CompletableFuture这个有意思的新类,它提供比 Future更灵活更强大的回调功能,借助 CompletableFuture我们可以更方便的编排异步任务。


(相关资料图)

本着知其然也要知其所以然的想法,笔者结合源码深入了解了一下 CompletableFuture的部分实现,然后写了这边文章作为总结。

一、数据结构

1、CompletableFuture

CompletableFuture实现了 Future接口和 CompletionStageFuture不必多说,而 CompletionStage定义了各种任务编排的 API:

CompletableFuture implements Future, CompletionStage {    volatile Object result;       // Either the result or boxed AltResult    volatile Completion stack;    // Top of Treiber stack of dependent actions}

CompletableFuture的数据结构包括用于记录结果的 result,以及用于持有任务以及任务间依赖关系的 Completion类型的成员变量 stack

如果阅读过 spring 注解相关功能的源码的同学,对于 CompletableFutureCompletion应该会有一种 TypeMappedAnnotationAnnotationTypeMapping的既视感,实际上他们两者之间的关系确实也非常相似。

2、Completion

数据结构

CompletionCompletableFuture中的一个内部类,我们可以简单的认为它就是我们一般所说的“操作”。

abstract static class Completion extends ForkJoinTask    implements Runnable, AsynchronousCompletionTask {    volatile Completion next;}

它通过 next指针在 CompletableFuture中形成一个链表结构。

依赖关系

它还有两个抽象的实现类 UniCompletionBiCompletion

abstract static class UniCompletion extends Completion {        Executor executor;                 // executor to use (null if none)        CompletableFuture dep;          // the dependent to complete        CompletableFuture src;          // source for action}abstract static class BiCompletion extends UniCompletion {        CompletableFuture snd; // second source for action}

其中 executor表示该操作的执行者,而 srcsnd两个指针表示要执行的操作对应的 CompletableFuture实例,而 dep则表示要执行的操作依赖的前置操作对应的 CompletableFuture实例。多个 Completion彼此之间通过这些指针维护彼此的依赖关系。

实现类

CompletableFuture,我们会看到很多格式为 UniXXX或者 BiXXX的内部类,它们大多数都基于上述两抽象类实现,分别对应不同的操作。我们以 UniApply为例:

static final class UniApply extends UniCompletion {    Function fn;}

其本质上就是一个额外挂载了 Function接口的 UniCompletion,同理,XXXAccept就是挂载了 ConsumerCompletion,而 XXXRun就是挂载的 Runnable接口的 Completion

二、构建流程

CompletableFutureCompletion的数据结构有了基本的概念以后,我们一个简单任务的构建-执行过程来分析以下源码。

假设现在有两个异步任务 task1 与 task2,task2 需要在 task1 执行完毕后再执行:

CompletableFuture task1 = new CompletableFuture<>();CompletableFuture task2 = task1.thenApplyAsync(s -> s + " 2");

thenApplyAsync本身提供两个方法,唯一的区别在于后者需要指定线程池,而前者使用默认的线程池:

public  CompletableFuture thenApplyAsync(    Function fn) {    return uniApplyStage(asyncPool, fn);}public  CompletableFuture thenApplyAsync(    Function fn, Executor executor) {    return uniApplyStage(screenExecutor(executor), fn);}

它们都需要通过 uniApplyStage方法完成新任务的构建:

private  CompletableFuture uniApplyStage(    Executor e, Function f) {    if (f == null) throw new NullPointerException();    // 1、构建一个 CompletableFuture,对应下一个任务 Task2    CompletableFuture d =  new CompletableFuture();    if (e != null || !d.uniApply(this, f, null)) {        // 2、构建一个 Completion,dep 指向 Task2,src 指向 Task1        UniApply c = new UniApply(e, d, this, f);        // 3、将该 Completion 压入当前 CompletableFuture 栈顶        push(c);        // 4、尝试以异步模式执行该Completion        c.tryFire(SYNC);    }    return d;}final void push(UniCompletion c) {    if (c != null) {        while (result == null && !tryPushStack(c))            lazySetNext(c, null); // clear on failure    }}final boolean tryPushStack(Completion c) {    Completion h = stack;    // 将 c.next 设置为当前 Task1 持有的 Completion    lazySetNext(c, h);    // CAS,Task1 持有的 Completion 替换为 c    return UNSAFE.compareAndSwapObject(this, STACK, h, c);}static void lazySetNext(Completion c, Completion next) {    // 通过 CAS 把 c.next 指向 next    UNSAFE.putOrderedObject(c, NEXT, next);}

uniApplyStage方法做了四件事:

  1. 若将当前任务作为 Task1,则会为下一个任务构建一个新的 CompletableFuture,姑且称为 Task2
  2. 若将 Task1持有的 Completion称为 C1,则创建一个 UniApply类型的 CompletionC2,其中 C2dep指向 Task2src指向 Task1
  3. C2next指向 C1,然后通过 CAS 将 Task1持有的 C1替换为 C2
  4. 尝试执行 C2

1、多级任务的构建流程

步骤 2 和 3 算是一个组合操作,它完成了创建 Completion- 压入当前 CompletableFuture栈的操作。

我们在原本示例中的 Task1Task2的基础上再最加一个 Task3,这样或许会更有助于了解这一过程。

CompletableFuture task1 = new CompletableFuture<>();CompletableFuture task2 = task1.thenApplyAsync(s -> s + " 2");CompletableFuture task2 = task2.thenApplyAsync(s -> s + " 3");

首先,在不考虑 Task2Task3在构建过程中就完成的情况下,会有如下过程:

第一个任务

最开始,Task1被创建,此时 Task1是个空任务,它的 stackresult都为 null,为了便于理解,我们姑且认为它在 stack指向一个虚拟的空 Completion,称其为 c1

第二个任务

接着,task2通过 task1.thenApplyAsync方法被创建,此时:

  1. 一个新的 Completion被创建,我们称其为 c2c2dep指向 task2src指向 task1
  2. c2next指向 c1
  3. task1stack从指向 c1变成指向 c2

第三个任务

然后,task3通过task2.thenApplyAsync方法被创建,此时:

  1. 一个新的 Completion被创建,我们称其为 c3c3dep指向 task3src指向 task2
  2. c3next指向 c2
  3. task2stack从指向 c2变成指向 c3

2、平级任务的构建流程

如果这个时候我们再回头往 Task1上追加一个与 Task2平级的任务 Task4呢?

CompletableFuture task1 = new CompletableFuture<>();CompletableFuture task2 = task1.thenApplyAsync(s -> s + " 2");CompletableFuture task2 = task2.thenApplyAsync(s -> s + " 3");    CompletableFuture task4 = task1.thenApplyAsync(s -> s + " 4");
  1. 一个新的 Completion被创建,我们称其为 c4c4dep指向 task4src指向 task1
  2. c4next指向 c2
  3. task2stack从指向 c2变成指向 c4

3、整体结构

至此,我们可以总结出一些信息:

CompletableFuture所谓的栈,其实就是 Completion的先进后出队列,假设现在有一个头结点 a,调用 thenApply方法将会向 a之前追加一个新的头结点 b,然后持有 aCompletableFuture转而去持有 b,这样就永远可以通过持有的头结点遍历获取队列中的所有节点;

而当我们调用 thenApply时,都会创建一个 CompletionCompletionsrc总是指向被调用 thenApply方法的 CompletableFuture,换而言之,Completion被压入谁的栈,则 Completion.src就指向谁。

基于上述逻辑,我们再对上面这张图进行简化,忽略栈中 Completion间的 next指针与指向栈所有者的 src指针,则有:

这样看结构就非常清晰了,同理,如果 Task2或者 Task4也有多个后续任务,则这里就会变成一个多叉树结构,反之,若 Task2或者 Task4有多个 src(比如调用了 thenCombine方法)则就可能会变成一张图。

三、执行流程

我们依然回顾 uniApplyStage这个方法:

private  CompletableFuture uniApplyStage(    Executor e, Function f) {    if (f == null) throw new NullPointerException();    // 构建新的 CompletableFuture    CompletableFuture d =  new CompletableFuture();    // 如果是个异步任务,或者是个同步任务但是还没完成才进入判断    if (e != null || !d.uniApply(this, f, null)) {        // 构建 Completion,dep 指向新 CompletableFuture,src 指向 this        UniApply c = new UniApply(e, d, this, f);        // 将新的 Complection 压入 this 的栈中        push(c);        // 尝试执行新的 Complection        c.tryFire(SYNC);    }    return d;}

实际上当我们调用 thenXXX的时候,新的任务就已经在尝试执行了,接下来我们继续以 UniApply为例,分析 Completion的执行流程。

1、执行CompletableFuture

uniApplyStage中,可以看到当新的 CompletableFuture创建后,若该任务未指定 executor,即这是一个同步的任务,则在 !d.uniApply(this, f, null)这段代码先执行一次 uniApply方法,也就是直接尝试执行用户指定的逻辑:

final  boolean uniApply(CompletableFuture a, // 源任务,即若 this 为 Task2,则 a 为 Task1                           Function f,                           UniApply c) {    Object r;Throwable x;    // 1、Task1没完成,就直接返回false    if (a == null || (r = a.result) == null || f == null)        return false;    // 2、如果 Task1 已经完成,并且 Task1 抛出异常了,那么 this 就没必要执行了,也直接抛异常结束    tryComplete: if (result == null) {        if (r instanceof AltResult) {            if ((x = ((AltResult)r).ex) != null) {                completeThrowable(x, r);                break tryComplete;            }            r = null;        }        // 3、如果 Task1 已经完成了,并且没抛出异常,那么直接执行 this        try {            if (c != null && !c.claim())                return false;            @SuppressWarnings("unchecked") S s = (S) r;            // 3.1 执行成功,将结果记录到 this.result            completeValue(f.apply(s));        } catch (Throwable ex) {            // 3.2 执行失败,将异常封装一下也作为一个结果记录到 this.result            completeThrowable(ex);        }    }    return true;}

uniApply主要逻辑如下:

  • 如果源任务未完成,则什么都不做,直接返回 false
  • 如果源任务发生了异常,那么让当前任务也变为完成,并把源任务的结果(异常)作为当前任务的结果;
  • 如果源任务已经正常完成,则获取源任务的结果,然后再将其作为输入参数执行当前任务,并且记录任务的执行结果;

这个方法实际上就是执行 Completion挂载的用户业务逻辑的代码,由于考虑到源任务有可能是个异步任务,当尝试执行子任务的时候源任务还没完成,因此这个方法在后续实际上会被调用多次。

uniApplyStage在没有指定 executor是默认它就是一个同步任务,因此会直接在创建新的 CompletableFuture的时候就执行一次,如果直接完成那后续也不需要再创建 Completion了。

此外,在这里,我们可以很清楚的看到,发生异常的任务也被视为已完成,异常本身也被看成一个任务的执行结果。

2、执行Completion

tryFire方法是 Completion的执行触发点,他会尝试执行当前的 Completion,并在完成后触发 dep指向的 CompletableFuture中,栈里面的 Completion的执行。

执行模式

在看 tryFire方法前,我们需要先简单了解一下 mode参数,它表示 tryFire时的执行模式,默认提供三个选项值:

static final int SYNC   =  0; // 同步执行static final int ASYNC  =  1; // 异步执行static final int NESTED = -1; // 嵌套执行,即 CompletableFuture 在递归中执行栈内的 Completion

SYNCASYNC没什么可介绍的,CompletableFuture中大部分的 Completion都是以 SYNC模式执行的。

不过在接下来的代码中,我们需要重点关注 mode < 0这类判断,它涉及到栈中 Completion的递归执行。

tryFire

tryFire用于主动触发一个 Completion的执行,但与 CompletableFutrueuniApplyStage方法不同的是,它还会处理源任务和子任务栈中的其他任务:

final CompletableFuture tryFire(int mode) {    CompletableFuture d; // 子任务 (this.dep)    CompletableFuture a; // 源任务 (this.src)        // 没有后续的子任务, 或者有子任务但是当前任务执行失败了    if ((d = dep) == null ||        !d.uniApply(a = src, fn, mode > 0 ? null : this))        return null;        // 2、置空相关属性,表示当前 Completion 已完成    dep = null; src = null; fn = null;        // 3、则尝试把 dep 中栈里的 Completion 出栈,压入 src 的栈并执行    return d.postFire(a, mode);}

3、执行关联Completion

tryFire后,当前 Completion就实际完成了,接着就需要处理 Completiondep指向的 CompletableFuture的栈内的哪些子任务,对应到代码就是调用 deppostFire方法,然后再在 dep中调用 postComplete方法:

final CompletableFuture postFire(    CompletableFuture a, // 当前任务的源任务,即 src 指向大 CompletableFuture    int mode) {    // 1、源任务的栈不为空    if (a != null && a.stack != null) {        // 1.1 处于递归执行过程,或者源任务未完成,先清除栈中已经完成的任务        if (mode < 0 || a.result == null)            a.cleanStack();        else        // 1.2 不处于递归过程,且源任务已完成,将栈中的任务出栈并完成            a.postComplete();    }    // 2、当前子任务已完成,且当前子任务的栈不为空    if (result != null && stack != null) {        // 2.1 如果处于递归过程,就直接返回子任务本身        if (mode < 0)            return this;        // 2.2 如果不处于递归过程,则将子任务栈中的任务出栈并完成        else            postComplete();    }    return null;}

而在 postComplete中,当发现源任务或者子任务完成时,会将当前源任务或者子任务的栈中全部任务都出栈,并尝试执行:

final void postComplete() {    /*     * On each step, variable f holds current dependents to pop     * and run.  It is extended along only one path at a time,     * pushing others to avoid unbounded recursion.     */    CompletableFuture f = this;     Completion h;        // 递归直到当前任务以及dep的栈都为空为止    while ((h = f.stack) != null ||           (f != this && (h = (f = this).stack) != null)) {        CompletableFuture d; Completion t;        if (f.casStack(h, t = h.next)) { // 将 f 的栈顶任务 h 出栈                        // 1、h 还不是 f 栈中的最后一个任务            if (t != null) {                if (f != this) {                    pushStack(h); // 将 f 的栈顶任务 h 压入 this 的栈中                    continue;                }                h.next = null;    // detach            }            // 2、h 已经是 f 栈中的最后一个任务了,            // 直接执行任务 h,并让 f 指向该任务的 dep,然后再次循环            f = (d = h.tryFire(NESTED)) == null ? this : d;        }    }}

tryFire -> postFire -> postComplete -> tryFire......构成了一个递归的过程,光看代码可能不是很直观,我们举个例子:

CompletableFuture task2 = task1.thenApply(t -> { System.out.println("2"); return "2"; });task2.thenAccept(t -> System.out.println("2.1"));task2.thenAccept(t -> System.out.println("2.2"))    .thenAccept(t -> System.out.println("2.2.1"))    .thenAccept(t -> System.out.println("2.2.1.1"));CompletableFuture task3 = task1.thenApply(t -> { System.out.println("3"); return "3"; });task3.thenAccept(t -> System.out.println("3.1"));task3.thenAccept(t -> System.out.println("3.2"))    .thenAccept(t -> System.out.println("3.2.1"))    .thenAccept(t -> System.out.println("3.2.1.1"));CompletableFuture task4 = task1.thenApply(t -> { System.out.println("4"); return "4"; });task1.complete("1");// 控制台输出// 4// 3// 3.1// 3.2// 3.2.1// 3.2.1.1// 2// 2.1// 2.2// 2.2.1// 2.2.1.1

根据上述代码的输出,我们可以很直观的意识到,在以 task1为根节点的树结构中,各个任务的调用过程实际上就是深度优先遍历的过程,以被调用 postComplete方法的 Completion为根节点,会将其 dep对应的 CompletableFuture栈中的 Completion弹出并压入根 Completion的栈,然后执行并从夫上述过程。、

接下来我们结合上述代码,简单验证一下这个思路:

**task4分支的执行 **

在最开始,task1的栈中从上到下存放有 4、3、2 三个 Completion

首先,将 task1的栈顶元素 4 出栈并执行,由于 4 的 dep没有指向任何 CompletableFuture,因此 4 这个分支全部的 Completion都执行完毕。

现在,task1的栈目前还有 3 和 2 两个 Completion

task3分支的执行

task1的栈顶元素 3 出栈并执行,由于 3 的 dep指向了另一个 CompletableFuture,该 CompletableFuture的栈中存放有 3.2、3.1 两个 Completion,所以:

  • 先将栈顶元素 3.2 出栈,由于 3.2 不为栈中的最后一个元素,因此将 3.2 压入 task1的栈顶;
  • 再将栈顶元素 3.1 出栈,由于 3.1 已经是栈中最后一个元素,因此直接执行 3.1;

此时 task1中的栈情况如下:

然后将 task1的栈顶元素 3.2 出栈并执行,由于 3.2 的 dep指向的另一个 CompletableFuture,该 CompletableFuture的栈中存放有 3.2.1 ,因此将 3.2.1 出栈,又由于 3.2.1 已经是栈中最后一个元素,因此直接将其执行并返回 dep

由于 3.2.1 的 dep指向了另一个 CompletableFuture,该 CompletableFuture的栈中存放有 3.2.1.1 ,因此将 3.2.1.1 出栈,又由于 3.2.1.1 已经是栈中最后一个元素,因此直接将其执行并返回 dep

3.2.1.1 的 dep没有指向任何 CompletableFuture,说明此时 3.2这条分支上的所有栈都已经清空,此轮执行结束。

现在,task1的栈目前只剩 2 一个 Completion

task2分支的执行

task2分支的执行与 task3完全一致,因此这里只简单的说明:

  1. 弹出 task1栈顶元素 2 并执行,返回 dep指向的 CompletableFuture
  2. CompletableFuture栈存在 2.2 与 2.1 两个 Completion
    • 先弹出 2.2,由于 2.2 不是栈中最后一个元素,因此将其压入 task1的栈;
    • 再弹出 2.1,由于 2.1 已经是栈中最后一个元素,因此将其直接执行;
  3. 弹出 task1栈顶元素 2.1 并执行,返回 dep指向的 CompletableFuture
  4. CompletableFuture栈仅存在 2.2.1 一个 Completion,因此直接执行并返回 dep指向的 CompletableFuture
  5. CompletableFuture栈仅存在 2.2.1.1 一个 Completion,因此直接执行并返回 dep指向的 CompletableFuture
  6. 由于 2.2.1.1 的 dep没有指向任何 CompletableFuture,因此递归到这里就结束了。

总结

到这里,CompletableFuture的构建-执行过程也基本讲完了。回顾整篇文章,不难发现其实大部分内容其实还是在说明以 CompletableFutureCompletion为基础构建出来的数据结构。

每个 CompletableFuture都会持有一个 Completion栈,当我们向一个 CompletableFuture追加任务时,本质上就是生成一个 Completion并压入到栈中。而每个 Completion则关联到另一个 CompletableFuture,该 CompletableFuture对应此 Completion的完成状态。

明白这一点后,我们对 CompletableFutureCompletion的定位就会有更加清晰的了解,如果我们将整个复杂异步流程视为树或者图,那么 CompletableFutureCompletion实际上就是对应着点和边。当我们执行一个 CompletableFuture,实际上就是基于关联的 Completion路径遍历所有的 CompletableFuture

当然,实际上由于 CompletableFuture执行 eitheranybothall等模式,因此实际在执行的时候还会有更多的判断逻辑,不过数据结构是不会变的。

关键词: 当前任务 数据结构 直接执行