加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

在Java中使用无限流的并行处理

发布时间:2020-12-14 16:43:32 所属栏目:Java 来源:网络整理
导读:为什么下面的代码不打印任何输出,而如果我们删除并行,它打印0,1? IntStream.iterate(0,i - ( i + 1 ) % 2) .parallel() .distinct() .limit(10) .forEach(System.out::println); 虽然我知道理想的限制应该在不同之前放置,但是我的问题与添加并行处理造成的
为什么下面的代码不打印任何输出,而如果我们删除并行,它打印0,1?
IntStream.iterate(0,i -> ( i + 1 ) % 2)
         .parallel()
         .distinct()
         .limit(10)
         .forEach(System.out::println);

虽然我知道理想的限制应该在不同之前放置,但是我的问题与添加并行处理造成的差异更为相关.

解决方法

真正的原因是有序的并行.distinct()是文档中的 described的完整屏障操作:

Preserving stability for distinct() in parallel pipelines is relatively expensive (requires that the operation act as a full barrier,with substantial buffering overhead),and stability is often not needed.

“完全屏障操作”是指所有上游操作必须在下游启动之前执行. Stream API中只有两个完整的屏障操作:.sorted()(每次)和.distinct()(在有序的并行情况下).由于您提供给.distinct()的非短路无限流,最终会产生无限循环.通过合约.distinct()不能以任何顺序向下游发送元素:它应该始终发出第一个重复元素.虽然在理论上可以更好地实现并行有序的.distinct(),但它将是更复杂的实现.

对于解决方案,@ user140547是正确的:在.distinct()之前添加.unordered()这个将distinct()算法转换为无序的()只使用共享ConcurrentHashMap来存储所有观察到的元素并将每个新元素发送到下游).请注意,在.distinct()之后添加.unordered()将不会有帮助.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读