
scala – StackOverflowError due to a long RDD lineage

There are thousands of small files in HDFS. A somewhat smaller subset of them (again in the thousands) needs to be processed; fileList contains the list of paths of the files to process.

// fileList == list of file paths in HDFS

var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD

for (i <- 0 until fileList.size()) {
  val filePath = fileList.get(i)
  val fileRDD = sparkContext.textFile(filePath)
  // keep only the marker lines, tagged with the path they came from
  val sampleRDD = fileRDD.filter(line => line.startsWith("#####"))
                         .map(line => (filePath, line))

  masterRDD = masterRDD.union(sampleRDD)
}

masterRDD.first()

// Once this point is reached, any action on masterRDD fails with a
// StackOverflowError caused by the long RDD lineage:

Exception in thread "main" java.lang.StackOverflowError
    at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    ... (the same cycle of UnionRDD.getPartitions / RDD.partitions / map
    frames repeats, once per nested union, until the stack overflows)

Solution

In general, you can use checkpoints to break long lineages. Something more or less similar to this should work:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

val checkpointInterval: Int = ???

def loadAndFilter(path: String) = sc.textFile(path)
  .filter(_.startsWith("#####"))
  .map((path, _))

def mergeWithLocalCheckpoint[T: ClassTag](interval: Int)
  (acc: RDD[T], xi: (RDD[T], Int)) = {
    // truncate the lineage every `interval` files
    if (xi._2 % interval == 0 && xi._2 > 0) xi._1.union(acc).localCheckpoint
    else xi._1.union(acc)
  }

val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)]

fileList.map(loadAndFilter).zipWithIndex
  .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval))
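
Note that localCheckpoint is only available from Spark 1.5 on and is not fault tolerant, since the checkpointed partitions live in executor storage. A minimal sketch of the same idea using a reliable, HDFS-backed checkpoint instead, assuming a writable checkpoint directory (the path below is illustrative):

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

sc.setCheckpointDir("hdfs:///tmp/rdd-checkpoints")  // illustrative path

def mergeWithReliableCheckpoint[T: ClassTag](interval: Int)
  (acc: RDD[T], xi: (RDD[T], Int)): RDD[T] = {
  val merged = xi._1.union(acc)
  if (xi._2 % interval == 0 && xi._2 > 0) {
    merged.checkpoint()  // lazy: only marks the RDD for checkpointing
    merged.count()       // run an action so the lineage is actually cut here
  }
  merged
}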

In this particular case, there is a much simpler solution using the SparkContext.union method:

val masterRDD = sc.union(
  fileList.map(path => sc.textFile(path)
    .filter(_.startsWith("#####"))
    .map((path, _)))
)

The difference between these approaches should be obvious when you look at the DAG generated by the loop / reduce version:

[figure: DAG of the iterative union, with one nested UnionRDD level per file]

and by a single union:

[figure: DAG of SparkContext.union, a single flat UnionRDD over all inputs]
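
The nesting can also be inspected without the Spark UI: RDD.toDebugString renders the lineage as text. A quick check (the shape of the output is inferred from the code above, not taken from the original post):

// With the loop version the output grows by one indentation level per file;
// with sc.union it stays flat regardless of fileList.size.
println(masterRDD.toDebugString)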

Of course, if the files are small, you can also combine wholeTextFiles with flatMap and read all the files at once:

sc.wholeTextFiles(fileList.mkString(","))
  .flatMap { case (path, text) =>
    text.split("\n").filter(_.startsWith("#####")).map((path, _)) }
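
As a side note, on newer Spark versions (2.x or later) the same (path, line) pairing can be produced through the DataFrame reader, which tags each line with its source file without building any union. A sketch, assuming a SparkSession named spark is available and fileList is a Scala Seq[String]:

import org.apache.spark.sql.functions.input_file_name
import spark.implicits._

// spark.read.text yields one row per line in a column named "value";
// input_file_name() recovers the originating path for each row.
val markers = spark.read.text(fileList: _*)
  .withColumn("path", input_file_name())
  .filter($"value".startsWith("#####"))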
