scala – How to force DataFrame evaluation in Spark
Sometimes (e.g. for testing and benchmarking) I want to force the execution of the transformations defined on a DataFrame. AFAIK, calling an action like count does not ensure that all columns are actually computed, and show may only compute a subset of all rows (see the examples below).
My workaround is to write the DataFrame to HDFS using df.write.saveAsTable, but this "clutters" my system with tables I don't want to keep around.

So what is the best way to trigger the evaluation of a DataFrame?

Edit:

Note that there is also a recent discussion on the spark developer list: http://apache-spark-developers-list.1001551.n3.nabble.com/Will-count-always-trigger-an-evaluation-of-each-row-td21018.html

I made a small example which shows that count on a DataFrame does not evaluate everything (tested using Spark 1.6.3 and spark-master = local[2]):

val df = sc.parallelize(Seq(1)).toDF("id")
val myUDF = udf((i: Int) => { throw new RuntimeException; i })

df.withColumn("test", myUDF($"id")).count  // runs fine
df.withColumn("test", myUDF($"id")).show() // gives Exception

Using the same logic, here is an example where show does not evaluate all rows:

val df = sc.parallelize(1 to 10).toDF("id")
val myUDF = udf((i: Int) => { if (i == 10) throw new RuntimeException; i })

df.withColumn("test", myUDF($"id")).show(5)  // runs fine
df.withColumn("test", myUDF($"id")).show(10) // gives Exception

Edit 2: For eliasah, the exception says:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost): java.lang.RuntimeException
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(<console>:68)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
  at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  ...
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
  at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
  at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
  at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
  at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
  at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
  at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
  at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
  at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1376)
  at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
  at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
  at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1375)
  at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1457)
  at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
  at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
  at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
  at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
  ...

Solution
I guess that simply getting the underlying rdd from the DataFrame and triggering an action on it should achieve what you're looking for.
df.withColumn("test",myUDF($"id")).rdd.count // this gives proper exceptions (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |