加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – SPARK DataFrame:如何根据相同的列值有效地分割每个组

发布时间:2020-12-16 10:05:56 所属栏目:安全 来源:网络整理
导读:我有一个DataFrame生成如下: df.groupBy($"Hour",$"Category") .agg(sum($"value").alias("TotalValue")) .sort($"Hour".asc,$"TotalValue".desc)) 结果如下: +----+--------+----------+|Hour|Category|TotalValue|+----+--------+----------+| 0| cat26|
我有一个DataFrame生成如下:

df.groupBy($"Hour",$"Category")
  .agg(sum($"value").alias("TotalValue"))
  .sort($"Hour".asc,$"TotalValue".desc))

结果如下:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
|   3|    cat8|      35.6|
| ...|    ....|      ....|
+----+--------+----------+

我想基于col(“Hour”)的每个唯一值创建新的数据帧,即

>对于小时组== 0
>对于小时组== 1
>对于小时组== 2
等等…

所以期望的输出将是:

df0 as:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
+----+--------+----------+

df1 as:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
+----+--------+----------+

同样地,

df2 as:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
+----+--------+----------+

任何帮助都非常感谢.

编辑1:

我尝试过的:

df.foreach(
  row => splitHour(row)
  )

def splitHour(row: Row) ={
    val Hour=row.getAs[Long]("Hour")

    val HourDF= sparkSession.createDataFrame(List((s"$Hour",1)))

    val hdf=HourDF.withColumnRenamed("_1","Hour_unique").drop("_2")

    val mydf: DataFrame =df.join(hdf,df("Hour")===hdf("Hour_unique"))

    mydf.write.mode("overwrite").parquet(s"/home/dev/shaishave/etc/myparquet/$Hour/")
  }

这个策略的问题:

在数据帧df上运行需要8个小时,该数据帧df超过100万行,并且在单个节点上为10 GB RAM提供了火花作业.因此,加入正变得非常低效.

警告:我必须将每个数据帧mydf写为镶木地板,它具有需要维护(不展平)的嵌套模式.

解决方法

正如我的评论中所指出的,这个问题的一个可能简单的方法是使用:

df.write.partitionBy("hour").saveAsTable("myparquet")

如上所述,文件夹结构将是myparquet / hour = 1,myparquet / hour = 2,…,myparquet / hour = 24,而不是myparquet / 1,myparquet / 2,myparquet / 24.

要更改文件夹结构,您可以

>可以在显式HiveContext中使用Hive配置设置hcat.dynamic.partitioning.custom.pattern;更多信息,请访问HCatalog DynamicPartitions.
>另一种方法是在执行df.write.partitionBy.saveAsTable(…)命令之后直接更改文件系统,例如f in *;做mv $f ${f / ${f:0:5} /};完成后会删除文件夹名称中的Hour =文本.

重要的是要注意,通过更改文件夹的命名模式,当您在该文件夹中运行spark.read.parquet(…)时,Spark将不会自动理解动态分区,因为它缺少partitionKey(即Hour)信息.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读