聊聊reactor异步线程的变量传递
发布时间:2020-12-15 06:31:49 所属栏目:百科 来源:网络整理
导读:序 本文主要研究下reactor异步线程的变量传递 threadlocal的问题 在传统的请求/应答同步模式中,使用threadlocal来传递上下文变量是非常方便的,可以省得在每个方法参数添加公用的变量,比如当前登录用户。但是业务方法可能使用了async或者在其他线程池中异
序本文主要研究下reactor异步线程的变量传递 threadlocal的问题在传统的请求/应答同步模式中,使用threadlocal来传递上下文变量是非常方便的,可以省得在每个方法参数添加公用的变量,比如当前登录用户。但是业务方法可能使用了async或者在其他线程池中异步执行,这个时候threadlocal的作用就失效了。 这个时候的解决办法就是采取propagation模式,即在同步线程与异步线程衔接处传播这个变量。 TaskDecorator比如spring就提供了TaskDecorator,通过实现这个接口,可以自己控制传播那些变量。例如: class MdcTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { // Right now: Web thread context ! // (Grab the current thread MDC data) Map<String,String> contextMap = MDC.getCopyOfContextMap(); return () -> { try { // Right now: @Async thread context ! // (Restore the Web thread context's MDC data) MDC.setContextMap(contextMap); runnable.run(); } finally { MDC.clear(); } }; } } 这里注意在finally里头clear 配置这个taskDecorator @EnableAsync @Configuration public class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setTaskDecorator(new MdcTaskDecorator()); executor.initialize(); return executor; } } 完整实例详见 Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads Reactor Contextspring5引入webflux,其底层是基于reactor,那么reactor如何进行上下文变量的传播呢?官方提供了Context对象来替代threadlocal。 其特性如下:
实例设置及读取@Test public void testSubscriberContext(){ String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key,"World")); StepVerifier.create(r) .expectNext("Hello World") .verifyComplete(); } 这里从最底部的subscriberContext设置message值为World,然后flatMap里头通过subscriberContext来访问。 自底向上@Test public void testContextSequence(){ String key = "message"; Mono<String> r = Mono.just("Hello") //NOTE 这个subscriberContext设置的太高了 .subscriberContext(ctx -> ctx.put(key,"World")) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.getOrDefault(key,"Stranger"))); StepVerifier.create(r) .expectNext("Hello Stranger") .verifyComplete(); } 由于这个例子的subscriberContext设置的太高了,不能作用在flatMap里头的Mono.subscriberContext() 不可变@Test public void testContextImmutable(){ String key = "message"; Mono<String> r = Mono.subscriberContext() .map( ctx -> ctx.put(key,"Hello")) //这里返回了一个新的,因此上面的设置失效了 .flatMap( ctx -> Mono.subscriberContext()) .map( ctx -> ctx.getOrDefault(key,"Default")); StepVerifier.create(r) .expectNext("Default") .verifyComplete(); } subscriberContext永远返回一个新的 多个连续的subscriberContext@Test public void testReadOrder(){ String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key,"Reactor")) .subscriberContext(ctx -> ctx.put(key,"World")); StepVerifier.create(r) .expectNext("Hello Reactor") .verifyComplete(); } operator只会读取离它最近的一个context flatMap间的subscriberContext@Test public void testContextBetweenFlatMap(){ String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key,"Reactor")) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key))) .subscriberContext(ctx -> ctx.put(key,"World")); StepVerifier.create(r) .expectNext("Hello Reactor World") .verifyComplete(); } flatMap读取离它最近的context flatMap中的subscriberContext@Test public void testContextInFlatMap(){ String key = "message"; Mono<String> r = Mono.just("Hello") .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key)) ) .flatMap( s -> Mono.subscriberContext() .map( ctx -> s + " " + ctx.get(key)) .subscriberContext(ctx -> ctx.put(key,"Reactor")) ) .subscriberContext(ctx -> ctx.put(key,"World")); StepVerifier.create(r) .expectNext("Hello World Reactor") .verifyComplete(); } 这里第一个flatMap无法读取第二个flatMap内部的context 小结reactor通过提供Context来实现了类似同步线程threadlocal的功能,非常强大,值得好好琢磨。 doc
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |