scala – "SparkContext was shut down" when running Spark on a big dataset
When running a Spark job on the cluster above a certain data size (~2.5 GB), I get either "Job cancelled because SparkContext was shut down" or "Executor lost". In the YARN GUI the killed job shows up as a success. There are no problems when running on 500 MB of data. Looking for a solution, I found:
– "It seems that YARN killed some of the executors as they requested more memory than expected."

Any suggestions how to debug this?

The command I submit my Spark job with:

/opt/spark-1.5.0-bin-hadoop2.4/bin/spark-submit --driver-memory 22g --driver-cores 4 --num-executors 15 --executor-memory 6g --executor-cores 6 --class sparkTesting.Runner --master yarn-client myJar.jar jarArguments

and the SparkContext settings:

val sparkConf = (new SparkConf()
  .set("spark.driver.maxResultSize", "21g")
  .set("spark.akka.frameSize", "2011")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", configVar.sparkLogDir)
)

A simplified version of the code that fails looks like this:

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val broadcastParser = sc.broadcast(new Parser())

val featuresRdd = hc.sql("select " + configVar.columnName + " from " + configVar.Table + " ORDER BY RAND() LIMIT " + configVar.Articles)
val myRdd: org.apache.spark.rdd.RDD[String] = featuresRdd.map(doSomething(_, broadcastParser))

val allWords = featuresRdd
  .flatMap(line => line.split(" "))
  .count

val wordQuantiles = featuresRdd
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .map(pair => (pair._2, pair._2))
  .reduceByKey(_ + _)
  .sortBy(_._1)
  .collect
  .scanLeft((0, 0.0))((res, add) => (add._1, res._2 + add._2))
  .map(entry => (entry._1, entry._2 / allWords))

val dictionary = featuresRdd
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // here I have an RDD of (word, count) tuples
  .filter(_._2 >= moreThan)
  .filter(_._2 <= lessThan)
  .filter(_._1.trim != "")
  .map(_._1)
  .zipWithIndex
  .collect
  .toMap

and the error stack:

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
  at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1511)
  at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
  at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1435)
  at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1715)
  at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
  at org.apache.spark.SparkContext.stop(SparkContext.scala:1714)
  at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:146)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1121)
  at sparkTesting.InputGenerationAndDictionaryComputations$.createDictionary(InputGenerationAndDictionaryComputations.scala:50)
  at sparkTesting.Runner$.main(Runner.scala:133)
  at sparkTesting.Runner.main(Runner.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:483)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
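For context on the warning quoted above: on YARN each executor runs in a container sized as the executor heap (--executor-memory) plus an off-heap overhead, and YARN kills any container that grows past that limit, which then surfaces on the Spark side as "Executor lost" or a shut-down SparkContext. A common first debugging step is to look in the YARN application/container logs for containers killed for running beyond their memory limits and, if that is the case, to raise the per-executor overhead. A minimal sketch, assuming the Spark 1.x-on-YARN property name and an illustrative value that would need tuning for this cluster:

import org.apache.spark.SparkConf

// Sketch only: extra off-heap headroom per executor container, in MB.
// The 1024 MB figure is an assumption, not a recommendation.
val conf = new SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "1024")

The same setting can also be passed on the command line with --conf spark.yarn.executor.memoryOverhead=1024.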
Solution

Found the answer.
My table was saved as a 20 GB Avro file. When the executors tried to open it, each of them had to load the whole 20 GB into memory. Solved it by using CSV instead of Avro.
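In other words, the input was not being split: every executor tried to materialize the full 20 GB file, which cannot fit into a 6 GB executor heap, so YARN killed the containers. A splittable text export avoids that, because downstream jobs then read many partitions instead of one huge blob. Below is a minimal sketch of such a one-off export using only core Spark 1.5 APIs; it writes tab-delimited text rather than strict CSV (core 1.5 has no built-in CSV writer), and the table name and output path are placeholders for the values that configVar supplies in the original code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// One-off export of the Hive table to plain delimited text files.
object ExportTableAsText {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ExportTableAsText"))
    val hc = new HiveContext(sc)

    hc.sql("select * from my_table")         // "my_table": placeholder for configVar.Table
      .rdd                                   // DataFrame -> RDD[Row]
      .map(_.mkString("\t"))                 // one tab-separated line per row
      .saveAsTextFile("/tmp/my_table_text")  // hypothetical HDFS output path

    sc.stop()
  }
}

The failing pipeline can then read the exported files with sc.textFile("/tmp/my_table_text"), which yields one partition per HDFS block instead of forcing each executor to load the whole table at once.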