scala – Spark 2.2 Null-safe Left Outer Join NullPointerException
Performing a left outer join with the null-safe equals operator results in a NullPointerException.
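For context, here is a minimal sketch (my own illustration, not part of the original question; the variable and column names are made up) of what the null-safe operator <=> does differently from === when null keys are involved. It is meant for spark-shell, where spark.implicits._ is already in scope:

// Null-safe equality: null <=> null is true, whereas null === null
// evaluates to null ("unknown") and therefore never satisfies a join.
val left  = Seq((null.asInstanceOf[String], 1), ("a1", 2)).toDF("a", "b")
val right = Seq((null.asInstanceOf[String], 9)).toDF("a", "c")

// With ===, the null-keyed rows never match:
left.join(right, left("a") === right("a"), "inner").show()
// (zero rows)

// With <=>, the two null keys match each other:
left.join(right, left("a") <=> right("a"), "inner").show()
// +----+---+----+---+
// |   a|  b|   a|  c|
// +----+---+----+---+
// |null|  1|null|  9|
// +----+---+----+---+

This is precisely why <=> is attractive for join keys that may contain nulls, and why an exception here is surprising.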
Versions: Spark 2.2.0, Scala 2.11.8

scala> var d1 = Seq((null, 1), ("a1", 2)).toDF("a", "b")
scala> d1.show
+----+---+
|   a|  b|
+----+---+
|null|  1|
|  a1|  2|
+----+---+

scala> var d2 = Seq(("a2", 3)).toDF("a", "b")
scala> d2.show
+---+---+
|  a|  b|
+---+---+
| a2|  3|
+---+---+

scala> d1.joinWith(d2, d1("a") <=> d2("a"), "left_outer").show
17/10/10 09:44:39 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 8)
java.lang.NullPointerException

Is this behavior expected?

Stack trace:

java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
17/10/10 10:19:28 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NullPointerException
    (same stack trace as above)
17/10/10 10:19:28 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, executor driver): java.lang.NullPointerException
    (same stack trace as above)

Solution
I can say that I could not reproduce this on Spark 2.2.0 with Scala 2.11.8: running the same code sample, I did not get any exception with either the null-safe (<=>) or the plain equality (===) operator.
Could you check again and add more details relevant to the question?

val d1 = sc.parallelize(Seq((null, 1), ("a1", 2))).toDF("a", "b")
d1.show
+----+---+
|   a|  b|
+----+---+
|null|  1|
|  a1|  2|
+----+---+

val d2 = sc.parallelize(Seq(("a2", 3))).toDF("a", "b")
d2.show
+---+---+
|  a|  b|
+---+---+
| a2|  3|
+---+---+

d1.joinWith(d2, d1("a") <=> d2("a"), "left_outer").show()
+--------+----+
|      _1|  _2|
+--------+----+
|[null,1]|null|
|  [a1,2]|null|
+--------+----+

d1.joinWith(d2, d1("a") === d2("a"), "left_outer").show()
+--------+----+
|      _1|  _2|
+--------+----+
|[null,1]|null|
|  [a1,2]|null|
+--------+----+

Adding another example:

val x = Seq(
  (100L, null), (102L, "17179869185L"), (101L, "17179869186L"), (200L, "17179869186L"),
  (401L, "1L"), (500L, "1L"), (600L, "8589934593L"), (700L, "8589934593L"),
  (800L, "8589934593L"), (900L, "8589934594L"), (1000L, "8589934594L"), (1200L, "2L"),
  (1300L, "2L"), (1301L, "2L"), (1400L, "17179869187L"), (1500L, "17179869188L"),
  (1600L, "8589934595L")
).toDF("u", "x1")
x.show()
+----+------------+
|   u|          x1|
+----+------------+
| 100|        null|
| 102|17179869185L|
| 101|17179869186L|
| 200|17179869186L|
| 401|          1L|
| 500|          1L|
| 600| 8589934593L|
| 700| 8589934593L|
| 800| 8589934593L|
| 900| 8589934594L|
|1000| 8589934594L|
|1200|          2L|
|1300|          2L|
|1301|          2L|
|1400|17179869187L|
|1500|17179869188L|
|1600| 8589934595L|
+----+------------+

val y = Seq(
  ("17179869187L", -8589934595L),
  ("17179869188L", -8589934595L),
  ("17179869185L", -8589934593L)
).toDF("x2", "y")
y.show()
+------------+-----------+
|          x2|          y|
+------------+-----------+
|17179869187L|-8589934595|
|17179869188L|-8589934595|
|17179869185L|-8589934593|
+------------+-----------+

x.join(y, 'x1 === 'x2, "left_outer").show()
+----+------------+------------+-----------+
|   u|          x1|          x2|          y|
+----+------------+------------+-----------+
| 100|        null|        null|       null|
| 102|17179869185L|17179869185L|-8589934593|
| 101|17179869186L|        null|       null|
| 200|17179869186L|        null|       null|
| 401|          1L|        null|       null|
| 500|          1L|        null|       null|
| 600| 8589934593L|        null|       null|
| 700| 8589934593L|        null|       null|
| 800| 8589934593L|        null|       null|
| 900| 8589934594L|        null|       null|
|1000| 8589934594L|        null|       null|
|1200|          2L|        null|       null|
|1300|          2L|        null|       null|
|1301|          2L|        null|       null|
|1400|17179869187L|17179869187L|-8589934595|
|1500|17179869188L|17179869188L|-8589934595|
|1600| 8589934595L|        null|       null|
+----+------------+------------+-----------+

x: org.apache.spark.sql.DataFrame = [u: bigint, x1: string]
y: org.apache.spark.sql.DataFrame = [x2: string, y: bigint]
Command took 1.00 second
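If the exception does reproduce in your environment, two things are worth trying. Both are my suggestions rather than part of the answer above: the stack trace points into GeneratedClass$GeneratedIterator, i.e. code produced by whole-stage code generation, so disabling that feature can confirm where the problem lives, and the null-safe condition can also be spelled out without <=>:

// 1) Assumption: the NPE comes from generated code (see the stack
//    trace), so turning off whole-stage code generation is a useful
//    diagnostic and a temporary workaround:
spark.conf.set("spark.sql.codegen.wholeStage", false)

// 2) Rewrite the null-safe comparison explicitly; this condition is
//    logically equivalent to d1("a") <=> d2("a"):
val cond = (d1("a") === d2("a")) || (d1("a").isNull && d2("a").isNull)
d1.joinWith(d2, cond, "left_outer").show()

If disabling codegen makes the NPE disappear, that points at a codegen bug in this Spark version rather than at your join condition.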