加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何在Spark SQL中定义自定义类型的模式?

发布时间:2020-12-16 09:20:14 所属栏目:安全 来源:网络整理
导读:以下示例代码尝试将一些案例对象放入数据框.该代码包括使用此特征的案例对象层次结构和案例类的定义: import org.apache.spark.{SparkContext,SparkConf}import org.apache.spark.sql.SQLContextsealed trait Somecase object AType extends Somecase objec
以下示例代码尝试将一些案例对象放入数据框.该代码包括使用此特征的案例对象层次结构和案例类的定义:

import org.apache.spark.{SparkContext,SparkConf}
import org.apache.spark.sql.SQLContext

sealed trait Some
case object AType extends Some
case object BType extends Some

case class Data( name : String,t: Some)

object Example {
  def main(args: Array[String]) : Unit = {
    val conf = new SparkConf()
      .setAppName( "Example" )
      .setMaster( "local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    val df = sc.parallelize( Seq( Data( "a",AType),Data( "b",BType) ),4).toDF()
    df.show()
  }
}

执行代码时,我不幸遇到以下异常:

java.lang.UnsupportedOperationException: Schema for type Some is not supported

问题

>是否有可能为某些类型添加或定义模式(这里输入一些)?
>还有另一个方法来代表这种枚举?

>我试图直接使用枚举,也没有成功. (见下文)

枚举代??码:

object Some extends Enumeration {
  type Some = Value
  val AType,BType = Value
}

提前致谢.我希望,最好的方法是不要使用字符串.

解决方法

Spark 2.0.0:

UserDefinedType已经在Spark 2.0.0中变为私有,现在它没有数据集友好的替换.

见:SPARK-14155 (Hide UserDefinedType in Spark 2.0)

大多数时间静态类型的数据集可以作为替代
有一个挂起的Jira SPARK-7768,使UDT API再次公开,目标版本为2.2.

参见How to store custom objects in a Dataset?

火花2.0.0

Is there a possibility to add or define a schema for certain types (here type Some)?

我想这个答案取决于你需要这么多的东西.看起来可以创建一个UserDefinedType,但它需要访问DeveloperApi,并不是完全直截了当或记录良好.

import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[SomeUDT])
sealed trait Some
case object AType extends Some
case object BType extends Some

class SomeUDT extends UserDefinedType[Some] {
  override def sqlType: DataType = IntegerType

  override def serialize(obj: Any) = {
    obj match {
      case AType => 0
      case BType => 1
    }
  }

  override def deserialize(datum: Any): Some = {
    datum match {
      case 0 => AType
      case 1 => BType
    }
  }

  override def userClass: Class[Some] = classOf[Some]
}

你也应该覆盖hashCode和equals.

它的PySpark对应物可以如下所示:

from enum import Enum,unique
from pyspark.sql.types import UserDefinedType,IntegerType

class SomeUDT(UserDefinedType):
    @classmethod
    def sqlType(self):
        return IntegerType()

    @classmethod
    def module(cls):
        return cls.__module__

    @classmethod 
    def scalaUDT(cls): # Required in Spark < 1.5
        return 'net.zero323.enum.SomeUDT'

    def serialize(self,obj):
        return obj.value

    def deserialize(self,datum):
        return {x.value: x for x in Some}[datum]

@unique
class Some(Enum):
    __UDT__ = SomeUDT()
    AType = 0
    BType = 1

在Spark< 1.5 Python UDT需要一个配对的Scala UDT,但它看起来不一样,在1.5.

对于一个简单的UDT,你可以使用简单的类型(例如IntegerType而不是整个Struct).

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读