scala – Apache Spark – 数据集操作在抽象基类中失败了吗?
发布时间:2020-12-16 09:58:48 所属栏目:安全 来源:网络整理
导读:我正在尝试将一些常见代码提取到抽象类中,但遇到了问题. 假设我正在读取格式为“id | name”的文件: case class Person(id: Int,name: String) extends Serializableobject Persons { def apply(lines: Dataset[String]): Dataset[Person] = { import lines
我正在尝试将一些常见代码提取到抽象类中,但遇到了问题.
假设我正在读取格式为“id | name”的文件: case class Person(id: Int,name: String) extends Serializable object Persons { def apply(lines: Dataset[String]): Dataset[Person] = { import lines.sparkSession.implicits._ lines.map(line => { val fields = line.split("|") Person(fields(0).toInt,fields(1)) }) } } Persons(spark.read.textFile("persons.txt")).show() 大.这很好用.现在让我们说我想用“名称”字段读取许多不同的文件,因此我将提取出所有常见的逻辑: trait Named extends Serializable { val name: String } abstract class NamedDataset[T <: Named] { def createRecord(fields: Array[String]): T def apply(lines: Dataset[String]): Dataset[T] = { import lines.sparkSession.implicits._ lines.map(line => createRecord(line.split("|"))) } } case class Person(id: Int,name: String) extends Named object Persons extends NamedDataset[Person] { override def createRecord(fields: Array[String]) = Person(fields(0).toInt,fields(1)) } 这失败了两个错误: Error: Unable to find encoder for type stored in a Dataset. Primitive types (Int,String,etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. lines.map(line => createRecord(line.split("|"))) Error: not enough arguments for method map: (implicit evidence$7: org.apache.spark.sql.Encoder[T])org.apache.spark.sql.Dataset[T]. Unspecified value parameter evidence$7. lines.map(line => createRecord(line.split("|"))) 我有一种感觉这与implicits,TypeTags和/或ClassTags有关,但我刚开始使用Scala并且还没有完全理解这些概念. 解决方法
你必须做两个小改动:
>由于仅支持基元和产品(作为错误消息状态),因此使命名特征Serializable是不够的.你应该让它扩展Product(这意味着case类和Tuples可以扩展它) 所以 – 这是一个工作版本: import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag trait Named extends Product { val name: String } abstract class NamedDataset[T <: Named : ClassTag : TypeTag] extends Serializable { def createRecord(fields: Array[String]): T def apply(lines: Dataset[String]): Dataset[T] = { import lines.sparkSession.implicits._ lines.map(line => createRecord(line.split("|"))) } } case class Person(id: Int,fields(1)) } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
相关内容
- Scala:过滤器和takeWhile在流上有什么区别?
- angularjs – 如何使用Typescript 1.5和AngulaJS 1.4实现模
- angularjs – Angular / Ionic中可重复使用的模态
- 跳转到Vim中当前行的quickfix或位置列表中的错误(使用Synta
- DNS报文格式解析(非常详细)
- bash:带引号的问题运行命令
- Bourne Shell For i in(seq)
- angularjs Error: have been in digest cycle
- angular – 如何在Material设计中将按钮移动到容器的右侧
- typescript – PrimeNG的数据表没有触发onRowSelect事件