平时多线程都是用runnable,callable,FutureTask,线程池这类完成任务。而completableFuture可以完成异步任务的编排,更具有灵活性。

创建异步任务

supplyAsync

两种方法,一种是用默认的线程池ForkJoinPool.commonPool(),另一种需要自己定义,推荐后者

1
2
3
4
5
6
// 带返回值异步请求,默认线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);

// 带返回值的异步请求,可以自定义线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

使用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService myThreadPool = Executors.newFixedThreadPool(4);
//这是有线程池的,这里lambda表达式也可以携程()->1,如果没有别的逻辑的话
CompletableFuture<Integer> test1 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().toString());
return 1;
},myThreadPool);
//默认方法
CompletableFuture<Integer> test2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().toString());
return 1;
});
System.out.println(test1.get(10, TimeUnit.SECONDS));
myThreadPool.shutdown();
}

结果:

1
2
Thread[ForkJoinPool.commonPool-worker-25,5,main]
1

runAsync

与前面的supplyAsync相比,runAsync没有返回值,就相当于是runnable,前面那个是callable。同样也有两个方法,一个是用默认线程池,另一个自己定义

1
2
3
4
5
6
7
8
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService myThreadPool = Executors.newFixedThreadPool(4);
CompletableFuture<Void> test1 = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread());
});
System.out.println(test1.get());
myThreadPool.shutdown();
}

结果(这是默认线程):

1
2
3
Thread[ForkJoinPool.commonPool-worker-25,5,main]
null

获取结果的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 如果完成则返回结果,否则就抛出具体的异常
public T get() throws InterruptedException, ExecutionException

// 最大时间等待返回结果,否则就抛出具体异常
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException

// 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
public T join()

// 如果完成则返回结果值(或抛出任何遇到的异常),否则返回给定的 valueIfAbsent。
public T getNow(T valueIfAbsent)

// 如果任务没有完成,返回的值设置为给定值
public boolean complete(T value)

// 如果任务没有完成,就抛出给定异常
public boolean completeExceptionally(Throwable ex)


异步回调处理

apply就是有参数有返回值,accept就是有参数没有返回值,run就是没有参数没有返回值。async都是可以有异步线程池的

thenApply和thenApplyAsync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
ExecutorService myThreadPool = Executors.newFixedThreadPool(4);

CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
System.out.println("task1"+Thread.currentThread()+"->"+1);
return 1;
},myThreadPool);

CompletableFuture<Integer> task2 = task1.thenApply((result) -> {
System.out.println("task2"+Thread.currentThread() + "->" + (1 + result));
return 1 + result;
});

CompletableFuture<Integer> task3 = task1.thenApplyAsync((result) -> {
System.out.println("task3"+Thread.currentThread() + "->" + (1 + result));
return 1 + result;
},myThreadPool);

System.out.println(task2.get());

System.out.println(task3.get());

myThreadPool.shutdown();
}

结果:

1
2
3
4
5
task1Thread[pool-1-thread-1,5,main]->1
task2Thread[pool-1-thread-1,5,main]->2
task3Thread[pool-1-thread-2,5,main]->2
2
2

这个结果挺有意思的,需要分析一下。首先是线程池里面提交的线程都是并发的,这里就体现了并发,task2和3先进行了标准输出再完成值的输出。说明并发乱序了。

还有就是thenApply用的是跟前一个任务相同的线程,而带参数的thenApplyAsync用的是不一样的。

默认参数的supplyAsync和thenApply/默认thenApplyAsync

1
2
3
4
5
6
7
8
9
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
System.out.println("task1"+Thread.currentThread()+"->"+1);
return 1;
});
//这里用不加参数的thenApplyAsync也是一样的
CompletableFuture<Integer> task2 = task1.thenApply((result) -> {
System.out.println("task2"+Thread.currentThread() + "->" + (1 + result));
return 1 + result;
});
1
2
3
task1Thread[ForkJoinPool.commonPool-worker-25,5,main]->1
task2Thread[ForkJoinPool.commonPool-worker-25,5,main]->2
2

默认参数的supplyAsync和线程池thenApplyAsync

1
2
3
4
5
6
7
8
9
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
System.out.println("task1"+Thread.currentThread()+"->"+1);
return 1;
});

CompletableFuture<Integer> task2 = task1.thenApplyAsync((result) -> {
System.out.println("task2"+Thread.currentThread() + "->" + (1 + result));
return 1 + result;
},myThreadPool);
1
2
3
task1Thread[ForkJoinPool.commonPool-worker-25,5,main]->1
task2Thread[pool-1-thread-1,5,main]->2
2

线程池的supplyAsync和thenApply

1
2
3
4
5
6
7
8
9
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
System.out.println("task1"+Thread.currentThread()+"->"+1);
return 1;
},myThreadPool);

CompletableFuture<Integer> task2 = task1.thenApply((result) -> {
System.out.println("task2"+Thread.currentThread() + "->" + (1 + result));
return 1 + result;
});
1
2
3
task1Thread[pool-1-thread-1,5,main]->1
task2Thread[pool-1-thread-1,5,main]->2
2

线程池的supplyAsync和线程池thenApplyAsync

1
2
3
4
5
6
7
8
9
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
System.out.println("task1"+Thread.currentThread()+"->"+1);
return 1;
},myThreadPool);

CompletableFuture<Integer> task2 = task1.thenApplyAsync((result) -> {
System.out.println("task2"+Thread.currentThread() + "->" + (1 + result));
return 1 + result;
},myThreadPool);
1
2
3
task1Thread[pool-1-thread-1,5,main]->1
task2Thread[pool-1-thread-2,5,main]->2
2

线程池supplyAsync和默认thenApplyAsync

1
2
3
4
5
6
7
8
9
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
System.out.println("task1"+Thread.currentThread()+"->"+1);
return 1;
},myThreadPool);

CompletableFuture<Integer> task2 = task1.thenApplyAsync((result) -> {
System.out.println("task2"+Thread.currentThread() + "->" + (1 + result));
return 1 + result;
});
1
2
3
task1Thread[pool-1-thread-1,5,main]->1
task2Thread[ForkJoinPool.commonPool-worker-25,5,main]->2
2

总结:

thenApply无论如何都会与前一个任务用相同的线程,而thenApplyAsync是重新起一个线程完成,如果没有参数,那么就会使用默认的ForkJoinPool.commonPool()

在后面的有async都是这个逻辑

thenAccept和thenAcceptAsync

1
2
3
4
5
6
7
8
9
10
11
12
13
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
System.out.println("task1"+Thread.currentThread()+"->"+1);
return 1;
},myThreadPool);

CompletableFuture<Void> task2 = task1.thenAccept((result) -> {
System.out.println("task2" + Thread.currentThread() + "->" + (1 + result));
});

CompletableFuture<Void> task3 = task1.thenAcceptAsync((result) -> {
System.out.println("task3" + Thread.currentThread() + "->" + (1 + result));
},myThreadPool);
System.out.println(task2.get());
1
2
3
4
task1Thread[pool-1-thread-1,5,main]->1
task2Thread[pool-1-thread-1,5,main]->2
task3Thread[pool-1-thread-2,5,main]->2
null

这个结果也是乱序了,而且async的与之前一致。不再赘述。

thenRun和thenRunAsync

无入参数无返回值,其他都一样

多任务

thenCombine、thenAcceptBoth 和runAfterBoth

combine就是有参数有返回值,accept就是有参数没返回值,run就是无参数无返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
System.out.println("task1"+Thread.currentThread()+"->"+1);
return 1;
});

CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("task2" + Thread.currentThread() + "->" + 2);
return 2;
});
CompletableFuture<Integer> task3 = task1.thenCombine(task2, (a, b) -> {
System.out.println("task3" + Thread.currentThread() + "->" + (a + b));
return a + b;
});

System.out.println(task3.get());

按道理来说task3应该是跟task1一样的线程,有一次结果是一样的,不知道这里为什么是main

1
2
3
4
task1Thread[ForkJoinPool.commonPool-worker-25,5,main]->1
task2Thread[ForkJoinPool.commonPool-worker-18,5,main]->2
task3Thread[main,5,main]->3
3

回答:为什么默认是主线程

  1. 主线程调用get方法:当主线程调用get方法时,如果task1和task2已经完成,组合任务task3会立即执行,而不需要切换到另一个线程。这样可以减少线程切换的开销,提高性能。

  2. 完成的线程:如果任务在某个线程中完成,且没有其他线程等待结果,那么回调任务可能在完成任务的线程中执行。这种行为由CompletableFuture的默认执行策略决定。

使用thenCombineAsync可以显式指定在异步线程中执行回调任务,从而避免在主线程中执行组合任务。

随后试了一下延长处理时间,发现还真是:

task1Thread[pool-1-thread-1,5,main]->1

task2Thread[pool-1-thread-2,5,main]->2

task3Thread[pool-1-thread-1,5,main]->3

3

说明确实是处理时间短,处理时间短会由于固定的策略直接main做,减少线程开销。所以需要async异步,显示给出

applyToEither,acceptEither,runAfterEither

两个里面做完一个就可以了,其他都一样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
System.out.println("task1"+Thread.currentThread()+"->"+1);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
},myThreadPool);

CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("task2" + Thread.currentThread() + "->" + 2);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 2;
},myThreadPool);
CompletableFuture<Integer> task3 = task1.applyToEither(task2,(first)->{
System.out.println("task3" + Thread.currentThread() + "->" + first);
return first;
});


System.out.println(task3.get());
1
2
3
4
task1Thread[pool-1-thread-1,5,main]->1
task2Thread[pool-1-thread-2,5,main]->2
task3Thread[pool-1-thread-2,5,main]->2
2

allOf / anyOf

allOf:CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。

anyOf :CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{
System.out.println("task1"+Thread.currentThread()+"->"+1);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
},myThreadPool);

CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("task2" + Thread.currentThread() + "->" + 2);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 2;
},myThreadPool);
CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(()->{
System.out.println("task3" + Thread.currentThread() + "->" + 3);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 3;
},myThreadPool);


System.out.println(CompletableFuture.allOf(task1,task2,task3).get());
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
task1Thread[pool-1-thread-1,5,main]->1
task2Thread[pool-1-thread-2,5,main]->2
task3Thread[pool-1-thread-3,5,main]->3
null


//下面这是task2有1/0的情况
task1Thread[pool-1-thread-1,5,main]->1
task2Thread[pool-1-thread-2,5,main]->2
task3Thread[pool-1-thread-3,5,main]->3
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at com.hmdp.service.impl.BlogServiceImpl.main(BlogServiceImpl.java:319)
Caused by: java.lang.ArithmeticException: / by zero
at com.hmdp.service.impl.BlogServiceImpl.lambda$main$4(BlogServiceImpl.java:301)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

anyof很简单就不演示了