加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

多线程 – RxJava – 如何将Observer设置为阻止

发布时间:2020-12-15 04:44:58 所属栏目:Java 来源:网络整理
导读:我希望我的Observable阻塞直到操作完成,然后继续下一个方法调用等.看看这段代码: import rx.Observable;import rx.android.schedulers.AndroidSchedulers;import rx.functions.Action1;import rx.functions.Func1;Observable observer1 = Observable.just(1
我希望我的Observable阻塞直到操作完成,然后继续下一个方法调用等.看看这段代码:

import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.functions.Func1;

Observable observer1 = Observable.just(1,2,3)
        .observeOn(AndroidSchedulers.mainThread());

Observable observer2 = observer1.map(new Func1<Integer,Integer>() {
    @Override
    public Integer call(Integer myint) {
        //multiples each int by 2
        return myint * 2;
    }
});

observer2.observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(AndroidSchedulers.mainThread());

observer2.subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer i) {
        System.out.println("this is the Integer multiplied by two:" + i);
    }
});

System.out.println("I want this statement to come after multiplication completes");

我意识到我可以使用onComplete回调,但这不是我的观点.我试图找出如何阻止观察者,直到它完成,然后继续我的其余代码.此时日志看起来像这样:

I/System.out﹕ I want this statement to come after multiplication completes
I/System.out﹕ this is the Integer multiplied by two:2
I/System.out﹕ this is the Integer multiplied by two:4
I/System.out﹕ this is the Integer multiplied by two:6

还要注意我在MainThread上观察和订阅所有内容,如果我没有指定,这是默认完成的吗?

解决方法

如果要阻塞直到Observable完成使用observable.toBlocking().forEach()而不是subscribe().

observer2
    .toBlocking()
    .forEach(new Action1<Integer>() {
        @Override
        public void call(Integer i) {
            System.out.println("this is the Integer multiplied by two:" + i);
        }
    });

除了forEach()之外,还可以使用多个Blocking Observable Operators来获得所需的效果.例如,如果您只需要发出的第一个项目,那么使用observable.toBlocking().first()

另请注意,RxJava API会为您正在进行的每个调用返回一个新的Observable.因此,以下行对observable2使用的调度程序没有影响.

observer2.observeOn(AndroidSchedulers.mainThread()).subscribeOn(AndroidSchedulers.mainThread());

它确实使用指定的调度程序创建一个新的Observable,但抛出它,因为返回的Observable没有分配给任何变量.您可以执行以下操作.

observer2
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(AndroidSchedulers.mainThread())
    .toBlocking()
    .forEach(new Action1<Integer>() {
        @Override
        public void call(Integer i) {
            System.out.println("this is the Integer multiplied by two:" + i);
        }
    });

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读