加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Spark DataFrame:对组进行操作

发布时间:2020-12-16 18:47:14 所属栏目:安全 来源:网络整理
导读:我有一个我正在操作的DataFrame,我希望按一组列进行分组,并在其余列上按组操作.在常规的RDD-land中我认为它看起来像这样: rdd.map( tup = ((tup._1,tup._2,tup._3),tup) ). groupByKey(). forEachPartition( iter = doSomeJob(iter) ) 在DataFrame-land中我
我有一个我正在操作的DataFrame,我希望按一组列进行分组,并在其余列上按组操作.在常规的RDD-land中我认为它看起来像这样:

rdd.map( tup => ((tup._1,tup._2,tup._3),tup) ).
  groupByKey().
  forEachPartition( iter => doSomeJob(iter) )

在DataFrame-land中我会这样开始:

df.groupBy("col1","col2","col3")  // Reference by name

但是如果我的操作比GroupedData提供的平均/最小/最大/计数更复杂,那么我不确定如何操作组.

例如,我想构建一个MongoDB文档per(“col1”,“col2”,“col3”)组(通过迭代组中的相关行),缩小到N个分区,然后将文档插入到MongoDB数据库. N limit是我想要的最大同时连接数.

有什么建议?

解决方法

你可以自我加入.首先得到小组:

val groups = df.groupBy($"col1",$"col2",$"col3").agg($"col1",$"col3")

然后,您可以将其加入到原始DataFrame中:

val joinedDF = groups
  .select($"col1" as "l_col1",$"col2" as "l_col2",$"col3" as "l_col3)
  .join(df,$"col1" <=> $"l_col1" and $"col2" <=> $"l_col2" and  $"col3" <=> $"l_col3")

虽然这样可以获得与原始数据完全相同的数据(以及3个额外的冗余列),但您可以执行另一个连接以添加具有与该行关联的(col1,col2,col3)组的MongoDB文档ID的列.

无论如何,根据我的经验,连接和自连接是处理DataFrames中复杂内容的方式.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读