scala – 如何并行化groupBy
我正在玩.par,我想知道以下计算是否可以进一步并行化以获得性能增益,或者是否有其他方法可以更快地计算结果.我不认为最终结果取决于分组的顺序,所以我希望还有其他可能的收获.
object Test { val data = (1 to 500000) map { i => (i % 100) -> (i % 10000) } def mutableIndex = { val map = collection.mutable.Map[Int,Set[Int]]().withDefaultValue( Set[Int]()) for ((k,v) <- data) { map(k) = map(k) + v } map } def immutableIndex = data.groupBy(_._1).map{ case (k,seq) => k -> seq.map(_._2).toSet } def immutableParIndex = data.par.groupBy(_._1).map{ case (k,seq) => k -> seq.map(_._2).toSet } def main(args: Array[String]) { def bench(id: String)(block: => Unit) { val times = (new testing.Benchmark { def run() = block }).runBenchmark(10) println(id + " " + times + " sum: " + times.sum) } println("avail procs " + Runtime.getRuntime.availableProcessors) bench("mutable"){ mutableIndex } bench("immutable"){ immutableIndex } bench("immutable par"){ immutableParIndex } } } 运行它打印这个 – 使用2.9.1: $scalac -d classes -optimize A.scala $scala -cp classes Test avail procs 4 mutable List(718,343,296,297,312,312) sum: 3526 immutable List(312,266,265,249,265) sum: 2683 immutable par List(546,234,202,187,172,188,171) sum: 2293 一些说明: >虽然上面的输出非常好,但并行版本也更加不一致,这取决于我在数据中使用的常量以及我在工作台中配置的迭代次数(有时效率低于顺序版本).我想知道是否期望并行收藏. 编辑:这是一个基于并发hashmap的版本,并以groupBy的库代码为模型: def syncIndex = { import collection.mutable.Builder import java.util.concurrent.ConcurrentHashMap import collection.JavaConverters._ val m = new ConcurrentHashMap[Int,Builder[Int,Set[Int]]]().asScala for ((k,v) <- data.par) { val bldr = Set.newBuilder[Int] m.putIfAbsent(k,bldr) match { case Some(bldr) => bldr.synchronized(bldr += v) case None => bldr.synchronized(bldr += v) } } val b = Map.newBuilder[Int,Set[Int]] for ((k,v) <- m) b += ((k,v.result)) b.result } 它看起来可以在2个核心上提供一个很好的加速,但不会在4个核心上. 解决方法
不是你的问题的答案,但我发现.par特别在Hotspot(32位?)客户端上提供了加速,而在Hotspot Server上则没有那么多.我在REPL中运行它,基准测试在后续运行中变得更快,因为它已经预热了.
我在任务管理器和每个任务管理器上观察了处理器的使用情况,从非并行化任务的54%到并行化的75%. Java 7也提供了相当大的速度提升. 欢迎使用Scala版本2.9.0.1(Java HotSpot(TM)客户端VM,Java 1.6.0_22). scala> Test.main(Array[String]()) avail procs 2 mutable List(1303,1086,1058,1132,1071,1068,1035,1037,1036,1032) sum: 10858 immutable List(874,872,869,856,858,857,855,849) sum: 8602 immutable par List(688,502,482,479,480,465,473,471,472) sum: 4985 scala> Test.main(Array[String]()) avail procs 2 mutable List(1015,1025,1090,1026,1011,1021,1014,1017,1015) sum: 10245 immutable List(863,868,867,865,864,883,863,864) sum: 8666 immutable par List(466,468,463,466,469,470,467,478,467) sum: 4680 欢迎使用Scala版本2.9.0.1(Java HotSpot(TM)64位服务器VM,Java 1.6.0_22). scala> Test.main(Array[String]()) avail procs 2 mutable List(841,360,348,338,337,342,336,336) sum: 3914 immutable List(320,303,302,300,304,305,299,299) sum: 3039 immutable par List(521,284,244,232,267,209,219,231,203) sum: 2654 scala> Test.main(Array[String]()) avail procs 2 mutable List(370,393,351,340,334,340) sum: 3491 immutable List(301,301,301) sum: 3021 immutable par List(207,240,201,194,204,197,211,207,208) sum: 2063 scala> Test.main(Array[String]()) avail procs 2 mutable List(334,339,341,340) sum: 3376 immutable List(300,298,296) sum: 3002 immutable par List(194,200,190,192,191,195,196,189) sum: 1950 欢迎使用Scala版本2.9.0.1(Java HotSpot(TM)64位服务器VM,Java 1.7.0). scala> Test.main(Array[String]()) avail procs 2 mutable List(763,258,227,235,238,279,245,243) sum: 2942 immutable List(274,233,228,247,243,229,245) sum: 2405 immutable par List(635,261,217,291,248,184) sum: 2820 scala> Test.main(Array[String]()) avail procs 2 mutable List(229,230,226,232) sum: 2290 immutable List(228,210,210) sum: 2200 immutable par List(173,160,157,158,177,179,164,163,159) sum: 1699 scala> Test.main(Array[String]()) avail procs 2 mutable List(222,218,216,214,215,218) sum: 2172 immutable List(211,212,210) sum: 2116 immutable par List(161,168,156,161,150,175) sum: 1606 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |