RxJava中的指数退避
发布时间:2020-12-15 04:37:19 所属栏目:Java 来源:网络整理
导读:我有一个API,它接受一个触发事件的Observable. 我想返回一个Observable,如果检测到Internet连接,则每个defaultDelay秒发出一个值,如果没有连接则延迟numberOfFailedAttempts ^ 2次. 我尝试了一堆不同的样式,我遇到的最大问题是重试时,只能评估一次可观察的情
我有一个API,它接受一个触发事件的Observable.
我想返回一个Observable,如果检测到Internet连接,则每个defaultDelay秒发出一个值,如果没有连接则延迟numberOfFailedAttempts ^ 2次. 我尝试了一堆不同的样式,我遇到的最大问题是重试时,只能评估一次可观察的情况: Observable .interval(defaultDelay,TimeUnit.MILLISECONDS) .observeOn(Schedulers.io()) .repeatWhen((observable) -> observable.concatMap(repeatObservable -> { if(internetConnectionDetector.isInternetConnected()){ consecutiveRetries = 0; return observable; } else { consecutiveRetries++; int backoffDelay = (int)Math.pow(consecutiveRetries,2); return observable.delay(backoffDelay,TimeUnit.SECONDS); } }).onBackpressureDrop()) .onBackpressureDrop(); 有什么办法可以做我想做的事吗?我发现了一个相关的问题(目前无法找到它),但采用的方法似乎不适用于动态值. 解决方法
在您的代码中有两个错误:
>为了重复一些可观察的序列,该序列必须是有限的.即而不是间隔你最好使用像just或者fromCallable这样的东西,就像我在下面的例子中所做的那样. 工作代码: public void testRepeat() throws InterruptedException { logger.info("test start"); int DEFAULT_DELAY = 100; // ms int ADDITIONAL_DELAY = 100; // ms AtomicInteger generator = new AtomicInteger(0); AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive Disposable subscription = Observable.fromCallable(generator::incrementAndGet) .repeatWhen(counts -> { AtomicInteger retryCounter = new AtomicInteger(0); return counts.flatMap(c -> { int retry = 0; if (connectionAlive.get()) { retryCounter.set(0); // reset counter } else { retry = retryCounter.incrementAndGet(); } int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry,2); logger.info("retry={},additionalDelay={}ms",retry,additionalDelay); return Observable.timer(DEFAULT_DELAY + additionalDelay,TimeUnit.MILLISECONDS); }); }) .subscribe(v -> logger.info("got {}",v)); Thread.sleep(220); logger.info("connection dropped"); connectionAlive.set(false); Thread.sleep(2000); logger.info("connection is back alive"); connectionAlive.set(true); Thread.sleep(2000); subscription.dispose(); logger.info("test complete"); } 查看有关repeatWhen here的详细文章. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |