scala – Spark SQL UDF throws a NullPointerException when adding a filter on a column produced by that UDF
SPARK_VERSION = 2.2.0
I ran into an interesting problem when trying to filter a dataframe on a column that was added with a UDF. I can reproduce the problem with a smaller data set.

Given the dummy case classes:

    case class Info(number: Int, color: String)
    case class Record(name: String, infos: Seq[Info])

and the following data:

    val blue = Info(1, "blue")
    val black = Info(2, "black")
    val yellow = Info(3, "yellow")
    val orange = Info(4, "orange")
    val white = Info(5, "white")

    val a  = Record("a", Seq(blue, black, white))
    val a2 = Record("a", Seq(yellow, white, orange))
    val b  = Record("b", Seq(black))
    val c  = Record("c", Seq(white, orange))
    val d  = Record("d", Seq(orange, black))

do the following…

Create two dataframes (we will call them left and right):

    val left = Seq(a, b).toDF
    val right = Seq(a2, c, d).toDF

Join those dataframes with a full_outer join, keeping only what exists on the right side:

    val rightOnlyInfos = left.alias("l")
      .join(right.alias("r"), Seq("name"), "full_outer")
      .filter("l.infos is null")
      .select($"name", $"r.infos".as("r_infos"))

This results in:

    rightOnlyInfos.show(false)
    +----+----------------------+
    |name|r_infos               |
    +----+----------------------+
    |c   |[[5,white],[4,orange]]|
    |d   |[[4,orange],[2,black]]|
    +----+----------------------+

Using the following UDF, add a new boolean column that indicates whether one of the r_infos entries has the color black:

    def hasBlack = (s: Seq[Row]) => {
      s.exists {
        case Row(num: Int, color: String) => color == "black"
      }
    }

    val joinedBreakdown = rightOnlyInfos.withColumn("has_black", udf(hasBlack).apply($"r_infos"))

This is where I run into the problem. The following does not raise any error:

    joinedBreakdown.show(false)

and gives the expected result:

    +----+----------------------+---------+
    |name|r_infos               |has_black|
    +----+----------------------+---------+
    |c   |[[5,white],[4,orange]]|false    |
    |d   |[[4,orange],[2,black]]|true     |
    +----+----------------------+---------+

and the schema

    joinedBreakdown.printSchema

shows

    root
     |-- name: string (nullable = true)
     |-- r_infos: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- number: integer (nullable = false)
     |    |    |-- color: string (nullable = true)
     |-- has_black: boolean (nullable = true)

However, when I try to filter on that result:

    joinedBreakdown.filter("has_black == true").show(false)

I get the following error:
    org.apache.spark.SparkException: Failed to execute user defined function($anonfun$hasBlack$1: (array<struct<number:int,color:string>>) => boolean)
      at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1075)
      at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:411)
      at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(joins.scala:127)
      at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
      at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$rightHasNonNullPredicate$lzycompute$1$1.apply(joins.scala:138)
      at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
      at scala.collection.immutable.List.exists(List.scala:84)
      at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$lzycompute$1(joins.scala:138)
      at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.rightHasNonNullPredicate$1(joins.scala:138)
      at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(joins.scala:145)
      at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:152)
      at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$2.applyOrElse(joins.scala:150)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
      at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
      at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
      at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.apply(joins.scala:150)
      at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin.apply(joins.scala:116)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
      at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
      at scala.collection.immutable.List.foldLeft(List.scala:84)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
      at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
      at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
      at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
      at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
      at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
      at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
      at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2832)
      at org.apache.spark.sql.Dataset.head(Dataset.scala:2153)
      at org.apache.spark.sql.Dataset.take(Dataset.scala:2366)
      at org.apache.spark.sql.Dataset.showString(Dataset.scala:245)
      at org.apache.spark.sql.Dataset.show(Dataset.scala:646)
      at org.apache.spark.sql.Dataset.show(Dataset.scala:623)
      ... 58 elided
    Caused by: java.lang.NullPointerException
      at $anonfun$hasBlack$1.apply(<console>:41)
      at $anonfun$hasBlack$1.apply(<console>:40)
      at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:92)
      at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:91)
      at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
      ... 114 more
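Note which frames the failure passes through: the NullPointerException is raised inside the UDF body, but it is reached from the optimizer rule EliminateOuterJoin (via canFilterOutNull) rather than from normal row processing, which suggests the UDF is being evaluated with a null argument while the plan is being optimized. As a minimal illustration of that reading (reusing the hasBlack function defined above), calling it directly with null reproduces the same root exception:

    hasBlack(null) // throws java.lang.NullPointerException, since exists is invoked on a null Seq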
EDIT: I have opened a JIRA issue. Pasting it here for tracking purposes:

Workaround
This answer does not address why the problem exists; it is just the workaround that I found.
I ran into a problem exactly like this one. I am not sure of the cause, but I have two workarounds that worked for me. Somebody smarter than me can probably explain everything to you; this is just how I got around the problem.

First workaround

Spark acts as though the column does not exist, probably because of some kind of filter pushdown. Force Spark to cache the results before filtering; that makes the column "exist":

    val joinedBreakdown = rightOnlyInfos.withColumn("has_black", hasBlack($"r_infos")).cache()
    println(joinedBreakdown.count()) // this forces the results to be cached after the UDF has been applied
    joinedBreakdown.filter("has_black == true").show(false)
    joinedBreakdown.filter("has_black == true").explain

OUTPUT

    2
    +----+----------------------+---------+
    |name|r_infos               |has_black|
    +----+----------------------+---------+
    |d   |[[4,orange],[2,black]]|true     |
    +----+----------------------+---------+

    == Physical Plan ==
    *Filter (has_black#112632 = true)
    +- InMemoryTableScan [name#112622, r_infos#112628, has_black#112632], [(has_black#112632 = true)]
          +- InMemoryRelation [name#112622, r_infos#112628, has_black#112632], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                +- *Project [coalesce(name#112606, name#112614) AS name#112622, infos#112615 AS r_infos#112628, UDF(infos#112615) AS has_black#112632]
                   +- *Filter isnull(infos#112607)
                      +- SortMergeJoin [name#112606], [name#112614], FullOuter
                         :- *Sort [name#112606 ASC NULLS FIRST], false, 0
                         :  +- Exchange hashpartitioning(name#112606, 200)
                         :     +- LocalTableScan [name#112606, infos#112607]
                         +- *Sort [name#112614 ASC NULLS FIRST], false, 0
                            +- Exchange hashpartitioning(name#112614, 200)
                               +- LocalTableScan [name#112614, infos#112615]

Second workaround

I do not know why this one works, but do the same thing, except put a try/catch inside the UDF. Before I get yelled at: yes, I know that using try/catch for control flow is an anti-pattern. To learn more, I suggest this question and answer. NOTE: I edited your UDF slightly to look like something I am more used to.

    def hasBlack = udf((s: Seq[Row]) => {
      try {
        s.exists {
          case Row(num: Int, color: String) => color == "black"
        }
      } catch {
        case ex: Exception => false
      }
    })

    val joinedBreakdown = rightOnlyInfos.withColumn("has_black", hasBlack($"r_infos"))
    joinedBreakdown.filter("has_black == true").explain
    joinedBreakdown.filter("has_black == true").show(false)

OUTPUT

    == Physical Plan ==
    *Project [coalesce(name#112565, name#112573) AS name#112581, infos#112574 AS r_infos#112587, UDF(infos#112574) AS has_black#112591]
    +- *Filter isnull(infos#112566)
       +- *BroadcastHashJoin [name#112565], [name#112573], RightOuter, BuildLeft, false
          :- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, string, false]))
          :  +- *Filter isnotnull(name#112565)
          :     +- LocalTableScan [name#112565, infos#112566]
          +- *Filter (UDF(infos#112574) = true)
             +- LocalTableScan [name#112573, infos#112574]

    +----+----------------------+---------+
    |name|r_infos               |has_black|
    +----+----------------------+---------+
    |d   |[[4,orange],[2,black]]|true     |
    +----+----------------------+---------+

You can see that the query plans are different because I force the UDF to be applied before the filter.
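Along the same lines as the try/catch version, a narrower variant (a sketch of my own, not part of the original answer) is to make the UDF explicitly null-safe. The Caused by in the stack trace is a NullPointerException on the input Seq itself, so returning false for a null array is enough, and unlike a blanket try/catch it does not hide unrelated failures inside the UDF:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.udf

    // Null-safe sketch: treat a null r_infos array as "contains no black".
    val hasBlackSafe = udf((s: Seq[Row]) =>
      s != null && s.exists {
        case Row(num: Int, color: String) => color == "black"
      }
    )

    val joinedBreakdown = rightOnlyInfos.withColumn("has_black", hasBlackSafe($"r_infos"))
    joinedBreakdown.filter("has_black == true").show(false)

Either way, the idea is the same: the predicate has to survive being evaluated against a null input while the plan is being optimized.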