加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

并行Scala流的内存消耗

发布时间:2020-12-16 18:43:27 所属栏目:安全 来源:网络整理
导读:我编写了一个 Scala(2.9.1-1)应用程序,需要处理来自数据库查询的数百万行.我正在使用我的一个 previous questions的答案中显示的技术将ResultSet转换为Stream: class Record(...)val resultSet = statement.executeQuery(...)new Iterator[Record] { def ha
我编写了一个 Scala(2.9.1-1)应用程序,需要处理来自数据库查询的数百万行.我正在使用我的一个 previous questions的答案中显示的技术将ResultSet转换为Stream:

class Record(...)

val resultSet = statement.executeQuery(...)

new Iterator[Record] {
  def hasNext = resultSet.next()
  def next = new Record(resultSet.getString(1),resultSet.getInt(2),...)
}.toStream.foreach { record => ... }

这非常有效.

由于foreach闭包的主体非常占用CPU,并且作为函数式编程实用性的证明,如果我在foreach之前添加一个.par,除了确保闭包的主体是线程安全的(它是以函数样式编写的,除了打印到线程安全日志之外没有可变数据).

但是,我担心内存消耗.是.par导致整个结果集加载到RAM中,还是并行操作只加载与活动线程一样多的行?我已经为JVM(64位和-Xmx4g)分配了4G,但是将来我会在更多的行上运行它,并担心我最终会得到一个内存不足.

是否有更好的模式以功能方式进行这种并行处理?我一直在向同事们展示这个应用程序,作为函数式编程和多核机器价值的一个例子.

解决方法

如果你看一下 scaladoc of Stream,你会发现par的定义类是Parallelizable特性……如果你看一下 source code of this trait,你会注意到它从原始集合中取出每个元素并将它们放入合成器中.因此,您将每行加载到ParSeq中:

def par: ParRepr = {
    val cb = parCombiner
    for (x <- seq) cb += x
    cb.result
  }

  /** The default `par` implementation uses the combiner provided by this method
   *  to create a new parallel collection.
   *
   *  @return  a combiner for the parallel collection of type `ParRepr`
   */
  protected[this] def parCombiner: Combiner[A,ParRepr]

一个可能的解决方案是显式并行化您的计算,这要归功于演员.例如,您可以从akka文档中查看this example,这可能对您的上下文有帮助.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读