
scala – Spark distinct followed by join gives IndexOutOfBoundsException

Trying to join two dataframes A & B. B has a distinct operation before the join. Also, one column in B is joined on two columns in A. This specific case gives an IndexOutOfBoundsException. Has anyone come across this before?

Details below. Thanks in advance!

Environment:

spark-shell standalone mode
Spark version 2.3.1

Code:

val df1 = Seq((1,"one","one"),(2,"two","two")).toDF("key1","val11","val12")
val df2 = Seq(("one","first"),("one",("two","second")).toDF("key2","val2")
val df3 = df2.distinct
val df4 = df1.join(df3,col("val11") === col("key2") and col("val12") === col("key2"))
df4.show(false)

Exception:

java.lang.IndexOutOfBoundsException: -1
  at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:65)
  at scala.collection.immutable.List.apply(List.scala:84)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$reorder$1.apply(EnsureRequirements.scala:233)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$reorder$1.apply(EnsureRequirements.scala:231)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements.reorder(EnsureRequirements.scala:231)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$reorderJoinKeys(EnsureRequirements.scala:255)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$reorderJoinPredicates$1.applyOrElse(EnsureRequirements.scala:277)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$reorderJoinPredicates$1.applyOrElse(EnsureRequirements.scala:273)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$reorderJoinPredicates(EnsureRequirements.scala:273)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:302)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:294)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:294)
  at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:37)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3249)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:725)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:702)
  ... 49 elided

Update: working solution, thanks @1pluszara!

val df1 = Seq((1,"val2")
val df3 = spark.createDataFrame(df2.rdd.distinct,df2.schema)
val df4 = df1.join(df3,col("val11") === col("key2") and col("val12") === col("key2"))
df4.show(false)
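
As an aside (this is not from the original question or the answer below), another variant that might sidestep the error is to keep df2.distinct but avoid referencing the same column of the distinct result twice in the join condition, for example by copying key2 into a second column first. This is an untested sketch; key2b is just an illustrative name:

val df3b = df2.distinct.withColumn("key2b", col("key2"))   // duplicate the join column
val df4b = df1.join(df3b, col("val11") === col("key2") and col("val12") === col("key2b"))
df4b.show(false)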

Solution

Tried this:

import org.apache.spark.sql.Row

val df3 = df2.rdd.distinct().map({
  case Row(key2: String, val2: String) => (key2, val2)
}).toDF("key2","val2")

val df4 = df1.join(df3,col("val11") === col("key2") and col("val12") === col("key2"))
df4.show(false)

Output:

+----+-----+-----+----+------+
|key1|val11|val12|key2|val2  |
+----+-----+-----+----+------+
|2   |two  |two  |two |second|
|1   |one  |one  |one |first |
+----+-----+-----+----+------+

But I'm not sure how the execution differs internally for the DataFrame version.
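
For what it's worth, here is one possible reading of the stack trace (my interpretation, not from the original post): the failure occurs in Spark's EnsureRequirements rule while it tries to reorder the join keys to line up with the hash partitioning produced by the aggregation behind df2.distinct. That partitioning is on (key2, val2), but the right-side join keys are (key2, key2), so the lookup for val2 comes back as -1 and indexing with it throws. A deliberately simplified model of that lookup, using plain strings in place of Catalyst expressions:

// Simplified stand-in for the key-reordering step, not Spark's actual code.
val partitioningExprs = Seq("key2", "val2")   // partitioning of the aggregate behind distinct
val rightJoinKeys     = Seq("key2", "key2")   // key2 appears twice in the join condition
partitioningExprs.map { e =>
  val i = rightJoinKeys.indexOf(e)            // -1 for "val2"
  rightJoinKeys(i)                            // java.lang.IndexOutOfBoundsException: -1
}

Rebuilding the DataFrame through df2.rdd.distinct presumably hides that partitioning from the join (the RDD-backed scan reports no hash partitioning), so the reordering step is skipped and the plan executes normally.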
