加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

RxJava中的指数退避

发布时间:2020-12-15 04:37:19 所属栏目:Java 来源:网络整理
导读:我有一个API,它接受一个触发事件的Observable. 我想返回一个Observable,如果检测到Internet连接,则每个defaultDelay秒发出一个值,如果没有连接则延迟numberOfFailedAttempts ^ 2次. 我尝试了一堆不同的样式,我遇到的最大问题是重试时,只能评估一次可观察的情
我有一个API,它接受一个触发事件的Observable.

我想返回一个Observable,如果检测到Internet连接,则每个defaultDelay秒发出一个值,如果没有连接则延迟numberOfFailedAttempts ^ 2次.

我尝试了一堆不同的样式,我遇到的最大问题是重试时,只能评估一次可观察的情况:

Observable
    .interval(defaultDelay,TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .repeatWhen((observable) ->
         observable.concatMap(repeatObservable -> {
             if(internetConnectionDetector.isInternetConnected()){
                 consecutiveRetries = 0;
                 return observable;
             } else {
                 consecutiveRetries++;
                 int backoffDelay = (int)Math.pow(consecutiveRetries,2);
                 return observable.delay(backoffDelay,TimeUnit.SECONDS);
                }
         }).onBackpressureDrop())
    .onBackpressureDrop();

有什么办法可以做我想做的事吗?我发现了一个相关的问题(目前无法找到它),但采用的方法似乎不适用于动态值.

解决方法

在您的代码中有两个错误:

>为了重复一些可观察的序列,该序列必须是有限的.即而不是间隔你最好使用像just或者fromCallable这样的东西,就像我在下面的例子中所做的那样.
>从repeatWhen的内部函数,您需要返回新的延迟可观察源,因此您必须返回Observable.timer()而不是observable.delay().

工作代码:

public void testRepeat() throws InterruptedException {
    logger.info("test start");

    int DEFAULT_DELAY = 100; // ms
    int ADDITIONAL_DELAY = 100; // ms
    AtomicInteger generator = new AtomicInteger(0);
    AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive

    Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
            .repeatWhen(counts -> {
                AtomicInteger retryCounter = new AtomicInteger(0);
                return counts.flatMap(c -> {
                    int retry = 0;
                    if (connectionAlive.get()) {
                        retryCounter.set(0); // reset counter
                    } else {
                        retry = retryCounter.incrementAndGet();
                    }
                    int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry,2);
                    logger.info("retry={},additionalDelay={}ms",retry,additionalDelay);
                    return Observable.timer(DEFAULT_DELAY + additionalDelay,TimeUnit.MILLISECONDS);
                });
            })
            .subscribe(v -> logger.info("got {}",v));

    Thread.sleep(220);
    logger.info("connection dropped");
    connectionAlive.set(false);
    Thread.sleep(2000);
    logger.info("connection is back alive");
    connectionAlive.set(true);
    Thread.sleep(2000);
    subscription.dispose();
    logger.info("test complete");
}

查看有关repeatWhen here的详细文章.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读