scala – 如何在Spark 2.X数据集中创建自定义编码器?
发布时间:2020-12-16 19:05:33 所属栏目:安全 来源:网络整理
导读:Spark数据集将从Row’s移动到Encoder’s for Pojo / primitives. Catalyst引擎使用ExpressionEncoder来转换SQL表达式中的列.但是,似乎没有其他的Encoder子类可用作我们自己的实现的模板. 以下是Spark 1.X / DataFrames中不能在新版本中编译的代码的示例: //
Spark数据集将从Row’s移动到Encoder’s for Pojo / primitives. Catalyst引擎使用ExpressionEncoder来转换SQL表达式中的列.但是,似乎没有其他的Encoder子类可用作我们自己的实现的模板.
以下是Spark 1.X / DataFrames中不能在新版本中编译的代码的示例: //mapping each row to RDD tuple df.map(row => { var id: String = if (!has_id) "" else row.getAs[String]("id") var label: String = row.getAs[String]("label") val channels : Int = if (!has_channels) 0 else row.getAs[Int]("channels") val height : Int = if (!has_height) 0 else row.getAs[Int]("height") val width : Int = if (!has_width) 0 else row.getAs[Int]("width") val data : Array[Byte] = row.getAs[Any]("data") match { case str: String => str.getBytes case arr: Array[Byte@unchecked] => arr case _ => { log.error("Unsupport value type") null } } (id,label,channels,height,width,data) }).persist(StorageLevel.DISK_ONLY) } 我们得到一个编译器错误 Error:(56,11) Unable to find encoder for type stored in a Dataset. Primitive types (Int,String,etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. df.map(row => { ^ 那么那么某种方式/某处应该有办法 >定义/实现我们的自定义编码器 我正在寻找成功执行这些步骤的代码. 解决方法
据我所知,从1.6开始,没有什么改变,How to store custom objects in Dataset?中描述的解决方案是唯一可用的选项.然而,您的当前代码应该适用于产品类型的默认编码器.
要了解为什么您的代码在1.x中工作,并且可能无法在2.0.0中运行,您将必须检查签名.在1.x中DataFrame.map是一个使用函数Row => T并将RDD [Row]转换为RDD [T]. 在2.0.0中DataFrame.map采用Row =>类型的函数. T,但是将Dataset [Row](a.k.a DataFrame)转换为Dataset [T],因此T需要一个Encoder.如果你想获得“旧”行为,你应该明确使用RDD: df.rdd.map(row => ???) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |