scala – Spark Dataset example: "Unable to generate an encoder" problem
I'm new to the Spark world and am trying out a Dataset example written in Scala that I found online. When running it through SBT, I keep getting the following error: org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class. Not sure what I am overlooking. Also, feel free to point out better ways of writing the same Dataset example. Thanks.

> sbt
> runMain DatasetExample

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/10/25 01:06:39 INFO Remoting: Starting remoting
16/10/25 01:06:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.150.130:50555]
[error] (run-main-6) org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `DatasetExample$Student` without access to the scope that this class was defined in. Try moving this class out of its parent class.;
org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `DatasetExample$Student` without access to the scope that this class was defined in. Try moving this class out of its parent class.;
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$3.applyOrElse(ExpressionEncoder.scala:306)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$3.applyOrElse(ExpressionEncoder.scala:302)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:249)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:302)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:79)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:90)
    at org.apache.spark.sql.DataFrame.as(DataFrame.scala:209)
    at DatasetExample$.main(DatasetExample.scala:45)
    at DatasetExample.main(DatasetExample.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
[trace] Stack trace suppressed: run last sparkExamples/compile:runMain for the full output.
java.lang.RuntimeException: Nonzero exit code: 1
    at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last sparkExamples/compile:runMain for the full output.
[error] (sparkExamples/compile:runMain) Nonzero exit code: 1
[error] Total time: 127 s, completed Oct 25, 2016 1:08:09 AM

Code:

import org.apache.spark._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._

object DatasetExample {

  // Create data sets
  case class Student(name: String, dept: String, age: Long)
  case class Department(abbrevName: String, fullName: String)

  org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) // Not sure what exactly is the purpose

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    // initialise spark context
    val conf = new SparkConf().setAppName("SetsExamples").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
    import sqlcontext.implicits._ // Not sure what exactly is the purpose

    // Read JSON objects into a Dataset[Student].
    val students = sqlcontext.read.json("student.json").as[Student]
    students.show()

    // Select two columns and filter on one column.
    // Each argument of "select" must be a "TypedColumn".
    students.select($"name".as[String], $"dept".as[String]).
      filter(_._2 == "Math").  // Filter on _2, the second selected column
      collect()

    // Group by department and count each group.
    students.groupBy(_.dept).count().collect()

    // Group and aggregate in each group.
    students.groupBy(_.dept).
      agg(avg($"age").as[Double]).
      collect()

    // Initialize a Seq and convert to a Dataset.
    val depts = Seq(Department("CS", "Computer Science"), Department("Math", "Mathematics")).toDS()

    // Show the contents of the Dataset.
    depts.show()

    // Join two datasets with "joinWith".
    val joined = students.joinWith(depts, $"dept" === $"abbrevName")

    // Show the contents of the joined Dataset.
    // Note that the original objects are nested into tuples under the _1 and _2 columns.
    joined.show()

    // terminate spark context
    sc.stop()
  }
}

JSON file (student.json):

{"id" : "1201", "name" : "Kris", "age" : "25"}
{"id" : "1202", "name" : "John", "age" : "28"}
{"id" : "1203", "name" : "Chet", "age" : "39"}
{"id" : "1204", "name" : "Mark", "age" : "23"}
{"id" : "1205", "name" : "Vic", "age" : "23"}

Solution
This line is what is causing the problem:
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

It means that you are adding a new outer scope to this context, to be used when instantiating an inner class during deserialization. Inner classes can arise when a case class is defined inside the Spark REPL, and registering the outer scope that such a class was defined in allows new instances to be created on the Spark executors. In normal use (your case), you should not need to call this function.

Edit: you also need to move the case classes outside of the DatasetExample object.

Note: import sqlContext.implicits._ is a Scala-specific call that brings in the implicit methods used to convert common Scala RDD objects into DataFrames. More on that here.
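
For reference, a minimal sketch of how the example might look after that change, assuming the same Spark 1.6-style SQLContext as in the question: the two case classes are moved to the top level of the file and the addOuterScope call is dropped; the rest of the original main body (select/filter, groupBy, joinWith) can stay as it was.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Top-level case classes: Spark can generate encoders for these
// without needing access to an enclosing object's scope.
case class Student(name: String, dept: String, age: Long)
case class Department(abbrevName: String, fullName: String)

object DatasetExample {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    // initialise spark context
    val conf = new SparkConf().setAppName("SetsExamples").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlcontext = new SQLContext(sc)
    import sqlcontext.implicits._ // encoders plus toDS()/toDF() conversions

    // Works once Student is a top-level class (and assuming the JSON
    // fields actually line up with name/dept/age).
    val students = sqlcontext.read.json("student.json").as[Student]
    students.show()

    // Seq-to-Dataset conversion provided by the implicits import.
    val depts = Seq(Department("CS", "Computer Science"),
                    Department("Math", "Mathematics")).toDS()
    depts.show()

    sc.stop()
  }
}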
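
And, as a small hypothetical illustration of what the implicits import provides on its own (reusing sc and sqlcontext from the sketch above), converting an ordinary RDD of tuples into a DataFrame:

import sqlcontext.implicits._

// toDF() becomes available on RDDs and Seqs of tuples or case classes.
val df = sc.parallelize(Seq((1201, "Kris", 25), (1202, "John", 28))).toDF("id", "name", "age")
df.show()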