加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 为什么这个Spark代码会产生NullPointerException?

发布时间:2020-12-16 08:59:08 所属栏目:安全 来源:网络整理
导读:我在执行Spark应用程序时遇到问题. 源代码: // Read table From HDFSval productInformation = spark.table("temp.temp_table1")val dict = spark.table("temp.temp_table2")// Custom UDFval countPositiveSimilarity = udf[Long,Seq[String],Seq[String]]
我在执行Spark应用程序时遇到问题.

源代码:

// Read table From HDFS
val productInformation = spark.table("temp.temp_table1")
val dict = spark.table("temp.temp_table2")

// Custom UDF
val countPositiveSimilarity = udf[Long,Seq[String],Seq[String]]((a,b) => 
    dict.filter(
        (($"first".isin(a: _*) && $"second".isin(b: _*)) || ($"first".isin(b: _*) && $"second".isin(a: _*))) && $"similarity" > 0.7
    ).count
)

val result = productInformation.withColumn("positive_count",countPositiveSimilarity($"title",$"internal_category"))

// Error occurs!
result.show

错误信息:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 54.0 failed 4 times,most recent failure: Lost task 0.3 in stage 54.0 (TID 5887,ip-10-211-220-33.ap-northeast-2.compute.internal,executor 150): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>,array<string>) => bigint)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at $anonfun$1.apply(<console>:45)
    at $anonfun$1.apply(<console>:43)
    ... 16 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
  ... 48 elided
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>,array<string>) => bigint)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  ... 3 more
Caused by: java.lang.NullPointerException
  at $anonfun$1.apply(<console>:45)
  at $anonfun$1.apply(<console>:43)
  ... 16 more

我已经检查了productInformation和dict在Columns中是否具有null值.但是没有空值.

谁能帮我?
我附上了示例代码,让您了解更多详细信息:

case class Target(wordListOne: Seq[String],WordListTwo: Seq[String])
val targetData = Seq(Target(Seq("Spark","Wrong","Something"),Seq("Java","Grape","Banana")),Target(Seq("Java","Scala"),Seq("Scala",Target(Seq(""),Seq("Grape",Seq("")))
val targets = spark.createDataset(targetData)

case class WordSimilarity(first: String,second: String,similarity: Double)
val similarityData = Seq(WordSimilarity("Spark","Java",0.8),WordSimilarity("Scala","Spark",0.9),WordSimilarity("Java","Scala",WordSimilarity("Apple",0.66),"Apple",-0.1),WordSimilarity("Gine",0.1)) 
val dict = spark.createDataset(similarityData)

val countPositiveSimilarity = udf[Long,b) => 
    dict.filter(
        (($"first".isin(a: _*) && $"second".isin(b: _*)) || ($"first".isin(b: _*) && $"second".isin(a: _*))) && $"similarity" > 0.7
    ).count
)

val countDF = targets.withColumn("positive_count",countPositiveSimilarity($"wordListOne",$"wordListTwo"))

这是一个示例代码,类似于我的原始代码.
示例代码运行良好.我应该在哪一点检查原始代码和数据?

解决方法

非常有趣的问题.我必须做一些搜索,但这是我的.希望这会对你有所帮助.

通过createDataset创建数据集时,spark将使用LocalRelation逻辑查询计划分配此数据集.

def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
    val enc = encoderFor[T]
    val attributes = enc.schema.toAttributes
    val encoded = data.map(d => enc.toRow(d).copy())
    val plan = new LocalRelation(attributes,encoded)
    Dataset[T](self,plan)
  }

请关注此link:
LocalRelation是一个叶逻辑计划,允许在本地执行collect或take等函数,即不使用Spark执行程序.

并且,正如isLocal方法所指出的那样

/**
   * Returns true if the `collect` and `take` methods can be run locally
   * (without any Spark executors).
   *
   * @group basic
   * @since 1.6.0
   */
  def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation]

显然,您可以查看您的2个数据集是本地的.

并且,show方法实际上是内部调用.

private[sql] def showString(_numRows: Int,truncate: Int = 20): String = {
    val numRows = _numRows.max(0)
    val takeResult = toDF().take(numRows + 1)
    val hasMoreData = takeResult.length > numRows
    val data = takeResult.take(numRows)

因此,有了这些证据,我认为调用countDF.show被执行,它会像你从驱动程序调用dict数据集上的count一样表现,调用次数就是目标的记录数.并且,dict数据集当然不需要是countDF工作中的show的本地数据集.

您可以尝试保存countDF,它将为您提供与第一种情况相同的异常org.apache.spark.SparkException:无法执行用户定义的函数($anonfun $1

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读