平时多线程都是用runnable,callable,FutureTask,线程池这类完成任务。而completableFuture可以完成异步任务的编排,更具有灵活性。
创建异步任务 supplyAsync 两种方法,一种是用默认的线程池ForkJoinPool.commonPool(),另一种需要自己定义,推荐后者
1 2 3 4 5 6 public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) ; public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor) ;
使用方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) throws ExecutionException, InterruptedException, TimeoutException { ExecutorService myThreadPool = Executors.newFixedThreadPool(4 ); CompletableFuture<Integer> test1 = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().toString()); return 1 ; },myThreadPool); CompletableFuture<Integer> test2 = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().toString()); return 1 ; }); System.out.println(test1.get(10 , TimeUnit.SECONDS)); myThreadPool.shutdown(); }
结果:
1 2 Thread[ForkJoinPool.commonPool-worker-25,5,main] 1
runAsync 与前面的supplyAsync相比,runAsync没有返回值,就相当于是runnable,前面那个是callable。同样也有两个方法,一个是用默认线程池,另一个自己定义
1 2 3 4 5 6 7 8 public static void main (String[] args) throws ExecutionException, InterruptedException, TimeoutException { ExecutorService myThreadPool = Executors.newFixedThreadPool(4 ); CompletableFuture<Void> test1 = CompletableFuture.runAsync(()->{ System.out.println(Thread.currentThread()); }); System.out.println(test1.get()); myThreadPool.shutdown(); }
结果(这是默认线程):
1 2 3 Thread[ForkJoinPool.commonPool-worker-25,5,main] null
获取结果的方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public T get () throws InterruptedException, ExecutionException public T get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException public T join () public T getNow (T valueIfAbsent) public boolean complete (T value) public boolean completeExceptionally (Throwable ex)
异步回调处理
apply就是有参数有返回值,accept就是有参数没有返回值,run就是没有参数没有返回值。async都是可以有异步线程池的
thenApply和thenApplyAsync 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static void main (String[] args) throws ExecutionException, InterruptedException, TimeoutException { ExecutorService myThreadPool = Executors.newFixedThreadPool(4 ); CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{ System.out.println("task1" +Thread.currentThread()+"->" +1 ); return 1 ; },myThreadPool); CompletableFuture<Integer> task2 = task1.thenApply((result) -> { System.out.println("task2" +Thread.currentThread() + "->" + (1 + result)); return 1 + result; }); CompletableFuture<Integer> task3 = task1.thenApplyAsync((result) -> { System.out.println("task3" +Thread.currentThread() + "->" + (1 + result)); return 1 + result; },myThreadPool); System.out.println(task2.get()); System.out.println(task3.get()); myThreadPool.shutdown(); }
结果:
1 2 3 4 5 task1Thread[pool-1-thread-1,5,main]->1 task2Thread[pool-1-thread-1,5,main]->2 task3Thread[pool-1-thread-2,5,main]->2 2 2
这个结果挺有意思的,需要分析一下。首先是线程池里面提交的线程都是并发的,这里就体现了并发,task2和3先进行了标准输出再完成值的输出。说明并发乱序了。
还有就是thenApply用的是跟前一个任务相同的线程,而带参数的thenApplyAsync用的是不一样的。
默认参数的supplyAsync和thenApply/默认thenApplyAsync
1 2 3 4 5 6 7 8 9 CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{ System.out.println("task1" +Thread.currentThread()+"->" +1 ); return 1 ; }); CompletableFuture<Integer> task2 = task1.thenApply((result) -> { System.out.println("task2" +Thread.currentThread() + "->" + (1 + result)); return 1 + result; });
1 2 3 task1Thread[ForkJoinPool.commonPool-worker-25,5,main]->1 task2Thread[ForkJoinPool.commonPool-worker-25,5,main]->2 2
默认参数的supplyAsync和线程池thenApplyAsync
1 2 3 4 5 6 7 8 9 CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{ System.out.println("task1" +Thread.currentThread()+"->" +1 ); return 1 ; }); CompletableFuture<Integer> task2 = task1.thenApplyAsync((result) -> { System.out.println("task2" +Thread.currentThread() + "->" + (1 + result)); return 1 + result; },myThreadPool);
1 2 3 task1Thread[ForkJoinPool.commonPool-worker-25,5,main]->1 task2Thread[pool-1-thread-1,5,main]->2 2
线程池的supplyAsync和thenApply
1 2 3 4 5 6 7 8 9 CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{ System.out.println("task1" +Thread.currentThread()+"->" +1 ); return 1 ; },myThreadPool); CompletableFuture<Integer> task2 = task1.thenApply((result) -> { System.out.println("task2" +Thread.currentThread() + "->" + (1 + result)); return 1 + result; });
1 2 3 task1Thread[pool-1-thread-1,5,main]->1 task2Thread[pool-1-thread-1,5,main]->2 2
线程池的supplyAsync和线程池thenApplyAsync
1 2 3 4 5 6 7 8 9 CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{ System.out.println("task1" +Thread.currentThread()+"->" +1 ); return 1 ; },myThreadPool); CompletableFuture<Integer> task2 = task1.thenApplyAsync((result) -> { System.out.println("task2" +Thread.currentThread() + "->" + (1 + result)); return 1 + result; },myThreadPool);
1 2 3 task1Thread[pool-1-thread-1,5,main]->1 task2Thread[pool-1-thread-2,5,main]->2 2
线程池supplyAsync和默认thenApplyAsync
1 2 3 4 5 6 7 8 9 CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{ System.out.println("task1" +Thread.currentThread()+"->" +1 ); return 1 ; },myThreadPool); CompletableFuture<Integer> task2 = task1.thenApplyAsync((result) -> { System.out.println("task2" +Thread.currentThread() + "->" + (1 + result)); return 1 + result; });
1 2 3 task1Thread[pool-1-thread-1,5,main]->1 task2Thread[ForkJoinPool.commonPool-worker-25,5,main]->2 2
总结:
thenApply无论如何都会与前一个任务用相同的线程,而thenApplyAsync是重新起一个线程完成,如果没有参数,那么就会使用默认的ForkJoinPool.commonPool()
在后面的有async都是这个逻辑
thenAccept和thenAcceptAsync 1 2 3 4 5 6 7 8 9 10 11 12 13 CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{ System.out.println("task1" +Thread.currentThread()+"->" +1 ); return 1 ; },myThreadPool); CompletableFuture<Void> task2 = task1.thenAccept((result) -> { System.out.println("task2" + Thread.currentThread() + "->" + (1 + result)); }); CompletableFuture<Void> task3 = task1.thenAcceptAsync((result) -> { System.out.println("task3" + Thread.currentThread() + "->" + (1 + result)); },myThreadPool); System.out.println(task2.get());
1 2 3 4 task1Thread[pool-1-thread-1,5,main]->1 task2Thread[pool-1-thread-1,5,main]->2 task3Thread[pool-1-thread-2,5,main]->2 null
这个结果也是乱序了,而且async的与之前一致。不再赘述。
thenRun和thenRunAsync 无入参数无返回值,其他都一样
多任务 thenCombine、thenAcceptBoth 和runAfterBoth combine就是有参数有返回值,accept就是有参数没返回值,run就是无参数无返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{ System.out.println("task1" +Thread.currentThread()+"->" +1 ); return 1 ; }); CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("task2" + Thread.currentThread() + "->" + 2 ); return 2 ; }); CompletableFuture<Integer> task3 = task1.thenCombine(task2, (a, b) -> { System.out.println("task3" + Thread.currentThread() + "->" + (a + b)); return a + b; }); System.out.println(task3.get());
按道理来说task3应该是跟task1一样的线程,有一次结果是一样的,不知道这里为什么是main
1 2 3 4 task1Thread[ForkJoinPool.commonPool-worker-25,5,main]->1 task2Thread[ForkJoinPool.commonPool-worker-18,5,main]->2 task3Thread[main,5,main]->3 3
回答:为什么默认是主线程
主线程调用get方法:当主线程调用get方法时,如果task1和task2已经完成,组合任务task3会立即执行,而不需要切换到另一个线程。这样可以减少线程切换的开销,提高性能。
完成的线程:如果任务在某个线程中完成,且没有其他线程等待结果,那么回调任务可能在完成任务的线程中执行。这种行为由CompletableFuture的默认执行策略决定。
使用thenCombineAsync可以显式指定在异步线程中执行回调任务,从而避免在主线程中执行组合任务。
随后试了一下延长处理时间,发现还真是:
task1Thread[pool-1-thread-1,5,main]->1
task2Thread[pool-1-thread-2,5,main]->2
task3Thread[pool-1-thread-1,5,main]->3
3
说明确实是处理时间短,处理时间短会由于固定的策略直接main做,减少线程开销。所以需要async异步,显示给出
applyToEither,acceptEither,runAfterEither 两个里面做完一个就可以了,其他都一样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{ System.out.println("task1" +Thread.currentThread()+"->" +1 ); try { Thread.sleep(100 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 1 ; },myThreadPool); CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("task2" + Thread.currentThread() + "->" + 2 ); try { Thread.sleep(10 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 2 ; },myThreadPool); CompletableFuture<Integer> task3 = task1.applyToEither(task2,(first)->{ System.out.println("task3" + Thread.currentThread() + "->" + first); return first; }); System.out.println(task3.get());
1 2 3 4 task1Thread[pool-1-thread-1,5,main]->1 task2Thread[pool-1-thread-2,5,main]->2 task3Thread[pool-1-thread-2,5,main]->2 2
allOf / anyOf allOf:CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
anyOf :CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(()->{ System.out.println("task1" +Thread.currentThread()+"->" +1 ); try { Thread.sleep(100 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 1 ; },myThreadPool); CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> { System.out.println("task2" + Thread.currentThread() + "->" + 2 ); try { Thread.sleep(100 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 2 ; },myThreadPool); CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(()->{ System.out.println("task3" + Thread.currentThread() + "->" + 3 ); try { Thread.sleep(100 ); } catch (InterruptedException e) { throw new RuntimeException (e); } return 3 ; },myThreadPool); System.out.println(CompletableFuture.allOf(task1,task2,task3).get());
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 task1Thread[pool-1-thread-1,5,main]->1 task2Thread[pool-1-thread-2,5,main]->2 task3Thread[pool-1-thread-3,5,main]->3 null //下面这是task2有1/0的情况 task1Thread[pool-1-thread-1,5,main]->1 task2Thread[pool-1-thread-2,5,main]->2 task3Thread[pool-1-thread-3,5,main]->3 Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at com.hmdp.service.impl.BlogServiceImpl.main(BlogServiceImpl.java:319) Caused by: java.lang.ArithmeticException: / by zero at com.hmdp.service.impl.BlogServiceImpl.lambda$main$4(BlogServiceImpl.java:301) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)
anyof很简单就不演示了