scala – Spark: PageRank example throws StackOverflowError when the number of iterations is too large
I tested Spark's default PageRank example and set the number of iterations to 1024, and it threw a StackOverflowError. I have also run into this problem in my other programs. How can I solve it?
object SparkPageRank {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: PageRank <master> <file> <number_of_iterations>")
      System.exit(1)
    }
    var iters = args(2).toInt
    val ctx = new SparkContext(args(0), "PageRank", System.getenv("SPARK_HOME"),
      SparkContext.jarOfClass(this.getClass))
    val lines = ctx.textFile(args(1), 1)
    val links = lines.map { s =>
      val parts = s.split("\\s+")
      (parts(0), parts(1))
    }.distinct().groupByKey().cache()
    var ranks = links.mapValues(v => 1.0)
    for (i <- 1 to iters) {
      val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
        val size = urls.size
        urls.map(url => (url, rank / size))
      }
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
    }
    val output = ranks.collect()
    output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
    System.exit(0)
  }
}

Here is the error:

[spark-akka.actor.default-dispatcher-15] ERROR LocalActorRefProvider(akka://spark) - guardian failed, shutting down system
java.lang.StackOverflowError
    at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
    at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
    at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
    at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43)
    at scala.collection.mutable.AbstractSet.apply(Set.scala:45)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:312)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:321)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:316)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:316)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:321)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$visit$1$1.apply(DAGScheduler.scala:316)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$visit$1(DAGScheduler.scala:316)
    at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:326)

Solution
This happens because the transformations applied inside the for loop build up a very long lineage (dependency chain) on your RDD. When you run the Spark job, the scheduler visits that lineage recursively, which causes the StackOverflowError.
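For illustration only (this is not part of the original answer), the small standalone sketch below shows how the lineage grows on each pass of such a loop; toDebugString prints an RDD's dependency chain, and its output gets longer every iteration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object LineageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LineageDemo").setMaster("local[*]"))
    var rdd = sc.parallelize(Seq((1, 1.0), (2, 1.0)))
    for (i <- 1 to 5) {
      // Each iteration wraps the previous RDD in two more transformations.
      rdd = rdd.mapValues(_ * 0.85).reduceByKey(_ + _)
      println(s"--- after iteration $i ---")
      println(rdd.toDebugString)  // the printed lineage gets deeper each time
    }
    sc.stop()
  }
}

With 1024 iterations the same chain becomes deep enough that the DAGScheduler's recursive traversal overflows the stack.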
To solve this, use checkpoint() on the RDD. cache() alone will not help, because it does not evaluate the RDD immediately and does not cut its lineage. So, every so many iterations you should call cache() and checkpoint() on the intermediate RDD and evaluate it manually (by running an action) to clear its dependency chain.
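A minimal sketch of that suggestion, reusing the ctx/links/ranks/iters names from the question's code; the checkpoint directory and the interval of 20 iterations are illustrative choices, not values from the original answer:

// Assumed checkpoint location; on a cluster this should be a reliable
// filesystem such as HDFS.
ctx.setCheckpointDir("/tmp/spark-checkpoints")

var ranks = links.mapValues(v => 1.0)
for (i <- 1 to iters) {
  val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
    val size = urls.size
    urls.map(url => (url, rank / size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)

  // Periodically cut the lineage: cache the RDD, mark it for checkpointing,
  // and force evaluation with an action so the checkpoint is actually written.
  if (i % 20 == 0) {
    ranks.cache()
    ranks.checkpoint()
    ranks.count()
  }
}

Once the checkpoint has been materialized, the RDD's lineage is replaced by a reference to the checkpoint files, so later stages no longer have to recurse through hundreds of accumulated joins.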