加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

Scala-Spark使用参数值动态调用groupby和agg

发布时间:2020-12-16 09:27:05 所属栏目:安全 来源:网络整理
导读:我想编写一个自定义分组和聚合函数来获取用户指定的列名和用户指定的聚合映射.我不知道前面的列名和聚合映射.我想写一个类似下面的函数.但我是Scala的新手,我无法解决它. def groupAndAggregate(df: DataFrame,aggregateFun: Map[String,String],cols: List[
我想编写一个自定义分组和聚合函数来获取用户指定的列名和用户指定的聚合映射.我不知道前面的列名和聚合映射.我想写一个类似下面的函数.但我是Scala的新手,我无法解决它.

def groupAndAggregate(df: DataFrame,aggregateFun: Map[String,String],cols: List[String] ): DataFrame ={
  val grouped = df.groupBy(cols)
  val aggregated = grouped.agg(aggregateFun)
  aggregated.show()
}

并希望称之为

val listOfStrings =  List("A","B","C")
val result = groupAndAggregate(df,Map("D"-> "SUM","E"-> "COUNT"),listOfStrings)

我怎样才能做到这一点?
任何人都可以帮助我.

解决方法

您的代码几乎是正确的 – 有两个问题:

>函数的返回类型是DataFrame,但最后一行是aggregated.show(),它返回Unit.删除对show的调用以返回聚合本身,或者只是立即返回agg的结果
> DataFrame.groupBy期望参数如下:col1:String,cols:String * – 所以你需要传递匹配的参数:第一列,然后是其余的列作为参数列表,你可以这样做: df.groupBy(cols.head,cols.tail:_ *)

总而言之,您的功能将是:

def groupAndAggregate(df: DataFrame,cols: List[String] ): DataFrame ={
  val grouped = df.groupBy(cols.head,cols.tail: _*)
  val aggregated = grouped.agg(aggregateFun)
  aggregated
}

或者,类似的较短版本:

def groupAndAggregate(df: DataFrame,cols: List[String] ): DataFrame = {
  df.groupBy(cols.head,cols.tail: _*).agg(aggregateFun)
}

如果你想在你的函数中调用show:

def groupAndAggregate(df: DataFrame,cols.tail: _*)
  val aggregated = grouped.agg(aggregateFun)
  aggregated.show()
  aggregated
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读