加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

rx-java – 如何在不丢失发出的项目的情况下暂停Observable?

发布时间:2020-12-15 04:32:49 所属栏目:Java 来源:网络整理
导读:我有一个Observable每秒发出一个滴答声: Observable.interval(0,1,TimeUnit.SECONDS) .take(durationInSeconds + 1)); 我想暂停这个Observable,以便它停止发出数字,并按需恢复它. 有一些陷阱: 根据Observable Javadoc,区间运算符不支持背压 关于backpressu
我有一个Observable每秒发出一个滴答声:

Observable.interval(0,1,TimeUnit.SECONDS)
    .take(durationInSeconds + 1));

我想暂停这个Observable,以便它停止发出数字,并按需恢复它.

有一些陷阱:

>根据Observable Javadoc,区间运算符不支持背压
>关于backpressure的RxJava wiki有一个关于Callstack阻塞的部分作为背压的流量控制替代方案:

Another way of handling an overproductive Observable is to block the callstack (parking the thread that governs the overproductive Observable). This has the disadvantage of going against the “reactive” and non-blocking model of Rx. However this can be a viable option if the problematic Observable is on a thread that can be blocked safely. Currently RxJava does not expose any operators to facilitate this.

有没有办法暂停间隔Observable?或者我应该用一些背压支持实现我自己的’滴答’Observable?

解决方法

有很多方法可以做到这一点.例如,您仍然可以使用interval()并维护另外两个状态:例如boolean flag“paused”和一个计数器.

public static final Observable<Long> pausableInterval(
  final AtomicBoolean paused,long initial,long interval,TimeUnit unit,Scheduler scheduler) {

  final AtomicLong counter = new AtomicLong();
  return Observable.interval(initial,interval,unit,scheduler)
      .filter(tick -> !paused.get())
      .map(tick -> counter.getAndIncrement()); 
}

然后你只需要在某处调用paused.set(true / false)来暂停/恢复

编辑2016-06-04

上面的解决方案存在一些问题.
如果我们多次重用observable实例,它将从最后一次取消订阅时的值开始.例如:

Observable<Long> o = pausableInterval(...)
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();

虽然list1将是[0,2,3,4],但list2实际上是[5,6,7,8,9].
如果不希望上述行为,则必须使观察者成为无状态.这可以通过scan()运算符来实现.
修订版本可能如下所示:

public static final Observable<Long> pausableInterval(final AtomicBoolean pause,final long initialDelay,final long period,Scheduler scheduler) {

    return Observable.interval(initialDelay,period,scheduler)
        .filter(tick->!pause.get())
        .scan((acc,tick)->acc + 1);
  }

或者,如果您不希望依赖Java 8和lambdas,则可以使用Java 6兼容代码执行此类操作:

https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读