Java并发 CompletableFuture异步编程的实现
前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作。如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得把它们并行化。 // 以下两个方法都是耗时操作 doBizA(); doBizB(); //创建两个子线程去执行就可以了,两个操作已经被异步化了。 new Thread(()->doBizA()) .start(); new Thread(()->doBizB()) .start();
CompletableFuture 的核心优势 为了领略 CompletableFuture 异步编程的优势,这里我们用 CompletableFuture 重新实现前面曾提及的烧水泡茶程序。首先还是需要先完成分工方案,在下面的程序中,我们分了 3 个任务:任务 1 负责洗水壶、烧开水,任务 2 负责洗茶壶、洗茶杯和拿茶叶,任务 3 负责泡茶。其中任务 3 要等待任务 1 和任务 2 都完成后才能开始。这个分工如下图所示。 烧水泡茶分工方案 // 任务 1:洗水壶 -> 烧开水 CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{ System.out.println("T1: 洗水壶..."); sleep(1,TimeUnit.SECONDS); System.out.println("T1: 烧开水..."); sleep(15,TimeUnit.SECONDS); }); // 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶 CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{ System.out.println("T2: 洗茶壶..."); sleep(1,TimeUnit.SECONDS); System.out.println("T2: 洗茶杯..."); sleep(2,TimeUnit.SECONDS); System.out.println("T2: 拿茶叶..."); sleep(1,TimeUnit.SECONDS); return " 龙井 "; }); // 任务 3:任务 1 和任务 2 完成后执行:泡茶 CompletableFuture<String> f3 = f1.thenCombine(f2,(__,tf)->{ System.out.println("T1: 拿到茶叶:" + tf); System.out.println("T1: 泡茶..."); return " 上茶:" + tf; }); // 等待任务 3 执行结果 System.out.println(f3.join()); void sleep(int t,TimeUnit u) { try { u.sleep(t); }catch(InterruptedException e){} } // 一次执行结果: T1: 洗水壶... T2: 洗茶壶... T1: 烧开水... T2: 洗茶杯... T2: 拿茶叶... T1: 拿到茶叶: 龙井 T1: 泡茶... 上茶: 龙井 从整体上来看,我们会发现
领略 CompletableFuture 异步编程的优势之后,下面我们详细介绍 CompletableFuture 的使用。 创建 CompletableFuture 对象 创建 CompletableFuture 对象主要靠下面代码中展示的这 4 个静态方法,我们先看前两个。在烧水泡茶的例子中,我们已经使用了 前两个方法和后两个方法的区别在于:后两个方法可以指定线程池参数。 默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 // 使用默认线程池 static CompletableFuture<Void> runAsync(Runnable runnable) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) // 可以指定线程池 static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) 创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法,对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为 CompletableFuture 类实现了 Future 接口,所以这两个问题你都可以通过 Future 接口来解决。另外,CompletableFuture 类还实现了 CompletionStage 接口,这个接口内容实在是太丰富了,在 1.8 版本里有 40 个方法,这些方法我们该如何理解呢? 理解 CompletionStage 接口 可以站在分工的角度类比一下工作流。任务是有时序关系的,比如有 串行关系 并行关系 汇聚关系 CompletionStage 接口可以清晰地描述任务之间的这种时序关系,例如前面提到的 最后就是异常,CompletionStage 接口也可以方便地描述异常处理。 下面我们就来一一介绍,CompletionStage 接口如何描述串行关系、AND 聚合关系、OR 聚合关系以及异常处理。 1. 描述串行关系 CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。 thenApply 系列函数里参数 fn 的类型是接口 Function<T,R>,这个接口里与 CompletionStage 相关的方法是 而 thenAccept 系列方法里参数 consumer 的类型是接口 thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也是 这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。其中,需要你注意的是 thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenApply 系列是相同的。 CompletionStage<R> thenApply(fn); CompletionStage<R> thenApplyAsync(fn); CompletionStage<Void> thenAccept(consumer); CompletionStage<Void> thenAcceptAsync(consumer); CompletionStage<Void> thenRun(action); CompletionStage<Void> thenRunAsync(action); CompletionStage<R> thenCompose(fn); CompletionStage<R> thenComposeAsync(fn); 通过下面的示例代码,你可以看一下 thenApply() 方法是如何使用的。首先通过 supplyAsync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。 CompletableFuture<String> f0 = CompletableFuture.supplyAsync( () -> "Hello World") //① .thenApply(s -> s + " QQ") //② .thenApply(String::toUpperCase);//③ System.out.println(f0.join()); // 输出结果 HELLO WORLD QQ 2. 描述 AND 汇聚关系 CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。 CompletionStage<R> thenCombine(other,fn); CompletionStage<R> thenCombineAsync(other,fn); CompletionStage<Void> thenAcceptBoth(other,consumer); CompletionStage<Void> thenAcceptBothAsync(other,consumer); CompletionStage<Void> runAfterBoth(other,action); CompletionStage<Void> runAfterBothAsync(other,action); 3. 描述 OR 汇聚关系 CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。 CompletionStage applyToEither(other,fn); CompletionStage applyToEitherAsync(other,fn); CompletionStage acceptEither(other,consumer); CompletionStage acceptEitherAsync(other,consumer); CompletionStage runAfterEither(other,action); CompletionStage runAfterEitherAsync(other,action); CompletableFuture<String> f1 = CompletableFuture.supplyAsync(()->{ int t = getRandom(5,10); sleep(t,TimeUnit.SECONDS); return String.valueOf(t); }); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{ int t = getRandom(5,TimeUnit.SECONDS); return String.valueOf(t); }); CompletableFuture<String> f3 = f1.applyToEither(f2,s -> s); System.out.println(f3.join()); 4. 异常处理 虽然上面我们提到的 fn、consumer、action 它们的核心方法都 CompletableFuture<Integer> f0 = CompletableFuture. .supplyAsync(()->(7/0)) .thenApply(r->r*10); System.out.println(f0.join()); CompletionStage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。 CompletionStage exceptionally(fn); CompletionStage<R> whenComplete(consumer); CompletionStage<R> whenCompleteAsync(consumer); CompletionStage<R> handle(fn); CompletionStage<R> handleAsync(fn); 下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。 whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。 whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。 CompletableFuture<Integer> f0 = CompletableFuture .supplyAsync(()->7/0)) .thenApply(r->r*10) .exceptionally(e->0); System.out.println(f0.join()); 总结 不过最近几年,伴随着 CompletableFuture 已经能够满足简单的异步编程需求,如果你对异步编程感兴趣,可以重点关注 RxJava 这个项目,利用 RxJava,即便在 Java 1.6 版本也能享受异步编程的乐趣。 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |