rx-java – groupBy运算符,来自不同组的项交错
以下代码:
Observable .just(0,1,2,3,4,5,6,7,8,9) .doOnNext(item -> System.out.println("source emitting " + item)) .groupBy(item -> { System.out.println("groupBy called for " + item); return item % 3; }) .subscribe(observable -> { System.out.println("got observable " + observable + " for key " + observable.getKey()); observable.subscribe(item -> { System.out.println("key " + observable.getKey() + ",item " + item); }); }); 让我感到困惑.我得到的输出是: source emitting 0 groupBy called for 0 got observable rx.observables.GroupedObservable@42110406 for key 0 key 0,item 0 source emitting 1 groupBy called for 1 got observable rx.observables.GroupedObservable@1698c449 for key 1 key 1,item 1 source emitting 2 groupBy called for 2 got observable rx.observables.GroupedObservable@5ef04b5 for key 2 key 2,item 2 source emitting 3 groupBy called for 3 key 0,item 3 source emitting 4 groupBy called for 4 key 1,item 4 source emitting 5 groupBy called for 5 key 2,item 5 source emitting 6 groupBy called for 6 key 0,item 6 source emitting 7 groupBy called for 7 key 1,item 7 source emitting 8 groupBy called for 8 key 2,item 8 source emitting 9 groupBy called for 9 key 0,item 9 因此,在顶级订阅方法中,我按预期从GroupedObservable获得3个可观察对象.然后,我一个接一个地订阅了分组的observable – 这里我不明白的事情: 为什么原始项目仍以原始序列(即0,3 ……)发出,而不是0,9 ……用于键0,后面是1,7为键1,键2的后面是2,8; 我想我明白了如何创建组: 1. 0 is emitted,the key function is called and it gets 0 2. it is checked if an observable for 0 exists,it doesn't,so a new one is created and emitted,and then it emits 0 3. the same happens for source items 1 and 2 as they both create new groups,and observables with key 1 and 2 are emitted,and they emit 1 and 2 correspondingly 4. source item 3 is emitted,the key function is called and it gets 0 5. it is checked if an observable for 0 exists,it does -> no new grouped observable is created nor emitted,but 3 is emitted by the already existing observable 6. etc. until the source sequence is drained 似乎虽然我逐个获得了分组的可观测量,但它们的排放在某种程度上是交错的.这是怎么发生的? 解决方法
你已经回答了自己的问题.您按照它们发出的顺序对项目流进行操作.因此,每一个都被发射出来,它会从运算符链中传递下来,你会看到你在这里显示的输出. 您期望的替代输出需要链等待,直到源停止为所有组发出项目.假设你有Observable.just(0,0).那么你期望(0,0),(1,4),(2)作为你的输出组.如果你拥有4个无限的流量怎么办?您的订户永远不会从第一组收到0,3 .. 您可以创建您正在寻找的行为. toList运算符将缓存输出直到源完成,然后传递List< R>.给订户: .subscribe(observable -> { System.out.println("got observable " + observable + " for key " + observable.getKey()); observable.toList().subscribe(items -> { // items is a List<Integer> System.out.println("key " + observable.getKey() + ",items " + items); }); }); (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |