scala – 如何在Spark数据集中舍入一列?
发布时间:2020-12-16 10:06:17 所属栏目:安全 来源:网络整理
导读:使用 Scala Spark,如何使用类型化的数据集API来聚合聚合列? 另外,如何通过groupby操作保留数据集的类型? 这就是我目前拥有的: case class MyRow( k1: String,k2: String,c1: Double,c2: Double)def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = {im
使用
Scala Spark,如何使用类型化的数据集API来聚合聚合列?
另外,如何通过groupby操作保留数据集的类型? 这就是我目前拥有的: case class MyRow( k1: String,k2: String,c1: Double,c2: Double ) def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = { import org.apache.spark.sql.expressions.scalalang.typed._ ds.groupByKey(row => (row.k1,row.k2)) .agg( avg(_.c1),avg(_.c2) ) .map(r => MyRow(r._1._1,r._1._2,r._2,r._3)) } >如果我用圆形(avg(_.c1))替换avg(_.c1),我会收到类型错误.什么是绕过我的价值观的正确方法? 谢谢! 解决方法
使用round确实在类型错误上失败,因为agg期望TypedColumn类型的聚合函数[IN,OUT]和round提供一个Column(适用于DataFrames).
你需要的是一个舍入平均聚合函数,它没有在org.apache.spark.sql.expressions.scalalang.typed._中提供 – 但你可以通过扩展执行平均值的类来自己创建一个聚合: // Extend TypedAverage - round the result before returning it class TypedRoundAverage[IN](f: IN => Double) extends TypedAverage[IN](f) { override def finish(reduction: (Double,Long)): Double = math.round(super.finish(reduction)) } // A nice wrapper to create the TypedRoundAverage for a given function def roundAvg[IN](f: IN => Double): TypedColumn[IN,Double] = new TypedRoundAverage(f).toColumn // Now you can use "roundAvg" instead of "round" def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = { ds.groupByKey(row => (row.k1,row.k2)) .agg( roundAvg(_.c1),roundAvg(_.c2) ) .map { case ((k1,k2),c1,c2) => MyRow(k1,k2,c2) } // just a nicer way to put it } 我无法看到摆脱map操作的方法,因为group-by必然会返回一个元组,但是使用模式匹配可以使它更好一些 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |