加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – dataframe过滤器给出NullPointerException

发布时间:2020-12-16 18:01:16 所属栏目:安全 来源:网络整理
导读:在Spark 1.6.0中,我有一个数据框,其中包含一个包含作业描述的列,如: Descriptionbartenderbartenderemployeetaxi-driver... 我从该列检索一个唯一值列表: val jobs = people.select("Description").distinct().rdd.map(r = r(0).asInstanceOf[String]).rep
在Spark 1.6.0中,我有一个数据框,其中包含一个包含作业描述的列,如:

Description
bartender
bartender
employee
taxi-driver
...

我从该列检索一个唯一值列表:

val jobs = people.select("Description").distinct().rdd.map(r => r(0).asInstanceOf[String]).repartition(4)

然后,我尝试为每个工作描述检索具有该工作的人并做一些事情,但我得到一个NullPointerException:

jobs.foreach { 
  ajob => 
   var peoplewithjob = people.filter($"Description" === ajob)
   // ... do stuff
}

我不明白为什么会发生这种情况,因为每个工作都是从人员数据框中提取出来的,所以至少应该有一个工作…任何提示更多的欢迎!这是堆栈跟踪:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 4.0 failed 1 times,most recent failure: Lost task 3.0 in stage 4.0 (TID 206,localhost): java.lang.NullPointerException
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:799)
at jago.Run$$anonfun$main$1.apply(Run.scala:89)
at jago.Run$$anonfun$main$1.apply(Run.scala:82)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

解决方法

这是因为Spark不支持嵌套操作或转换.如果要对从DataFrame中提取的不同值进行操作,则必须将结果提取到驱动程序并在本地迭代:

// or toLocalIterator
jobs.collect.foreach { 
  ajob => 
    var peoplewithjob = people.filter($"Description" === ajob)
}

根据您作为“做东西”应用的变换类型,简单地进行组合和聚合可能是一个更好的想法:

people.groupBy($"Description").agg(...)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读