加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

rx-java – 错误后重试相同的项目

发布时间:2020-12-15 04:31:10 所属栏目:Java 来源:网络整理
导读:我正在尝试使用rx- java构建一个健壮的处理管道,但我遇到了一个问题. 这是一个例子: public static void main(String[] args) { AtomicInteger div = new AtomicInteger(-1); Observable.just(1,1,1).map(item - 1 / div.getAndIncrement()) .retry().subsc
我正在尝试使用rx- java构建一个健壮的处理管道,但我遇到了一个问题.

这是一个例子:

public static void main(String[] args) {
    AtomicInteger div = new AtomicInteger(-1);
    Observable.just(1,1,1).map(item -> 1 / div.getAndIncrement())
            .retry().subscribe(item -> System.out.println(item));
}

这种情况下的输出是4项,因为非流可观察性被重放,但这不相关,所以为了简单起见,请忽略它.我添加了评论,显示了达到结果的计算和重新订阅的重点:

-1 // 1 / -1
// 1/0 (error) - resubscribes to observable
1 // 1 / 1
0 // 1 / 2
0 // 1 / 3

发生这种情况是因为重试操作符(作为所有重试操作符)在传递错误通知后导致重新订阅.

我的预期输出是:

-1 // 1 / -1
// 1/0 (error) - resubscribe but resume erroneous item (1)
1 // 1 / 1
0 // 1 / 0

当传递错误通知时,重新订阅过程应该包括流中的错误项(在同一项上重试) – 因为错误是外部的并且没有嵌入到已处理的项中(因此重新处理是有意义的).

这是一些外部错误(如数据库连接)的情况,我希望那些未处理的项目会被延迟重新处理.我知道使用标准重试操作符可以重新订阅,但所有这些操作都会放弃错误的项目.

我还考虑将我的所有处理包装在try-catch中,我怀疑错误是可能的,但是我的处理代码中添加了样板代码,我不想这样做.

所以我的问题是:有没有一种标准方法可以重试失败的项目?

我已经考虑过做(未经测试)的事情:

someSubject.flatMap(
    item-> Observable.just(item)
        .doOnError(err -> someSubject.onNext(item))).onErrorX...

并压制错误……

但这似乎不自然,在我的用例中很昂贵(为每个项目创建一个可观察的).

是否有运算符或运算符的组合可以导致重试将错误的项目传递回observable的开头而不会“破坏”或将项目包装在不同的可观察对象中?

这也是我习惯使用async-retry的重试方式.

解决方法

这通常不适用于RxJava.如果元素的处理失败,则没有从该位置恢复的内置方式.您可以做的最好的事情是尝试捕获有问题的函数回调并手动重试.第二个最好的事情是使用flatMap,其中可能有问题的计算是可以单独重试的内部Observable:

source.flatMap(v ->
    Observable.just(v).map(v -> v / counter.getAndIncrement()).retry()
)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读