加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

聊聊reactor extra的retry

发布时间:2020-12-15 06:34:34 所属栏目:百科 来源:网络整理
导读:序 本文主要研究一下reactor extra的retry maven dependency groupIdio.projectreactor.addons/groupId artifactIdreactor-extra/artifactId version3.1.4.RELEASE/version /dependency 实例 TcpClient client = TcpClient.create("localhost",8888); client

本文主要研究一下reactor extra的retry

maven

<dependency>
            <groupId>io.projectreactor.addons</groupId>
            <artifactId>reactor-extra</artifactId>
            <version>3.1.4.RELEASE</version>
        </dependency>

实例

TcpClient client = TcpClient.create("localhost",8888);
        client.newHandler((inbound,outbound) -> {
                return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
                        .asString().next().log().then());
        }).doOnError(e -> e.printStackTrace())
                .subscribe();
上面这个TcpClient,在server没有启动的情况下连接会直接报错
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8888
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
    ... 10 more

连接失败重连

简单重试

client.newHandler((inbound,outbound) -> {
                return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
                        .asString().next().log().then());
        }).doOnError(e -> e.printStackTrace())
                .retry(3)
                .subscribe();
retry可以直接指定重试次数

高级重试

client.newHandler((inbound,outbound) -> {
                return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
                        .asString().next().log().then());
        }).doOnError(e -> e.printStackTrace())
                .retryWhen(Retry.allBut(RuntimeException.class)
                .retryMax(5000)
                .fixedBackoff(Duration.ofSeconds(5))
                .doOnRetry(e -> {
                    e.exception().printStackTrace();
                })
        )
                .subscribe();
利用reactor extra项目中的Retry帮助类,可以轻松指定高级重试策略,比如fixedBackoff,亦或是exponentialBackoff等
client.newHandler((inbound,outbound) -> {
                return outbound.sendString(Mono.just("Hello World!")).then(inbound.receive()
                        .asString().next().log().then());
        }).doOnError(e -> e.printStackTrace())
                .retryWhen(Retry.allBut(RuntimeException.class)
                .retryMax(5000)           .exponentialBackoffWithJitter(Duration.ofMillis(100),Duration.ofMillis(500))
                .doOnRetry(e -> {
                    e.exception().printStackTrace();
                })
        )
                .subscribe();
这里使用了exponentialBackoffWithJitter,第一个参数是firstBackoff时间,第二个参数是maxBackoff,也就是maxBackoffInterval,如果为null则相当于Duration.ofSeconds(Long.MAX_VALUE)

Retry

reactor-extra-3.1.4.RELEASE-sources.jar!/reactor/retry/Retry.java

/**
     * Returns a retry function that retries errors resulting from all exceptions except
     * the specified non-retriable exceptions. More constraints may be added using
     * {@link #retryMax(int)} or {@link #timeout(Duration)}.
     *
     * @param nonRetriableExceptions exceptions that may not be retried
     * @return retry function that retries all exceptions except the specified non-retriable exceptions.
     */
    @SafeVarargs
    static <T> Retry<T> allBut(final Class<? extends Throwable>... nonRetriableExceptions) {
        Predicate<? super RetryContext<T>> predicate = context -> {
            Throwable exception = context.exception();
            if (exception == null)
                return true;
            for (Class<? extends Throwable> clazz : nonRetriableExceptions) {
                if (clazz.isInstance(exception))
                    return false;
            }
            return true;
        };
        return DefaultRetry.<T>create(predicate);
    }
可以看到使用DefaultRetry来创建

reactor-extra-3.1.4.RELEASE-sources.jar!/reactor/retry/DefaultRetry.java

public static <T> DefaultRetry<T> create(Predicate<? super RetryContext<T>> retryPredicate) {
        return new DefaultRetry<T>(retryPredicate,1,null,Backoff.zero(),Jitter.noJitter(),NOOP_ON_RETRY,(T) null);
    }
注意这里的maxIterations默认为1,也就是如果不指定retryMax,相当于高级重试策略就白费了,这个要额外注意一下。

小结

Reactor Extra提供的Retry工具类非常好用,值得实验一下。

doc

  • Reactor Extra

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读