scala – SPARK DataFrame:如何根据相同的列值有效地分割每个组
我有一个DataFrame生成如下:
df.groupBy($"Hour",$"Category") .agg(sum($"value").alias("TotalValue")) .sort($"Hour".asc,$"TotalValue".desc)) 结果如下: +----+--------+----------+ |Hour|Category|TotalValue| +----+--------+----------+ | 0| cat26| 30.9| | 0| cat13| 22.1| | 0| cat95| 19.6| | 0| cat105| 1.3| | 1| cat67| 28.5| | 1| cat4| 26.8| | 1| cat13| 12.6| | 1| cat23| 5.3| | 2| cat56| 39.6| | 2| cat40| 29.7| | 2| cat187| 27.9| | 2| cat68| 9.8| | 3| cat8| 35.6| | ...| ....| ....| +----+--------+----------+ 我想基于col(“Hour”)的每个唯一值创建新的数据帧,即 >对于小时组== 0 所以期望的输出将是: df0 as: +----+--------+----------+ |Hour|Category|TotalValue| +----+--------+----------+ | 0| cat26| 30.9| | 0| cat13| 22.1| | 0| cat95| 19.6| | 0| cat105| 1.3| +----+--------+----------+ df1 as: +----+--------+----------+ |Hour|Category|TotalValue| +----+--------+----------+ | 1| cat67| 28.5| | 1| cat4| 26.8| | 1| cat13| 12.6| | 1| cat23| 5.3| +----+--------+----------+ 同样地, df2 as: +----+--------+----------+ |Hour|Category|TotalValue| +----+--------+----------+ | 2| cat56| 39.6| | 2| cat40| 29.7| | 2| cat187| 27.9| | 2| cat68| 9.8| +----+--------+----------+ 任何帮助都非常感谢. 编辑1: 我尝试过的: df.foreach( row => splitHour(row) ) def splitHour(row: Row) ={ val Hour=row.getAs[Long]("Hour") val HourDF= sparkSession.createDataFrame(List((s"$Hour",1))) val hdf=HourDF.withColumnRenamed("_1","Hour_unique").drop("_2") val mydf: DataFrame =df.join(hdf,df("Hour")===hdf("Hour_unique")) mydf.write.mode("overwrite").parquet(s"/home/dev/shaishave/etc/myparquet/$Hour/") } 这个策略的问题: 在数据帧df上运行需要8个小时,该数据帧df超过100万行,并且在单个节点上为10 GB RAM提供了火花作业.因此,加入正变得非常低效. 警告:我必须将每个数据帧mydf写为镶木地板,它具有需要维护(不展平)的嵌套模式. 解决方法
正如我的评论中所指出的,这个问题的一个可能简单的方法是使用:
df.write.partitionBy("hour").saveAsTable("myparquet") 如上所述,文件夹结构将是myparquet / hour = 1,myparquet / hour = 2,…,myparquet / hour = 24,而不是myparquet / 1,myparquet / 2,myparquet / 24. 要更改文件夹结构,您可以 >可以在显式HiveContext中使用Hive配置设置hcat.dynamic.partitioning.custom.pattern;更多信息,请访问HCatalog DynamicPartitions. 重要的是要注意,通过更改文件夹的命名模式,当您在该文件夹中运行spark.read.parquet(…)时,Spark将不会自动理解动态分区,因为它缺少partitionKey(即Hour)信息. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |