rx-java – 错误后重试相同的项目
我正在尝试使用rx-
java构建一个健壮的处理管道,但我遇到了一个问题.
这是一个例子: public static void main(String[] args) { AtomicInteger div = new AtomicInteger(-1); Observable.just(1,1,1).map(item -> 1 / div.getAndIncrement()) .retry().subscribe(item -> System.out.println(item)); } 这种情况下的输出是4项,因为非流可观察性被重放,但这不相关,所以为了简单起见,请忽略它.我添加了评论,显示了达到结果的计算和重新订阅的重点: -1 // 1 / -1 // 1/0 (error) - resubscribes to observable 1 // 1 / 1 0 // 1 / 2 0 // 1 / 3 发生这种情况是因为重试操作符(作为所有重试操作符)在传递错误通知后导致重新订阅. 我的预期输出是: -1 // 1 / -1 // 1/0 (error) - resubscribe but resume erroneous item (1) 1 // 1 / 1 0 // 1 / 0 当传递错误通知时,重新订阅过程应该包括流中的错误项(在同一项上重试) – 因为错误是外部的并且没有嵌入到已处理的项中(因此重新处理是有意义的). 这是一些外部错误(如数据库连接)的情况,我希望那些未处理的项目会被延迟重新处理.我知道使用标准重试操作符可以重新订阅,但所有这些操作都会放弃错误的项目. 我还考虑将我的所有处理包装在try-catch中,我怀疑错误是可能的,但是我的处理代码中添加了样板代码,我不想这样做. 所以我的问题是:有没有一种标准方法可以重试失败的项目? 我已经考虑过做(未经测试)的事情: someSubject.flatMap( item-> Observable.just(item) .doOnError(err -> someSubject.onNext(item))).onErrorX... 并压制错误…… 但这似乎不自然,在我的用例中很昂贵(为每个项目创建一个可观察的). 是否有运算符或运算符的组合可以导致重试将错误的项目传递回observable的开头而不会“破坏”或将项目包装在不同的可观察对象中? 这也是我习惯使用async-retry的重试方式. 解决方法
这通常不适用于RxJava.如果元素的处理失败,则没有从该位置恢复的内置方式.您可以做的最好的事情是尝试捕获有问题的函数回调并手动重试.第二个最好的事情是使用flatMap,其中可能有问题的计算是可以单独重试的内部Observable:
source.flatMap(v -> Observable.just(v).map(v -> v / counter.getAndIncrement()).retry() ) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |