加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何在Spark数据集中舍入一列?

发布时间:2020-12-16 10:06:17 所属栏目:安全 来源:网络整理
导读:使用 Scala Spark,如何使用类型化的数据集API来聚合聚合列? 另外,如何通过groupby操作保留数据集的类型? 这就是我目前拥有的: case class MyRow( k1: String,k2: String,c1: Double,c2: Double)def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = {im
使用 Scala Spark,如何使用类型化的数据集API来聚合聚合列?

另外,如何通过groupby操作保留数据集的类型?

这就是我目前拥有的:

case class MyRow(
  k1: String,k2: String,c1: Double,c2: Double
)

def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = {
import org.apache.spark.sql.expressions.scalalang.typed._
ds.groupByKey(row => (row.k1,row.k2))
  .agg(
    avg(_.c1),avg(_.c2)
  )
  .map(r => MyRow(r._1._1,r._1._2,r._2,r._3))
}

>如果我用圆形(avg(_.c1))替换avg(_.c1),我会收到类型错误.什么是绕过我的价值观的正确方法?
> .map(…)行感觉不对 – 是否有更优雅的方式来保留我的数据集类型?

谢谢!

解决方法

使用round确实在类型错误上失败,因为agg期望TypedColumn类型的聚合函数[IN,OUT]和round提供一个Column(适用于DataFrames).

你需要的是一个舍入平均聚合函数,它没有在org.apache.spark.sql.expressions.scalalang.typed._中提供 – 但你可以通过扩展执行平均值的类来自己创建一个聚合:

// Extend TypedAverage - round the result before returning it
class TypedRoundAverage[IN](f: IN => Double) extends TypedAverage[IN](f) {
  override def finish(reduction: (Double,Long)): Double = math.round(super.finish(reduction))
}

// A nice wrapper to create the TypedRoundAverage for a given function  
def roundAvg[IN](f: IN => Double): TypedColumn[IN,Double] = new TypedRoundAverage(f).toColumn

// Now you can use "roundAvg" instead of "round"  
def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = {
  ds.groupByKey(row => (row.k1,row.k2))
    .agg(
      roundAvg(_.c1),roundAvg(_.c2)
    )
    .map { case ((k1,k2),c1,c2) => MyRow(k1,k2,c2) } // just a nicer way to put it
}

我无法看到摆脱map操作的方法,因为group-by必然会返回一个元组,但是使用模式匹配可以使它更好一些

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读