
scala – Spark 2.2 Null-safe Left Outer Join Null Pointer Exception

Performing a left outer join with the null-safe equals operator causes a NullPointerException.

Versions:
Spark 2.2.0,
Scala 2.11.8

scala> var d1 = Seq((null,1),("a1",2)).toDF("a","b")
scala> d1.show
+----+---+
|   a|  b|
+----+---+
|null|  1|
|  a1|  2|
+----+---+

scala> var d2 = Seq(("a2",3)).toDF("a","b")
scala> d2.show
+---+---+
|  a|  b|
+---+---+
| a2|  3|
+---+---+

scala> d1.joinWith(d2,d1("a") <=> d2("a"),"left_outer").show
17/10/10 09:44:39 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 8)
java.lang.NullPointerException

Is this behavior expected?

Stack trace:

java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
17/10/10 10:19:28 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

17/10/10 10:19:28 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
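
Every frame of the trace points at generated code (GeneratedClass.processNext, reached through WholeStageCodegenExec), so one diagnostic worth trying, a suggestion of mine rather than part of the original question, is to rerun the join with whole-stage code generation disabled:

// Diagnostic sketch (assumption, not from the original post): the NPE is
// thrown inside generated code, so turning whole-stage codegen off shows
// whether the code generator, rather than the join itself, is at fault.
// Expect slower execution; this is a diagnostic, not a fix.
spark.conf.set("spark.sql.codegen.wholeStage", false)
d1.joinWith(d2, d1("a") <=> d2("a"), "left_outer").show()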

Answer

I cannot reproduce this on Spark 2.2.0 with Scala 2.11.8: running the same code sample I get no exception, whether I use the null-safe operator

<=>

or the equality operator

===

Could you check again and add more details relevant to the question?

val d1 = sc.parallelize(Seq(
          (null,1),("a1",2))
        ).toDF("a","b")

d1.show


+----+---+
|   a|  b|
+----+---+
|null|  1|
|  a1|  2|
+----+---+


val d2 = sc.parallelize(Seq(
      ("a2",3))
    ).toDF("a","b") 

d2.show

+---+---+
|  a|  b|
+---+---+
| a2|  3|
+---+---+


d1.joinWith(d2,d1("a") <=>  d2("a"),"left_outer").show()

+--------+----+
|      _1|  _2|
+--------+----+
|[null,1]|null|
|  [a1,2]|null|
+--------+----+

d1.joinWith(d2,d1("a") ===  d2("a"),"left_outer").show()



+--------+----+
|      _1|  _2|
+--------+----+
|[null,1]|null|
|  [a1,2]|null|
+--------+----+
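
On this data neither operator finds a match, so the two results look the same. They diverge only when both sides contain a null key: <=> treats null as equal to null, while === evaluates to null on a null operand and never matches. A minimal sketch of the difference (my illustration, not part of the original answer; d2n is a hypothetical variant of d2 with a null key):

// Illustration (assumption, not from the original answer): with a null key on
// the right side, <=> matches d1's null row while === does not.
val d2n = Seq((null.asInstanceOf[String], 3)).toDF("a", "b")

d1.joinWith(d2n, d1("a") <=> d2n("a"), "left_outer").show()
// the (null,1) row of d1 now pairs with (null,3): <=> matched the null keys

d1.joinWith(d2n, d1("a") === d2n("a"), "left_outer").show()
// every row of d1 pairs with null: === never matches a null key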

Adding another example:

val x = Seq((100L,null),(102L,"17179869185L"),(101L,"17179869186L"),(200L,"17179869186L"),
            (401L,"1L"),(500L,"1L"),(600L,"8589934593L"),(700L,"8589934593L"),(800L,"8589934593L"),
            (900L,"8589934594L"),(1000L,"8589934594L"),(1200L,"2L"),(1300L,"2L"),(1301L,"2L"),
            (1400L,"17179869187L"),(1500L,"17179869188L"),(1600L,"8589934595L")).toDF("u","x1")

x.show()


+----+------------+
|   u|          x1|
+----+------------+
| 100|        null|
| 102|17179869185L|
| 101|17179869186L|
| 200|17179869186L|
| 401|          1L|
| 500|          1L|
| 600| 8589934593L|
| 700| 8589934593L|
| 800| 8589934593L|
| 900| 8589934594L|
|1000| 8589934594L|
|1200|          2L|
|1300|          2L|
|1301|          2L|
|1400|17179869187L|
|1500|17179869188L|
|1600| 8589934595L|
+----+------------+

val y = Seq(("17179869187L",-8589934595L),("17179869188L",("17179869185L",-858993
4593L)).toDF("x2","y")


y.show()

+------------+-----------+
|          x2|          y|
+------------+-----------+
|17179869187L|-8589934595|
|17179869188L|-8589934595|
|17179869185L|-8589934593|
+------------+-----------+


x.join(y,'x1 === 'x2,"left_outer").show()

+----+------------+------------+-----------+
|   u|          x1|          x2|          y|
+----+------------+------------+-----------+
| 100|        null|        null|       null|
| 102|17179869185L|17179869185L|-8589934593|
| 101|17179869186L|        null|       null|
| 200|17179869186L|        null|       null|
| 401|          1L|        null|       null|
| 500|          1L|        null|       null|
| 600| 8589934593L|        null|       null|
| 700| 8589934593L|        null|       null|
| 800| 8589934593L|        null|       null|
| 900| 8589934594L|        null|       null|
|1000| 8589934594L|        null|       null|
|1200|          2L|        null|       null|
|1300|          2L|        null|       null|
|1301|          2L|        null|       null|
|1400|17179869187L|17179869187L|-8589934595|
|1500|17179869188L|17179869188L|-8589934595|
|1600| 8589934595L|        null|       null|
+----+------------+------------+-----------+

x: org.apache.spark.sql.DataFrame = [u: bigint,x1: string]
y: org.apache.spark.sql.DataFrame = [x2: string,y: bigint]
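
One practical takeaway, offered as my suggestion rather than as part of the original answer: the plain join call above runs where joinWith failed in the question, so if joinWith with <=> keeps throwing on your build, the pairs it would return can be approximated with join plus struct():

// Workaround sketch (assumption, not from the original answer): rebuild the
// (_1, _2) pairs that joinWith would produce using a plain join.
import org.apache.spark.sql.functions.struct

val paired = d1.join(d2, d1("a") <=> d2("a"), "left_outer")
  .select(struct(d1("a"), d1("b")).as("_1"),
          struct(d2("a"), d2("b")).as("_2"))
paired.show()
// Caveat: for unmatched rows _2 comes back as a struct of nulls, [null,null],
// rather than the single null that joinWith produces.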
