scala – Spark Dataframes-按键减少
发布时间:2020-12-16 18:19:07 所属栏目:安全 来源:网络整理
导读:假设我有一个这样的数据结构,其中ts是一些时间戳 case class Record(ts: Long,id: Int,value: Int) 鉴于大量这些记录,我希望最终得到每个id具有最高时间戳的记录.使用RDD api我认为以下代码完成了工作: def findLatest(records: RDD[Record])(implicit spar
假设我有一个这样的数据结构,其中ts是一些时间戳
case class Record(ts: Long,id: Int,value: Int) 鉴于大量这些记录,我希望最终得到每个id具有最高时间戳的记录.使用RDD api我认为以下代码完成了工作: def findLatest(records: RDD[Record])(implicit spark: SparkSession) = { records.keyBy(_.id).reduceByKey{ (x,y) => if(x.ts > y.ts) x else y }.values } 同样,这是我对数据集的尝试: def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = { records.groupByKey(_.id).mapGroups{ case(id,records) => { records.reduceLeft((x,y) => if (x.ts > y.ts) x else y) } } } 我正在尝试研究如何使用数据框来实现类似的东西,但无济于事 – 我意识到我可以使用以下方法进行分组: records.groupBy($"id") 但是这给了我一个RelationGroupedDataSet,我不清楚我需要编写什么聚合函数来实现我想要的 – 我看到的所有示例聚合似乎都集中在返回只聚合的一列而不是整行. 是否可以使用数据框来实现这一目标? 解决方法
您可以使用argmax逻辑(参见
databricks example)
例如,假设您的数据框名为df,并且它具有列id,val,ts您将执行以下操作: import org.apache.spark.sql.functions._ val newDF = df.groupBy('id).agg.max(struct('ts,'val)) as 'tmp).select($"id",$"tmp.*") (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
推荐文章
站长推荐
热点阅读