Reactor
原文:http://blog.51cto.com/liukang/2090191 Project Reactor与Spring是兄弟项目,侧重于Server端的响应式编程,主要artifact是reactor-core,这是一个基于Java 8的实现了响应式流规范(Reactive Streams specification)的响应式库。 1.Flux与MonoReactor中发布者(Publisher)由Flux和Mono两个类定义,它们都提供了丰富的操作符。一个Flux对象代表一个包含0~N个元素的响应式序列,而一个Mono对象代表一个包含0/1个元素的结果。 即然是“数据流”的发布者,Flux和Mono都可以发现三种“数据信号”:元素值、错误信号、完成信号,错误信号和完成信号都是终止信号,完成信号用于告知下游订阅者该数据流正常结束,错误信号终止数据流的同时将错误传递给下游订阅者。 Flux.just(1,2,3,4,5,6); Mono.just(1); Flux和Mono提供了多种创建数据流的方法,just就是一种比较直接的声明数据流的方式,其参数就是数据元素。 对于上面的Flux,还可以通过如下方式声明: Integer[] array = new Integer[]{1,6}; Flux.fromArray(array); List<Integer> list = Arrays.asList(array); Flux.fromIterable(list); Stream<Integer> stream = list.stream(); Flux.fromStream(stream); // 只有完成信号的空数据流 Flux.just(); Flux.empty(); Mono.empty(); Mono.justOrEmpty(Optional.empty()); // 只有错误信号的数据流 Flux.error(new Exception("some error")); Mono.error(new Exception("some error")); 2.订阅前什么都不会发生数据流有了,假设我们想把每个数据元素原封不动地打印出来 Flux.just(1,6).subscribe(System.out::print); Mono.just(1).subscribe(System.out::println); 此外,Flux和Mono还提供了多个subscribe方法的变体: // 订阅并触发数据流 subscribe(); // 订阅并指定对正常数据元素如何处理 subscribe(Consumer<? super T> consumer); // 订阅并定义对正常数据元素和错误信号的处理 subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer); // 订阅并定义对正常数据元素、错误信号和完成信号的处理 subscribe(Consumer<? super T> consumer,Consumer<? super Throwable> errorConsumer,Runnable completeConsumer); // 订阅并定义对正常数据元素、错误信号和完成信号的处理,以及订阅发生时的处理逻辑 subscribe(Consumer<? super T> consumer,Runnable completeConsumer,Consumer<? super Subscription> subscriptionConsumer); 如果是订阅上边声明的Flux: Flux.just(1,6).subscribe( System.out::println,System.err::println,() -> System.out.println("Completed!")); 输出 : 1 2 3 4 5 6 Completed! 这里需要注意的一点是,Flux.just(1,6)仅仅声明了这个数据流,此时数据元素并未发出,只有subscribe()方法调用的时候才会触发数据流,所以,订阅前什么都不会发生。 3.测试与调试在命令世界,调试通常都是非常直观的:直接看stack trace就可以找到问题出现的位置,以有其他信息。当你切换到响应式异步代码,事情就变得复杂多了,先了解一个基本的单元测试工具-StepVerifier。 最常见的测试Reactor序列的场景就是定义一个Flux或Mono,然后在订阅它的时候测试它的行为。 当你的测试关注于每一个数据元素的时候,就非常贴近使用StepVerifier的测场景。 private Flux<Integer> generateFluxFrom1To6() { return Flux.just(1,6); } private Mono<Integer> generateMonoWithError() { return Mono.error(new Exception("some error")); } @Test public void testViaStepVerifier() { StepVerifier.create(generateFluxFrom1To6()) .expectNext(1,6) .expectComplete() .verify(); StepVerifier.create(generateMonoWithError()) .expectErrorMessage("some error") .verify(); 4.操作符(1)map-元素映射为新元素 map操作符可以将数据元素进行转换/映射,得到一个新元素。 StepVerifier.create(Flux.range(1,6) // 1 .map(i -> i * i)) // 2 .expectNext(1,9,16,25,36) //3 .expectComplete(); // 4 (2)flatMap-元素映射为流 StepVerifier.create( Flux.just("flux","mono") .flatMap(s -> Flux.fromArray(s.split("s*")) // 1.对于每一个字符串s,将其拆分为包含一个字符串流 .delayElements(Duration.ofMillis(100))) // 2.对于每个元素延迟100ms .doOnNext(System.out::print)) // 3.对每个元素进行打印 .expectNextCount(8) // 4.验证是否发出8个元素 .verifyComplete(); flatMap通常用于每个元素又引入数据流的情况,比如我们有一串url,需要请求每个url并收集response数据,假设响应式的请求方法如下: Mono<HttpResponse> requestUrl(String url) {...} 而url数据流为一个Flux<String> urlFluxt,那么为了得到所有HttpReponse,就需要用到flatMap urlFlux.flatMap(url -> requestUrl(url)); (3)filter-过滤 filter操作符可以对数据元素进行筛选。 StepVerifier.create(Flux.range(1,6) .filter(i -> i % 2 == 1) // 1 .map(i -> i * i)) .expectNext(1,25) // 2 .verifyComplete(); (4)zip 看到zip这个词可能会联想到拉链,它能够将多个流一对一的合并起来。zip有多个方法变体,这里介绍最常见的二合一的。 举个例子,假设我们有一个关于zip方法的说明desc,我们希望将这句话拆分为一个一个的单词并以每200ms一个的速度发出,除了前flatMap的例子中用到的delayElements,可以如下操作: private Flux<String> getZipDescFlux() { String desc = "Zip two sources together,that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2."; return Flux.fromArray(desc.split("s+")); // 1.用空格拆分 } @Test public void testSimpleOperators() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); // 2.不使用它的话,测试方法所在线程会直接返回而不等待数据流发出完毕 Flux.zip( getZipDescFlux(),Flux.interval(Duration.ofMillis(200))) // 3.每200ms发出一个数据,因为zip是一对一的,所以字符串流也将具有同样的速度 .subscribe(t -> System.out.println(t.getT1()),null,countDownLatch::countDown); // 4.zip之后的中元素为Tuple2 countDownLatch.await(10,TimeUnit.SECONDS); // 5.最多等待10s } 5.调试器与线程模型在以往的多线程开发场景中,我们通常使用Executors工具类来创建线程池。但在Reactor中,使用调试器(Scheduler)来处理这些事情。Scheduler是一个拥有多个实现类的抽象接口。Schedulers(工具类)提供的静态方法可搭建以下几中线程执行环境:
举例: (1)同步变异步 private String getStringSync() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello,Reactor!"; } 正常情况下,调用这个方法会被阻塞2s,然后同步地返回结果,我们借助elastic调度器将其变为异步,由于是异步,为了保证测试方法所在的线程能够等待结果的返回,我们使用CountDownLatch @Test public void testSyncToAsync() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Mono.fromCallable(() -> getStringSync()) // 1 .subscribeOn(Schedulers.elastic()) // 2 .subscribe(System.out::println,countDownLatch::countDown); countDownLatch.await(10,TimeUnit.SECONDS); } 6.切换调度器操作符Reactor提供了两种在响应式链中调整调度器Scheduler的方法:publishOn和subscribeOn。它们都接收一个Scheduler作为参数。但是publishOn在链中出现的位置有讲究,而subscribeOn则无所谓。 7.错误处理(1)捕获并返回一个静态的缺省值 Flux.range(1,6) .map(i -> 10/(i-3)) .onErrorReturn(0) // 1.这个方法能够在收到错误信号时提供一个缺省值 .map(i -> i*i) .subscribe(System.out::println,System.err::println); (2)捕获并执行一个异常处理方法或计算一个候补值来顶替 Flux.range(1,6) .map(i -> 10/(i-3)) .onErrorResume(e -> Mono.just(new Random().nextInt(6))) // 提供新的数据流 .map(i -> i*i) .subscribe(System.out::println,System.err::println); (3)捕获,并再包装为某一个业务相关的异常,然后再抛出业务异常 Flux.just("timeout1") .flatMap(k -> callExternalService(k)) // 1 .onErrorMap(original -> new BusinessException("SLA exceeded",original)); // 2 (4)捕获,记录错误日志,然后继续抛出 Flux.just(endpoint1,endpoint2) .flatMap(k -> callExternalService(k)) .doOnError(e -> { // 1 log("uh oh,falling back,service failed for key " + k); // 2 }) .onErrorResume(e -> getFromCache(k)); (5)使用finally来清理资源,或使用Java7引入try-with-resource Flux.using( () -> getResource(),// 1 resource -> Flux.just(resource.getAll()),// 2 MyResource::clean // 3.最终清理资源 ); (6)重试 Flux.range(1,6) .map(i -> 10 / (3 - i)) .retry(1) .subscribe(System.out::println,System.err::println); Thread.sleep(100); // 确保序列执行完 8.回压public void testBackpressure() { Flux.range(1,6) // 1.是一个快的Publiser .doOnRequest(n -> System.out.println("Request " + n + " values...")) // 2.每次request的时候打印request的个数 .subscribe(new BaseSubscriber<Integer>() { // 3.通过重写BaseSubscriber的方法来自定义Subscriber @Override protected void hookOnSubscribe(Subscription subscription) { // 4.在订阅的时候执行 System.out.println("Subscribed and make a request..."); request(1); // 5.订阅时首先向上游请求1个元素 } @Override protected void hookOnNext(Integer value) { // 6.每次收到一个元素的时候操作 try { TimeUnit.SECONDS.sleep(1); // 7.模拟慢的subscriber } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Get value [" + value + "]"); // 8 request(1); // 9 } }); } Reactor提供了一个BaseSubscriber,我们可以通过扩展它的自定义Subscriber,Subscriber通过request(n)的方法来告知上游它的需求速度 。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |