加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何并行化groupBy

发布时间:2020-12-16 18:32:33 所属栏目:安全 来源:网络整理
导读:我正在玩.par,我想知道以下计算是否可以进一步并行化以获得性能增益,或者是否有其他方法可以更快地计算结果.我不认为最终结果取决于分组的顺序,所以我希望还有其他可能的收获. object Test { val data = (1 to 500000) map { i = (i % 100) - (i % 10000) }
我正在玩.par,我想知道以下计算是否可以进一步并行化以获得性能增益,或者是否有其他方法可以更快地计算结果.我不认为最终结果取决于分组的顺序,所以我希望还有其他可能的收获.

object Test {
  val data = (1 to 500000) map { i => (i % 100) -> (i % 10000) }

  def mutableIndex = {
    val map = collection.mutable.Map[Int,Set[Int]]().withDefaultValue(
      Set[Int]())
    for ((k,v) <- data) { map(k) = map(k) + v }
    map
  }

  def immutableIndex = data.groupBy(_._1).map{ case (k,seq) =>
    k -> seq.map(_._2).toSet
  }

  def immutableParIndex = data.par.groupBy(_._1).map{ case (k,seq) =>
    k -> seq.map(_._2).toSet
  }

  def main(args: Array[String]) {
    def bench(id: String)(block: => Unit) {
      val times = (new testing.Benchmark { def run() = block }).runBenchmark(10)
      println(id + " " + times + " sum: " + times.sum)
    }
    println("avail procs " + Runtime.getRuntime.availableProcessors)
    bench("mutable"){ mutableIndex }
    bench("immutable"){ immutableIndex }
    bench("immutable par"){ immutableParIndex }
  }

}

运行它打印这个 – 使用2.9.1:

$scalac -d classes -optimize A.scala
$scala -cp classes Test
avail procs 4
mutable List(718,343,296,297,312,312) sum: 3526
immutable List(312,266,265,249,265) sum: 2683
immutable par List(546,234,202,187,172,188,171) sum: 2293

一些说明:

>虽然上面的输出非常好,但并行版本也更加不一致,这取决于我在数据中使用的常量以及我在工作台中配置的迭代次数(有时效率低于顺序版本).我想知道是否期望并行收藏.
>随着集合变小(通过减少数据中的最后一个模数),mutable变得更快
>如果我的基准测试存在缺陷,请告诉我如何修复它(例如,我对所有迭代使用相同的数据,不确定是否会导致结果偏差)

编辑:这是一个基于并发hashmap的版本,并以groupBy的库代码为模型:

def syncIndex = {
  import collection.mutable.Builder
  import java.util.concurrent.ConcurrentHashMap
  import collection.JavaConverters._
  val m = new ConcurrentHashMap[Int,Builder[Int,Set[Int]]]().asScala
  for ((k,v) <- data.par) {
    val bldr = Set.newBuilder[Int]
    m.putIfAbsent(k,bldr) match {
      case Some(bldr) => bldr.synchronized(bldr += v)
      case None => bldr.synchronized(bldr += v)
    }
  }
  val b = Map.newBuilder[Int,Set[Int]]
  for ((k,v) <- m)
    b += ((k,v.result))
  b.result
}

它看起来可以在2个核心上提供一个很好的加速,但不会在4个核心上.

解决方法

不是你的问题的答案,但我发现.par特别在Hotspot(32位?)客户端上提供了加速,而在Hotspot Server上则没有那么多.我在REPL中运行它,基准测试在后续运行中变得更快,因为它已经预热了.

我在任务管理器和每个任务管理器上观察了处理器的使用情况,从非并行化任务的54%到并行化的75%.

Java 7也提供了相当大的速度提升.

欢迎使用Scala版本2.9.0.1(Java HotSpot(TM)客户端VM,Java 1.6.0_22).

scala> Test.main(Array[String]())
avail procs 2
mutable List(1303,1086,1058,1132,1071,1068,1035,1037,1036,1032) sum: 10858
immutable List(874,872,869,856,858,857,855,849) sum: 8602
immutable par List(688,502,482,479,480,465,473,471,472) sum: 4985

scala> Test.main(Array[String]())
avail procs 2
mutable List(1015,1025,1090,1026,1011,1021,1014,1017,1015) sum: 10245
immutable List(863,868,867,865,864,883,863,864) sum: 8666
immutable par List(466,468,463,466,469,470,467,478,467) sum: 4680

欢迎使用Scala版本2.9.0.1(Java HotSpot(TM)64位服务器VM,Java 1.6.0_22).

scala> Test.main(Array[String]())
avail procs 2
mutable List(841,360,348,338,337,342,336,336) sum: 3914
immutable List(320,303,302,300,304,305,299,299) sum: 3039
immutable par List(521,284,244,232,267,209,219,231,203) sum: 2654

scala> Test.main(Array[String]())
avail procs 2
mutable List(370,393,351,340,334,340) sum: 3491
immutable List(301,301,301) sum: 3021
immutable par List(207,240,201,194,204,197,211,207,208) sum: 2063

scala> Test.main(Array[String]())
avail procs 2
mutable List(334,339,341,340) sum: 3376
immutable List(300,298,296) sum: 3002
immutable par List(194,200,190,192,191,195,196,189) sum: 1950

欢迎使用Scala版本2.9.0.1(Java HotSpot(TM)64位服务器VM,Java 1.7.0).

scala> Test.main(Array[String]())
avail procs 2
mutable List(763,258,227,235,238,279,245,243) sum: 2942
immutable List(274,233,228,247,243,229,245) sum: 2405
immutable par List(635,261,217,291,248,184) sum: 2820

scala> Test.main(Array[String]())
avail procs 2
mutable List(229,230,226,232) sum: 2290
immutable List(228,210,210) sum: 2200
immutable par List(173,160,157,158,177,179,164,163,159) sum: 1699

scala> Test.main(Array[String]())
avail procs 2
mutable List(222,218,216,214,215,218) sum: 2172
immutable List(211,212,210) sum: 2116
immutable par List(161,168,156,161,150,175) sum: 1606

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读