并行Scala流的内存消耗
发布时间:2020-12-16 18:43:27 所属栏目:安全 来源:网络整理
导读:我编写了一个 Scala(2.9.1-1)应用程序,需要处理来自数据库查询的数百万行.我正在使用我的一个 previous questions的答案中显示的技术将ResultSet转换为Stream: class Record(...)val resultSet = statement.executeQuery(...)new Iterator[Record] { def ha
我编写了一个
Scala(2.9.1-1)应用程序,需要处理来自数据库查询的数百万行.我正在使用我的一个
previous questions的答案中显示的技术将ResultSet转换为Stream:
class Record(...) val resultSet = statement.executeQuery(...) new Iterator[Record] { def hasNext = resultSet.next() def next = new Record(resultSet.getString(1),resultSet.getInt(2),...) }.toStream.foreach { record => ... } 这非常有效. 由于foreach闭包的主体非常占用CPU,并且作为函数式编程实用性的证明,如果我在foreach之前添加一个.par,除了确保闭包的主体是线程安全的(它是以函数样式编写的,除了打印到线程安全日志之外没有可变数据). 但是,我担心内存消耗.是.par导致整个结果集加载到RAM中,还是并行操作只加载与活动线程一样多的行?我已经为JVM(64位和-Xmx4g)分配了4G,但是将来我会在更多的行上运行它,并担心我最终会得到一个内存不足. 是否有更好的模式以功能方式进行这种并行处理?我一直在向同事们展示这个应用程序,作为函数式编程和多核机器价值的一个例子. 解决方法
如果你看一下
scaladoc of
Stream ,你会发现par的定义类是Parallelizable特性……如果你看一下
source code of this trait,你会注意到它从原始集合中取出每个元素并将它们放入合成器中.因此,您将每行加载到ParSeq中:
def par: ParRepr = { val cb = parCombiner for (x <- seq) cb += x cb.result } /** The default `par` implementation uses the combiner provided by this method * to create a new parallel collection. * * @return a combiner for the parallel collection of type `ParRepr` */ protected[this] def parCombiner: Combiner[A,ParRepr] 一个可能的解决方案是显式并行化您的计算,这要归功于演员.例如,您可以从akka文档中查看this example,这可能对您的上下文有帮助. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |