scala – Spark DataFrame:对组进行操作
发布时间:2020-12-16 18:47:14 所属栏目:安全 来源:网络整理
导读:我有一个我正在操作的DataFrame,我希望按一组列进行分组,并在其余列上按组操作.在常规的RDD-land中我认为它看起来像这样: rdd.map( tup = ((tup._1,tup._2,tup._3),tup) ). groupByKey(). forEachPartition( iter = doSomeJob(iter) ) 在DataFrame-land中我
我有一个我正在操作的DataFrame,我希望按一组列进行分组,并在其余列上按组操作.在常规的RDD-land中我认为它看起来像这样:
rdd.map( tup => ((tup._1,tup._2,tup._3),tup) ). groupByKey(). forEachPartition( iter => doSomeJob(iter) ) 在DataFrame-land中我会这样开始: df.groupBy("col1","col2","col3") // Reference by name 但是如果我的操作比GroupedData提供的平均/最小/最大/计数更复杂,那么我不确定如何操作组. 例如,我想构建一个MongoDB文档per(“col1”,“col2”,“col3”)组(通过迭代组中的相关行),缩小到N个分区,然后将文档插入到MongoDB数据库. N limit是我想要的最大同时连接数. 有什么建议? 解决方法
你可以自我加入.首先得到小组:
val groups = df.groupBy($"col1",$"col2",$"col3").agg($"col1",$"col3") 然后,您可以将其加入到原始DataFrame中: val joinedDF = groups .select($"col1" as "l_col1",$"col2" as "l_col2",$"col3" as "l_col3) .join(df,$"col1" <=> $"l_col1" and $"col2" <=> $"l_col2" and $"col3" <=> $"l_col3") 虽然这样可以获得与原始数据完全相同的数据(以及3个额外的冗余列),但您可以执行另一个连接以添加具有与该行关联的(col1,col2,col3)组的MongoDB文档ID的列. 无论如何,根据我的经验,连接和自连接是处理DataFrames中复杂内容的方式. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |