scala – GenericRowWithSchema exception when casting an ArrayBuffer to a HashSet
I have a Hive table in Parquet format that was generated using:
    create table myTable (var1 int, var2 string, var3 int, var4 string, var5 array<struct<a:int,b:string>>) stored as parquet;

I have verified that it is populated. Here is a sample value:

    [1,"abcdef",2,"ghijkl",ArrayBuffer([1,"hello"])]

I want to get this into a Spark RDD of the form:

    ((1,"abcdef"),((2,"ghijkl"),Set((1,"hello"))))

Using spark-shell (I get the same problem with spark-submit), I built a test RDD with these values:

    scala> val tempRDD = sc.parallelize(Seq(((1,"abcdef"),((2,"ghijkl"),ArrayBuffer[(Int,String)]((1,"hello"))))))
    tempRDD: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[44] at parallelize at <console>:85

Iterating over the buffer, I can convert the ArrayBuffer to a HashSet in a new RDD:

    scala> val tempRDD2 = tempRDD.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
    tempRDD2: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[46] at map at <console>:87

    scala> tempRDD2.collect.foreach(println)
    ((1,abcdef),((2,ghijkl),Set((1,hello))))

But when I try exactly the same thing with a DataFrame obtained from a HiveContext / SQLContext, I get the following error:

    scala> val hc = new HiveContext(sc)
    scala> import hc._
    scala> import hc.implicits._

    scala> val tempHiveQL = hc.sql("""select var1,var2,var3,var4,var5 from myTable""")

    scala> val tempRDDfromHive = tempHiveQL.map(a => ((a(0).toString.toInt, a(1).toString), ((a(2).toString.toInt, a(3).toString), a(4).asInstanceOf[ArrayBuffer[(Int,String)]] )))

    scala> val tempRDD3 = tempRDDfromHive.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
    tempRDD3: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[47] at map at <console>:91

    scala> tempRDD3.collect.foreach(println)
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage 14.0 (TID 5211, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(<console>:91)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Note that I get the same error, "GenericRowWithSchema cannot be cast to scala.Tuple2", when I run this in a compiled program using spark-submit. The program crashes at RUN TIME when it reaches the conversion step; I get no compiler errors.
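It seems very strange to me that my artificially generated RDD "tempRDD" works with the conversion, while the Hive query DataFrame -> RDD does not. I checked, and both RDDs have the same form:

    scala> tempRDD
    org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = MapPartitionsRDD[21] at map at DataFrame.scala:776

    scala> tempRDDfromHive
    org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[25] at parallelize at <console>:70

The only difference is where their last step originated. I even tried persisting, checkpointing, and materializing these RDDs before running the steps for tempRDD2 and tempRDD3. All gave the same error message.

I also read through related Stack Overflow questions and Apache Spark Jira issues, and based on those I tried casting the ArrayBuffer as an Iterator instead, but that failed at the second step with the same error.

Does anyone know how to properly convert ArrayBuffers to HashSets for DataFrames originating from Hive tables? Since the error seems to occur only with the Hive table version, I am tempted to think this is an issue with the Spark / Hive integration in Spark SQL.

Any ideas? Thanks in advance.

[Edit] BTW, my Spark version is 1.3.0 CDH.

[Edit: here is the result of printSchema]

    scala> tempRDDfromHive.printSchema()
    root
     |-- var1: integer (nullable = true)
     |-- var2: string (nullable = true)
     |-- var3: integer (nullable = true)
     |-- var4: string (nullable = true)
     |-- var5: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- a: integer (nullable = true)
     |    |    |-- b: string (nullable = true)

Solution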
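What you actually get in the map phase is not an ArrayBuffer[(Int, String)] but an ArrayBuffer[Row], hence the error. Ignoring the other columns, what you need is something like this: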
    import org.apache.spark.sql.Row

    tempHiveQL.map((a: Row) => a.getAs[Seq[Row]](4).map { case Row(k: Int, v: String) => (k, v) }.toSet)

It looks like this problem has been fixed in Spark 1.5.0.
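The reason there is no compiler error, and the failure only shows up inside foreach, is most likely type erasure: asInstanceOf checks only the erased class (ArrayBuffer), not the element type. The following is a minimal Spark-free sketch of that behaviour, not the actual Spark code path. `FakeRow` and `ErasureDemo` are hypothetical names introduced here; `FakeRow` merely stands in for org.apache.spark.sql.Row so the example runs with the Scala standard library alone.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

// Hypothetical stand-in for org.apache.spark.sql.Row (assumption, not Spark's class).
final case class FakeRow(a: Int, b: String)

object ErasureDemo {
  // An array<struct<...>> column arrives untyped: a buffer of row objects.
  val column: Any = ArrayBuffer(FakeRow(1, "hello"))

  // This cast succeeds, because only the erased class ArrayBuffer is checked.
  val wrong: ArrayBuffer[(Int, String)] =
    column.asInstanceOf[ArrayBuffer[(Int, String)]]

  // Touching an element forces the real cast to Tuple2, which fails at run time.
  val failure: Option[Throwable] =
    Try(wrong.foreach(t => t._1 + 1)).failed.toOption

  // The fix mirrors the answer: treat elements as rows and unpack them explicitly.
  // scala.collection.Seq is used so the cast also holds on Scala 2.13+.
  val fixed: Set[(Int, String)] =
    column.asInstanceOf[scala.collection.Seq[FakeRow]]
      .map { case FakeRow(k, v) => (k, v) }
      .toSet
}
```

In this sketch `ErasureDemo.failure` holds a ClassCastException, analogous to the "GenericRowWithSchema cannot be cast to scala.Tuple2" error above, while `ErasureDemo.fixed` contains the tuples the question was after.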