
scala – How to create a Spark Dataset or DataFrame from a case class containing Enums

I have been trying to create a Spark Dataset from a case class that contains an enumeration, but I haven't been able to. I am using Spark version 1.6.0. The exception complains that no encoder can be found for my enum. Is it not possible to have enums in data in Spark?

Code:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object MyEnum extends Enumeration {
  type MyEnum = Value
  val Hello, World = Value
}

case class MyData(field: String, other: MyEnum.Value)

object EnumTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val sqlCtx = new SQLContext(sc)

    import sqlCtx.implicits._

    val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS()

    println(s"df: ${df.collect().mkString(",")}}")
  }

}

Error:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.company.MyEnum.Value
- field (class: "scala.Enumeration.Value",name: "other")
- root class: "com.company.MyData"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:597)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
at com.company.EnumTest$.main(EnumTest.scala:22)
at com.company.EnumTest.main(EnumTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Solution

You can create your own encoder. As the stack trace shows, Spark 1.6 cannot reflectively derive an encoder for scala.Enumeration.Value, but you can supply one explicitly, for example a Kryo-based encoder:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object MyEnum extends Enumeration {
  type MyEnum = Value
  val Hello, World = Value
}

case class MyData(field: String, other: MyEnum.Value)

// Register a Kryo-based Encoder for MyData so that toDS() can find it implicitly.
object MyDataEncoders {
  implicit def myDataEncoder: org.apache.spark.sql.Encoder[MyData] =
    org.apache.spark.sql.Encoders.kryo[MyData]
}

object EnumTest {
  import MyDataEncoders._

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val sqlCtx = new SQLContext(sc)

    import sqlCtx.implicits._

    val df = sc.parallelize(Array(MyData("hello", MyEnum.World))).toDS()

    println(s"df: ${df.collect().mkString(",")}")
  }
}
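
Note that Encoders.kryo comes with a trade-off: it serializes the whole MyData object into a single binary column, so the resulting Dataset has no named columns to select or filter on. If you need a queryable schema, a common workaround is to store the enum as its String name and convert at the boundaries. The following is a minimal, self-contained sketch of that alternative (not part of the original answer); the MyDataRow case class and the toRow/fromRow helpers are hypothetical names introduced here for illustration:

// Sketch: store the enum as a String in the Dataset; MyDataRow, toRow and
// fromRow are hypothetical helpers, not Spark API.
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object MyEnum extends Enumeration {
  type MyEnum = Value
  val Hello, World = Value
}

// Storage-side representation: the enum is kept as its String name,
// which Spark 1.6 can encode out of the box.
case class MyDataRow(field: String, other: String)

// Domain-side class with the real enum; converted at the boundary.
case class MyData(field: String, other: MyEnum.Value) {
  def toRow: MyDataRow = MyDataRow(field, other.toString)
}

object MyData {
  def fromRow(row: MyDataRow): MyData =
    MyData(row.field, MyEnum.withName(row.other))
}

object EnumAsStringTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    val sqlCtx = new SQLContext(sc)

    import sqlCtx.implicits._

    // The Dataset keeps named columns (field: string, other: string)...
    val ds = sc.parallelize(Array(MyData("hello", MyEnum.World).toRow)).toDS()

    // ...and rows can be filtered on the enum's name, then converted back.
    val hits = ds.filter(_.other == MyEnum.World.toString).collect().map(MyData.fromRow)
    println(hits.mkString(","))
  }
}

The cost is an extra conversion step on read and write, but unlike the Kryo encoder the schema stays flat, so the Dataset remains usable with DataFrame-style column operations.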
