加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Apache Spark – 数据集操作在抽象基类中失败了吗?

发布时间:2020-12-16 09:58:48 所属栏目:安全 来源:网络整理
导读:我正在尝试将一些常见代码提取到抽象类中,但遇到了问题. 假设我正在读取格式为“id | name”的文件: case class Person(id: Int,name: String) extends Serializableobject Persons { def apply(lines: Dataset[String]): Dataset[Person] = { import lines
我正在尝试将一些常见代码提取到抽象类中,但遇到了问题.

假设我正在读取格式为“id | name”的文件:

case class Person(id: Int,name: String) extends Serializable

object Persons {
  def apply(lines: Dataset[String]): Dataset[Person] = {
    import lines.sparkSession.implicits._
    lines.map(line => {
      val fields = line.split("|")
      Person(fields(0).toInt,fields(1))
    })
  }
}

Persons(spark.read.textFile("persons.txt")).show()

大.这很好用.现在让我们说我想用“名称”字段读取许多不同的文件,因此我将提取出所有常见的逻辑:

trait Named extends Serializable { val name: String }

abstract class NamedDataset[T <: Named] {
  def createRecord(fields: Array[String]): T
  def apply(lines: Dataset[String]): Dataset[T] = {
    import lines.sparkSession.implicits._
    lines.map(line => createRecord(line.split("|")))
  }
}

case class Person(id: Int,name: String) extends Named

object Persons extends NamedDataset[Person] {
  override def createRecord(fields: Array[String]) =
    Person(fields(0).toInt,fields(1))
}

这失败了两个错误:

Error:
Unable to find encoder for type stored in a Dataset.  
Primitive types (Int,String,etc) and Product types (case classes) 
are supported by importing spark.implicits._  Support for serializing 
other types will be added in future releases.
lines.map(line => createRecord(line.split("|")))

Error:
not enough arguments for method map: 
(implicit evidence$7: org.apache.spark.sql.Encoder[T])org.apache.spark.sql.Dataset[T].
Unspecified value parameter evidence$7.
lines.map(line => createRecord(line.split("|")))

我有一种感觉这与implicits,TypeTags和/或ClassTags有关,但我刚开始使用Scala并且还没有完全理解这些概念.

解决方法

你必须做两个小改动:

>由于仅支持基元和产品(作为错误消息状态),因此使命名特征Serializable是不够的.你应该让它扩展Product(这意味着case类和Tuples可以扩展它)
>事实上,Spark需要ClassTag和TypeTag来克服类型擦除并找出实际类型

所以 – 这是一个工作版本:

import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag

trait Named extends Product { val name: String }

abstract class NamedDataset[T <: Named : ClassTag : TypeTag] extends Serializable {
  def createRecord(fields: Array[String]): T
  def apply(lines: Dataset[String]): Dataset[T] = {
    import lines.sparkSession.implicits._
    lines.map(line => createRecord(line.split("|")))
  }
}

case class Person(id: Int,fields(1))
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读