RxJava中的并行性 – 过滤器
发布时间:2020-12-15 04:35:31 所属栏目:Java 来源:网络整理
导读:我有一些非常简单的代码,读了一堆Strings应用过滤器.我希望过滤器在多个线程上运行. IterableString outputs = Observable .from(Files.readLines(new File("E:SAMATestImageNetBullets.txt"),Charset.forName("utf-8"))) .take(20).subscribeOn(Sched
我有一些非常简单的代码,读了一堆Strings&应用过滤器.我希望过滤器在多个线程上运行.
Iterable<String> outputs = Observable .from(Files.readLines(new File("E:SAMATestImageNetBullets.txt"),Charset.forName("utf-8"))) .take(20).subscribeOn(Schedulers.from(threadPoolExecutor)).filter(str -> isURLOK(str)) .toBlocking().toIterable(); 从日志中,似乎Filter方法只在一个线程上运行: In Thread pool-1-thread-1 In Thread pool-1-thread-1 http://farm2.static.flickr.com/1258/1479683334_3ff920d217.jpg In Thread pool-1-thread-1 In Thread pool-1-thread-1 我该如何加快速度? 解决方法
RxJava本质上是顺序的.例如,使用map(Func1),Func1本身将与通过父序列的值非并发执行:
Observable.range(1,10).map(v -> v * v).subscribe(System.out::println); 这里,lambda v – > v * v将以顺序方式调用值1到10. RxJava可以以管道中的阶段(范围 – > map-> subscribe)可以相对于彼此同时/并行发生的方式异步.例如: Observable.range(1,10) .subscribeOn(Schedulers.computation()) .map(v -> v * v) // (1) .observeOn(Schedulers.io()) .map(v -> -v) // (2) .toBlocking() .subscribe(System.out::println); // (3) 这里,(1)可以与(2)和(3)并行运行,即,while(2)计算av = 3 * 3,(1)可能已经计算v = 5并且(3)打印出-1同一时间. 如果你想同时处理序列的元素,你必须将序列“分出”成子Observables,然后用flatMap连接结果: Observable.range(1,10) .flatMap(v -> Observable.just(v) .subscribeOn(Schedulers.computation()) .map(v -> v * v) ) .toBlocking() .subscribe(System.out::println); 这里,每个值v将启动一个在后台线程上运行的新Observable,并通过map()进行计算. v = 1可以在线程1上运行,v = 2可以在线程2上运行,v = 3可以在线程1上运行但严格地在v = 1之后运行. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |