加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

RxJava中的并行性 – 过滤器

发布时间:2020-12-15 04:35:31 所属栏目:Java 来源:网络整理
导读:我有一些非常简单的代码,读了一堆Strings应用过滤器.我希望过滤器在多个线程上运行. IterableString outputs = Observable .from(Files.readLines(new File("E:SAMATestImageNetBullets.txt"),Charset.forName("utf-8"))) .take(20).subscribeOn(Sched
我有一些非常简单的代码,读了一堆Strings&应用过滤器.我希望过滤器在多个线程上运行.

Iterable<String> outputs = Observable
            .from(Files.readLines(new File("E:SAMATestImageNetBullets.txt"),Charset.forName("utf-8")))
            .take(20).subscribeOn(Schedulers.from(threadPoolExecutor)).filter(str -> isURLOK(str))
            .toBlocking().toIterable();

从日志中,似乎Filter方法只在一个线程上运行:

In Thread pool-1-thread-1
In Thread pool-1-thread-1
http://farm2.static.flickr.com/1258/1479683334_3ff920d217.jpg
In Thread pool-1-thread-1
In Thread pool-1-thread-1

我该如何加快速度?

解决方法

RxJava本质上是顺序的.例如,使用map(Func1),Func1本身将与通过父序列的值非并发执行:

Observable.range(1,10).map(v -> v * v).subscribe(System.out::println);

这里,lambda v – > v * v将以顺序方式调用值1到10.

RxJava可以以管道中的阶段(范围 – > map-> subscribe)可以相对于彼此同时/并行发生的方式异步.例如:

Observable.range(1,10)
.subscribeOn(Schedulers.computation())
.map(v -> v * v)                       // (1)
.observeOn(Schedulers.io())
.map(v -> -v)                          // (2)
.toBlocking()
.subscribe(System.out::println);       // (3)

这里,(1)可以与(2)和(3)并行运行,即,while(2)计算av = 3 * 3,(1)可能已经计算v = 5并且(3)打印出-1同一时间.

如果你想同时处理序列的元素,你必须将序列“分出”成子Observables,然后用flatMap连接结果:

Observable.range(1,10)
.flatMap(v -> 
    Observable.just(v)
    .subscribeOn(Schedulers.computation())
    .map(v -> v * v)
)
.toBlocking()
.subscribe(System.out::println);

这里,每个值v将启动一个在后台线程上运行的新Observable,并通过map()进行计算. v = 1可以在线程1上运行,v = 2可以在线程2上运行,v = 3可以在线程1上运行但严格地在v = 1之后运行.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读