rx-java – 如何在不丢失发出的项目的情况下暂停Observable?
我有一个Observable每秒发出一个滴答声:
Observable.interval(0,1,TimeUnit.SECONDS) .take(durationInSeconds + 1)); 我想暂停这个Observable,以便它停止发出数字,并按需恢复它. 有一些陷阱: >根据Observable Javadoc,区间运算符不支持背压
有没有办法暂停间隔Observable?或者我应该用一些背压支持实现我自己的’滴答’Observable? 解决方法
有很多方法可以做到这一点.例如,您仍然可以使用interval()并维护另外两个状态:例如boolean flag“paused”和一个计数器.
public static final Observable<Long> pausableInterval( final AtomicBoolean paused,long initial,long interval,TimeUnit unit,Scheduler scheduler) { final AtomicLong counter = new AtomicLong(); return Observable.interval(initial,interval,unit,scheduler) .filter(tick -> !paused.get()) .map(tick -> counter.getAndIncrement()); } 然后你只需要在某处调用paused.set(true / false)来暂停/恢复 编辑2016-06-04 上面的解决方案存在一些问题. Observable<Long> o = pausableInterval(...) List<Long> list1 = o.take(5).toList().toBlocking().single(); List<Long> list2 = o.take(5).toList().toBlocking().single(); 虽然list1将是[0,2,3,4],但list2实际上是[5,6,7,8,9]. public static final Observable<Long> pausableInterval(final AtomicBoolean pause,final long initialDelay,final long period,Scheduler scheduler) { return Observable.interval(initialDelay,period,scheduler) .filter(tick->!pause.get()) .scan((acc,tick)->acc + 1); } 或者,如果您不希望依赖Java 8和lambdas,则可以使用Java 6兼容代码执行此类操作: https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |