scala – 大于内存大小的数据的并行收集处理
发布时间:2020-12-16 18:51:31 所属栏目:安全 来源:网络整理
导读:有没有一种简单的方法来使用 scala并行集合而无需将完整集合加载到内存中? 例如,我有一个大型集合,我想在一个小块上并行执行特定操作(折叠),这个块适合内存,而不是另一个块等等,最后重新组合所有块的结果. 我知道,可以使用actor,但是使用par-collections会
有没有一种简单的方法来使用
scala并行集合而无需将完整集合加载到内存中?
例如,我有一个大型集合,我想在一个小块上并行执行特定操作(折叠),这个块适合内存,而不是另一个块等等,最后重新组合所有块的结果. 我知道,可以使用actor,但是使用par-collections会非常好. 我写了一个解决方案,但它并不好: def split[A](list: Iterable[A],chunkSize: Int): Iterable[Iterable[A]] = { new Iterator[Iterable[A]] { var rest = list def hasNext = !rest.isEmpty def next = { val chunk = rest.take(chunkSize) rest = rest.drop(chunkSize) chunk } }.toIterable } def foldPar[A](acc: A)(list: Iterable[A],chunkSize: Int,combine: ((A,A) => A)): A = { val chunks: Iterable[Iterable[A]] = split(list,chunkSize) def combineChunk: ((A,Iterable[A]) => A) = { case (res,entries) => entries.par.fold(res)(combine) } chunks.foldLeft(acc)(combineChunk) } val chunkSize = 10000000 val x = 1 to chunkSize*10 def sum: ((Int,Int) => Int) = {case (acc,n) => acc + n } foldPar(0)(x,chunkSize,sum) 解决方法
你的想法非常简洁,很遗憾没有这样的功能(AFAIK).
我只是将你的想法改为更短的代码.首先,我觉得对于平行折叠来说,使用monoid的概念是有用的 – 它是一个具有关联操作和零元素的结构.关联性很重要,因为我们不知道我们组合并行计算结果的顺序.零元素很重要,因此我们可以将计算分成块并开始从零开始折叠.尽管如此,它并没有什么新鲜事,它只是Scala集合所期望的折叠. // The function defined by Monoid's apply must be associative // and zero its identity element. trait Monoid[A] extends Function2[A,A,A] { val zero: A } 接下来,Scala的迭代器已经有一个有用的方法分组(Int):GroupedIterator [Seq [A]],它将迭代器切割成固定大小的序列.这与你的分裂十分相似.这允许我们将输入切割成固定大小的块,然后在它们上应用Scala的并行收集方法: def parFold[A](c: Iterator[A],blockSize: Int)(implicit monoid: Monoid[A]): A = c.grouped(blockSize).map(_.par.fold(monoid.zero)(monoid)) .fold(monoid.zero)(monoid); 我们使用并行集合框架折叠每个块,然后(没有任何并行化)组合中间结果. 一个例子: // Example: object SumMonoid extends Monoid[Long] { override val zero: Long = 0; override def apply(x: Long,y: Long) = x + y; } val it = Iterator.range(1,10000001).map(_.toLong) println(parFold(it,100000)(SumMonoid)); (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |