加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

RxJava. Observable.delay工作奇怪(最后缺少一些项目)

发布时间:2020-12-15 04:46:56 所属栏目:Java 来源:网络整理
导读:我正在努力了解Rx Java.我的测试代码是: import rx.Observable;import rx.Subscriber;import rx.functions.Action1;import java.util.concurrent.TimeUnit;public class Hello { public static void main(String[] args) { ObservableString observable = O
我正在努力了解Rx Java.我的测试代码是:

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;

import java.util.concurrent.TimeUnit;

public class Hello {
    public static void main(String[] args) {

            Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    Thread.sleep(1000);
                    subscriber.onNext("a");
                    Thread.sleep(1000);
                    subscriber.onNext("b");
                    Thread.sleep(1000);
                    subscriber.onNext("c");
                    Thread.sleep(1000);
                    subscriber.onNext("d");
                    Thread.sleep(1000);
                    subscriber.onNext("e");
                    Thread.sleep(1000);
                    subscriber.onNext("f");
                    Thread.sleep(1000);
                    subscriber.onNext("g");
                    Thread.sleep(1000);
                    subscriber.onNext("h");
                } catch (InterruptedException e) {
                    subscriber.onError(e);
                }
            }
        });

        observable
                .delay(2,TimeUnit.SECONDS)
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String string) {
                        System.out.println(string);
                    }
                });
    }
}

没有.delay(2,TimeUnit.SECONDS)我有输出:a
b
C
d
?
F
G
H
但是使用.delay(2,TimeUnit.SECONDS)
输出缺少“g”和“h”:
一个
b
C
d
?
F

怎么可能?文档说延迟只是发出源Observable发出的项目在时间上向前移动指定的延迟

解决方法

您正在使用调度的延迟重载在不同的线程上工作并导致隐式竞争条件.所有时间运算符(如延迟,缓冲区和窗口)需要使用调度程序为以后计划效果,这可能导致意外的竞争条件,如果你不知道它并仔细使用它们.在这种情况下,延迟运算符在单独的线程池上调度下游工作.以下是测试中的执行顺序(在主线程上).

>您的Observable订阅并在onNext(“a”)之前等待1000毫秒
>接下来它被延迟收到了.这将调度下游onNext 2秒钟.
>控制流程立即返回到您的观察点,等待1000毫秒.
>可观察onNext(“b”)延迟.延迟将“b”的onNext计划为2秒钟.
> ….(重复)
>当您的observable调用onNext(“h”)时,它会调度工作,然后立即从subscribe返回并终止您的测试(导致计划的工作消失).

为了让它以异步方式执行,您可以安排trampoline调度程序实现的延迟.

.delay(2,TimeUnit.SECONDS,Schedulers.trampoline())

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读