深入学习java源码之CompletableFuture.reportGet()与CompletableFuture.supplyAsync()
异步计算
-
所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。
-
JDK5新增了Future接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。
- 以前我们获取一个异步任务的结果可能是这样写的
Future 接口的局限性
Future接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:
-
将多个异步计算的结果合并成一个
-
等待Future集合中的所有任务都完成
-
Future完成事件(即,任务完成以后触发执行动作)
- 。。。
函数式编程
CompletionStage
-
CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
-
一个阶段的计算执行可以是一个Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
-
一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
CompletableFuture
- 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
- 它可能代表一个明确完成的Future,也有可能代表一个完成阶段( CompletionStage ),它支持在计算完成以后触发一些函数或执行某些动作。
- 它实现了Future和CompletionStage接口
创建CompletableFuture
thenApply
当前阶段正常完成以后执行,而且当前阶段的执行的结果会作为下一阶段的输入参数。thenApplyAsync默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。
thenApply相当于回调函数(callback)
thenAccept与thenRun
- 可以看到,thenAccept和thenRun都是无返回值的。如果说thenApply是不停的输入输出的进行生产,那么thenAccept和thenRun就是在进行消耗。它们是整个计算的最后两个阶段。
-
同样是执行指定的动作,同样是消耗,二者也有区别:
-
thenAccept接收上一阶段的输出作为本阶段的输入
- thenRun根本不关心前一阶段的输出,根本不不关心前一阶段的计算结果,因为它不需要输入参数
-
例如,此阶段与其它阶段一起完成,进而触发下一阶段:
whenComplete
例子
事实上,如果每个操作都很简单的话(比如:上面的例子中按照id去查)没有必要用这种多线程异步的方式,因为创建线程还需要时间,还不如直接同步执行来得快。
事实证明,只有当每个操作很复杂需要花费相对很长的时间(比如,调用多个其它的系统的接口;比如,商品详情页面这种需要从多个系统中查数据显示的)的时候用CompletableFuture才合适,不然区别真的不大,还不如顺序同步执行。
java源码
Modifier and TypeMethod and DescriptionCompletableFuture
acceptEither(CompletionStage... cfs)
返回一个新的CompletableFuture,当所有给定的CompletableFutures完成时,完成。
static CompletableFuture
anyOf(CompletableFuture... cfs)
返回一个新的CompletableFuture,当任何一个给定的CompletableFutures完成时,完成相同的结果。
CompletableFuture
applyToEither(CompletionStage other, Runnable action)
返回一个新的CompletionStage,当这个和另一个给定的阶段都正常完成时,执行给定的动作。
CompletableFuture
runAfterBothAsync(CompletionStage other, Runnable action)
返回一个新的CompletionStage,当这个和另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。
CompletableFuture
runAfterBothAsync(CompletionStage other, Runnable action, Executor executor)
返回一个新CompletionStage,当这和其他特定阶段正常完成,使用附带的执行见执行给定的动作CompletionStage
覆盖特殊的完成规则的文档。
CompletableFuture
runAfterEither(CompletionStage other, Runnable action)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,执行给定的操作。
CompletableFuture
runAfterEitherAsync(CompletionStage other, Runnable action)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。
CompletableFuture
runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,使用提供的执行器执行给定的操作。
static CompletableFuture
runAsync(Runnable runnable)
返回一个新的CompletableFuture,它在运行给定操作后由运行在 ForkJoinPool.commonPool()
中的任务 异步完成。
static CompletableFuture
runAsync(Runnable runnable, Executor executor)
返回一个新的CompletableFuture,它在运行给定操作之后由在给定执行程序中运行的任务异步完成。
static CompletableFuture
supplyAsync(Supplier supplier)
返回一个新的CompletableFuture,它通过在 ForkJoinPool.commonPool()
中运行的任务与通过调用给定的供应商获得的值 异步完成。
static CompletableFuture
supplyAsync(Supplier supplier, Executor executor)
返回一个新的CompletableFuture,由给定执行器中运行的任务异步完成,并通过调用给定的供应商获得的值。
CompletableFuture
thenAccept(Consumer c) {
if (c != null) {
while (result == null && !tryPushStack(c))
lazySetNext(c, null); // clear on failure
}
}
final CompletableFuture postFire(CompletableFuture a, int mode) {
if (a != null && a.stack != null) {
if (mode < 0 || a.result == null)
a.cleanStack();
else
a.postComplete();
}
if (result != null && stack != null) {
if (mode < 0)
return this;
else
postComplete();
}
return null;
}
@SuppressWarnings("serial")
static final class UniApply extends UniCompletion {
Function base;
CoCompletion(BiCompletion base) { this.base = base; }
final CompletableFuture tryFire(int mode) {
BiCompletion c; CompletableFuture d;
if ((c = base) == null || (d = c.tryFire(mode)) == null)
return null;
base = null; // detach
return d;
}
final boolean isLive() {
BiCompletion c;
return (c = base) != null && c.dep != null;
}
}
/** Pushes completion to this and b unless both done. */
final void bipush(CompletableFuture b, BiCompletion c) {
if (c != null) {
Object r;
while ((r = result) == null && !tryPushStack(c))
lazySetNext(c, null); // clear on failure
if (b != null && b != this && b.result == null) {
Completion q = (r != null) ? c : new CoCompletion(c);
while (b.result == null && !b.tryPushStack(q))
lazySetNext(q, null); // clear on failure
}
}
}
/** Post-processing after successful BiCompletion tryFire. */
final CompletableFuture postFire(CompletableFuture a,
CompletableFuture b, int mode) {
if (b != null && b.stack != null) { // clean second source
if (mode < 0 || b.result == null)
b.cleanStack();
else
b.postComplete();
}
return postFire(a, mode);
}
@SuppressWarnings("serial")
static final class BiApply extends BiCompletion {
BiFunction b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
CompletableFuture d = new CompletableFuture();
if (e != null || !d.biRun(this, b, f, null)) {
BiRun c = new BiRun(e, d, this, b, f);
bipush(b, c);
c.tryFire(SYNC);
}
return d;
}
@SuppressWarnings("serial")
static final class BiRelay extends BiCompletion { // for And
BiRelay(CompletableFuture dep,
CompletableFuture src,
CompletableFuture snd) {
super(null, dep, src, snd);
}
final CompletableFuture tryFire(int mode) {
CompletableFuture d;
CompletableFuture a;
CompletableFuture b;
if ((d = dep) == null || !d.biRelay(a = src, b = snd))
return null;
src = null; snd = null; dep = null;
return d.postFire(a, b, mode);
}
}
boolean biRelay(CompletableFuture a, CompletableFuture b) {
Object r, s; Throwable x;
if (a == null || (r = a.result) == null ||
b == null || (s = b.result) == null)
return false;
if (result == null) {
if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
completeThrowable(x, r);
else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
completeThrowable(x, s);
else
completeNull();
}
return true;
}
/** Recursively constructs a tree of completions. */
static CompletableFuture andTree(CompletableFuture[] cfs,
int lo, int hi) {
CompletableFuture d = new CompletableFuture();
if (lo > hi) // empty
d.result = NIL;
else {
CompletableFuture a, b;
int mid = (lo + hi) >>> 1;
if ((a = (lo == mid ? cfs[lo] :
andTree(cfs, lo, mid))) == null ||
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
andTree(cfs, mid+1, hi))) == null)
throw new NullPointerException();
if (!d.biRelay(a, b)) {
BiRelay c = new BiRelay(d, a, b);
a.bipush(b, c);
c.tryFire(SYNC);
}
}
return d;
}
/* ------------- Projected (Ored) BiCompletions -------------- */
/** Pushes completion to this and b unless either done. */
final void orpush(CompletableFuture b, BiCompletion c) {
if (c != null) {
while ((b == null || b.result == null) && result == null) {
if (tryPushStack(c)) {
if (b != null && b != this && b.result == null) {
Completion q = new CoCompletion(c);
while (result == null && b.result == null &&
!b.tryPushStack(q))
lazySetNext(q, null); // clear on failure
}
break;
}
lazySetNext(c, null); // clear on failure
}
}
}
@SuppressWarnings("serial")
static final class OrApply extends BiCompletion {
Function b;
if (f == null || (b = o.toCompletableFuture()) == null)
throw new NullPointerException();
CompletableFuture d = new CompletableFuture();
if (e != null || !d.orRun(this, b, f, null)) {
OrRun c = new OrRun(e, d, this, b, f);
orpush(b, c);
c.tryFire(SYNC);
}
return d;
}
@SuppressWarnings("serial")
static final class OrRelay extends BiCompletion { // for Or
OrRelay(CompletableFuture dep, CompletableFuture src,
CompletableFuture snd) {
super(null, dep, src, snd);
}
final CompletableFuture tryFire(int mode) {
CompletableFuture d;
CompletableFuture a;
CompletableFuture b;
if ((d = dep) == null || !d.orRelay(a = src, b = snd))
return null;
src = null; snd = null; dep = null;
return d.postFire(a, b, mode);
}
}
final boolean orRelay(CompletableFuture a, CompletableFuture b) {
Object r;
if (a == null || b == null ||
((r = a.result) == null && (r = b.result) == null))
return false;
if (result == null)
completeRelay(r);
return true;
}
/** Recursively constructs a tree of completions. */
static CompletableFuture orTree(CompletableFuture[] cfs,
int lo, int hi) {
CompletableFuture d = new CompletableFuture();
if (lo c = new OrRelay(d, a, b);
a.orpush(b, c);
c.tryFire(SYNC);
}
}
return d;
}
/* ------------- Zero-input Async forms -------------- */
@SuppressWarnings("serial")
static final class AsyncSupply extends ForkJoinTask
implements Runnable, AsynchronousCompletionTask {
CompletableFuture dep; Supplier fn;
AsyncSupply(CompletableFuture dep, Supplier fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }
public void run() {
CompletableFuture d; Supplier f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
static CompletableFuture asyncSupplyStage(Executor e,
Supplier f) {
if (f == null) throw new NullPointerException();
CompletableFuture d = new CompletableFuture();
e.execute(new AsyncSupply(d, f));
return d;
}
@SuppressWarnings("serial")
static final class AsyncRun extends ForkJoinTask
implements Runnable, AsynchronousCompletionTask {
CompletableFuture dep; Runnable fn;
AsyncRun(CompletableFuture dep, Runnable fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }
public void run() {
CompletableFuture d; Runnable f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
f.run();
d.completeNull();
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
static CompletableFuture asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture d = new CompletableFuture();
e.execute(new AsyncRun(d, f));
return d;
}
/* ------------- Signallers -------------- */
@SuppressWarnings("serial")
static final class Signaller extends Completion
implements ForkJoinPool.ManagedBlocker {
long nanos; // wait time if timed
final long deadline; // non-zero if timed
volatile int interruptControl; // > 0: interruptible, < 0: interrupted
volatile Thread thread;
Signaller(boolean interruptible, long nanos, long deadline) {
this.thread = Thread.currentThread();
this.interruptControl = interruptible ? 1 : 0;
this.nanos = nanos;
this.deadline = deadline;
}
final CompletableFuture tryFire(int ignore) {
Thread w; // no need to atomically claim
if ((w = thread) != null) {
thread = null;
LockSupport.unpark(w);
}
return null;
}
public boolean isReleasable() {
if (thread == null)
return true;
if (Thread.interrupted()) {
int i = interruptControl;
interruptControl = -1;
if (i > 0)
return true;
}
if (deadline != 0L &&
(nanos 1) ?
1 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
postComplete();
return r;
}
private Object timedGet(long nanos) throws TimeoutException {
if (Thread.interrupted())
return null;
if (nanos other,
Runnable action) {
return biRunStage(null, other, action);
}
public CompletableFuture runAfterBothAsync(CompletionStage other,
Runnable action) {
return biRunStage(asyncPool, other, action);
}
public CompletableFuture runAfterBothAsync(CompletionStage other,
Runnable action,
Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}
public CompletableFuture applyToEither(
CompletionStage other,
Runnable action) {
return orRunStage(null, other, action);
}
public CompletableFuture runAfterEitherAsync(CompletionStage other,
Runnable action) {
return orRunStage(asyncPool, other, action);
}
public CompletableFuture runAfterEitherAsync(CompletionStage other,
Runnable action,
Executor executor) {
return orRunStage(screenExecutor(executor), other, action);
}
public CompletableFuture thenCompose(
Function... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
public static CompletableFuture anyOf(CompletableFuture... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
/* ------------- Control and status methods -------------- */
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = (result == null) &&
internalComplete(new AltResult(new CancellationException()));
postComplete();
return cancelled || isCancelled();
}
public boolean isCancelled() {
Object r;
return ((r = result) instanceof AltResult) &&
(((AltResult)r).ex instanceof CancellationException);
}
public boolean isCompletedExceptionally() {
Object r;
return ((r = result) instanceof AltResult) && r != NIL;
}
public void obtrudeValue(T value) {
result = (value == null) ? NIL : value;
postComplete();
}
public void obtrudeException(Throwable ex) {
if (ex == null) throw new NullPointerException();
result = new AltResult(ex);
postComplete();
}
public int getNumberOfDependents() {
int count = 0;
for (Completion p = stack; p != null; p = p.next)
++count;
return count;
}
public String toString() {
Object r = result;
int count;
return super.toString() +
((r == null) ?
(((count = getNumberOfDependents()) == 0) ?
"[Not completed]" :
"[Not completed, " + count + " dependents]") :
(((r instanceof AltResult) && ((AltResult)r).ex != null) ?
"[Completed exceptionally]" :
"[Completed normally]"));
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long STACK;
private static final long NEXT;
static {
try {
final sun.misc.Unsafe u;
UNSAFE = u = sun.misc.Unsafe.getUnsafe();
Class k = CompletableFuture.class;
RESULT = u.objectFieldOffset(k.getDeclaredField("result"));
STACK = u.objectFieldOffset(k.getDeclaredField("stack"));
NEXT = u.objectFieldOffset
(Completion.class.getDeclaredField("next"));
} catch (Exception x) {
throw new Error(x);
}
}
}
Modifier and TypeMethod and Description
boolean
cancel(boolean mayInterruptIfRunning)
尝试取消执行此任务。
V
get()
等待计算完成,然后检索其结果。
V
get(long timeout, TimeUnit unit)
如果需要等待最多在给定的时间计算完成,然后检索其结果(如果可用)。
boolean
isCancelled()
如果此任务在正常完成之前被取消,则返回 true
。
boolean
isDone()
返回 true
如果任务已完成。
A Future
计算的结果。 提供方法来检查计算是否完成,等待其完成,并检索计算结果。 结果只能在计算完成后使用方法get
进行检索,如有必要,阻塞,直到准备就绪。 取消由cancel
方法执行。 提供其他方法来确定任务是否正常完成或被取消。 计算完成后,不能取消计算。 如果您想使用Future
,以便不可撤销,但不提供可用的结果,则可以声明Future
表格的类型,并返回null
作为基础任务的结果。
package java.util.concurrent;
public interface Future {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Modifier and TypeMethod and Description
CompletionStage
acceptEither(CompletionStage other, Runnable action)
返回一个新的CompletionStage,当这个和另一个给定的阶段都正常完成时,执行给定的动作。
CompletionStage
runAfterBothAsync(CompletionStage other, Runnable action)
返回一个新的CompletionStage,当这个和另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。
CompletionStage
runAfterBothAsync(CompletionStage other, Runnable action, Executor executor)
返回一个新CompletionStage,当这和其他特定阶段正常完成,使用附带的执行见执行给定的动作CompletionStage
覆盖特殊的完成规则的文档。
CompletionStage
runAfterEither(CompletionStage other, Runnable action)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,执行给定的操作。
CompletionStage
runAfterEitherAsync(CompletionStage other, Runnable action)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。
CompletionStage
runAfterEitherAsync(CompletionStage other, Runnable action, Executor executor)
返回一个新的CompletionStage,当这个或另一个给定阶段正常完成时,使用提供的执行器执行给定的操作。
CompletionStage
thenAccept(Consumer other,
Runnable action);
public CompletionStage runAfterBothAsync(CompletionStage other,
Runnable action);
public CompletionStage runAfterBothAsync(CompletionStage other,
Runnable action,
Executor executor);
public CompletionStage applyToEither
(CompletionStage other,
Runnable action);
public CompletionStage runAfterEitherAsync
(CompletionStage other,
Runnable action);
public CompletionStage runAfterEitherAsync
(CompletionStage other,
Runnable action,
Executor executor);
public CompletionStage thenCompose
(Function
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?