scala – StackOverflowError due to a long RDD lineage
There are thousands of small files in HDFS. A somewhat smaller subset of them (again numbering in the thousands) needs to be processed; fileList contains the list of HDFS paths of the files to process.
// fileList == list of file paths in HDFS
var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD

for (i <- 0 to fileList.size() - 1) {
  val filePath = fileList.get(i)
  val fileRDD = sparkContext.textFile(filePath)
  val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line))
  masterRDD = masterRDD.union(sampleRDD)
}

masterRDD.first() // once the loop completes, any action on masterRDD fails with a StackOverflowError caused by the long RDD lineage

Exception in thread "main" java.lang.StackOverflowError
    at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    =====================================================================
    =====================================================================
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

Solution
Generally speaking, you can use checkpoints to break long lineages. Something more or less like this should work:
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

val checkpointInterval: Int = ???

def loadAndFilter(path: String) = sc.textFile(path)
  .filter(_.startsWith("#####"))
  .map((path, _))

def mergeWithLocalCheckpoint[T: ClassTag](interval: Int)
    (acc: RDD[T], xi: (RDD[T], Int)) = {
  if (xi._2 % interval == 0 & xi._2 > 0) xi._1.union(acc).localCheckpoint
  else xi._1.union(acc)
}

val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)]

fileList.map(loadAndFilter).zipWithIndex
  .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval))

In this particular case, a much simpler solution is to use the SparkContext.union method:

val masterRDD = sc.union(
  fileList.map(path => sc.textFile(path)
    .filter(_.startsWith("#####"))
    .map((path, _)))
)

The difference between these approaches should be obvious when you take a look at the DAG generated by the loop / reduce version:

[DAG image from the original answer, not reproduced here]

and by a single union:

[DAG image from the original answer, not reproduced here]

Of course, if the files are small, you can combine wholeTextFiles with flatMap and read all the files at once:

sc.wholeTextFiles(fileList.mkString(","))
  .flatMap { case (path, text) =>
    text.split("\n").filter(_.startsWith("#####")).map((path, _))
  }
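One caveat worth noting: localCheckpoint only materializes the data on the executors' local storage, so the truncated lineage cannot be recomputed if an executor is lost. When that matters, a reliable checkpoint written to HDFS breaks the lineage in the same way. The following is only a rough sketch along those lines, not part of the original answer; it assumes an existing SparkContext sc, a fileList that is a Scala Seq[String] of paths, and an arbitrary checkpoint directory and interval:

import org.apache.spark.rdd.RDD

// Assumptions for illustration only: `sc` is an existing SparkContext,
// `fileList` is a Seq[String] of HDFS paths; the directory and the
// interval of 50 are arbitrary choices.
sc.setCheckpointDir("hdfs:///tmp/lineage-checkpoints")

val merged: RDD[(String, String)] =
  fileList
    .map(path => sc.textFile(path).filter(_.startsWith("#####")).map((path, _)))
    .zipWithIndex
    .foldLeft(sc.emptyRDD[(String, String)]) { case (acc, (rdd, i)) =>
      val combined = acc.union(rdd)
      if (i > 0 && i % 50 == 0) {
        combined.checkpoint() // mark the RDD for a reliable checkpoint in the checkpoint dir
        combined.count()      // run an action so the checkpoint is written and the lineage truncated
      }
      combined
    }

Whichever checkpointing variant is used, SparkContext.union remains the simpler option when all inputs are known up front, because it builds a single flat UnionRDD instead of a deeply nested chain of unions.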