RxJava. Observable.delay工作奇怪(最后缺少一些项目)
发布时间:2020-12-15 04:46:56 所属栏目:Java 来源:网络整理
导读:我正在努力了解Rx Java.我的测试代码是: import rx.Observable;import rx.Subscriber;import rx.functions.Action1;import java.util.concurrent.TimeUnit;public class Hello { public static void main(String[] args) { ObservableString observable = O
我正在努力了解Rx
Java.我的测试代码是:
import rx.Observable; import rx.Subscriber; import rx.functions.Action1; import java.util.concurrent.TimeUnit; public class Hello { public static void main(String[] args) { Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { try { Thread.sleep(1000); subscriber.onNext("a"); Thread.sleep(1000); subscriber.onNext("b"); Thread.sleep(1000); subscriber.onNext("c"); Thread.sleep(1000); subscriber.onNext("d"); Thread.sleep(1000); subscriber.onNext("e"); Thread.sleep(1000); subscriber.onNext("f"); Thread.sleep(1000); subscriber.onNext("g"); Thread.sleep(1000); subscriber.onNext("h"); } catch (InterruptedException e) { subscriber.onError(e); } } }); observable .delay(2,TimeUnit.SECONDS) .subscribe(new Action1<String>() { @Override public void call(String string) { System.out.println(string); } }); } } 没有.delay(2,TimeUnit.SECONDS)我有输出:a 怎么可能?文档说延迟只是发出源Observable发出的项目在时间上向前移动指定的延迟 解决方法
您正在使用调度的延迟重载在不同的线程上工作并导致隐式竞争条件.所有时间运算符(如延迟,缓冲区和窗口)需要使用调度程序为以后计划效果,这可能导致意外的竞争条件,如果你不知道它并仔细使用它们.在这种情况下,延迟运算符在单独的线程池上调度下游工作.以下是测试中的执行顺序(在主线程上).
>您的Observable订阅并在onNext(“a”)之前等待1000毫秒 为了让它以异步方式执行,您可以安排trampoline调度程序实现的延迟. .delay(2,TimeUnit.SECONDS,Schedulers.trampoline()) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |