加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Spark Dataframes-按键减少

发布时间:2020-12-16 18:19:07 所属栏目:安全 来源:网络整理
导读:假设我有一个这样的数据结构,其中ts是一些时间戳 case class Record(ts: Long,id: Int,value: Int) 鉴于大量这些记录,我希望最终得到每个id具有最高时间戳的记录.使用RDD api我认为以下代码完成了工作: def findLatest(records: RDD[Record])(implicit spar
假设我有一个这样的数据结构,其中ts是一些时间戳

case class Record(ts: Long,id: Int,value: Int)

鉴于大量这些记录,我希望最终得到每个id具有最高时间戳的记录.使用RDD api我认为以下代码完成了工作:

def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
  records.keyBy(_.id).reduceByKey{
    (x,y) => if(x.ts > y.ts) x else y
  }.values
}

同样,这是我对数据集的尝试:

def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
  records.groupByKey(_.id).mapGroups{
    case(id,records) => {
      records.reduceLeft((x,y) => if (x.ts > y.ts) x else y)
    }
  }
}

我正在尝试研究如何使用数据框来实现类似的东西,但无济于事 – 我意识到我可以使用以下方法进行分组:

records.groupBy($"id")

但是这给了我一个RelationGroupedDataSet,我不清楚我需要编写什么聚合函数来实现我想要的 – 我看到的所有示例聚合似乎都集中在返回只聚合的一列而不是整行.

是否可以使用数据框来实现这一目标?

解决方法

您可以使用argmax逻辑(参见 databricks example)

例如,假设您的数据框名为df,并且它具有列id,val,ts您将执行以下操作:

import org.apache.spark.sql.functions._
val newDF = df.groupBy('id).agg.max(struct('ts,'val)) as 'tmp).select($"id",$"tmp.*")

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读