加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 将Spark DataFrame数据划分为单独的文件

发布时间:2020-12-16 18:09:05 所属栏目:安全 来源:网络整理
导读:我从s3文件输入以下DataFrame,需要将数据转换为以下所需的输出.我使用Spark版本1.5.1和 Scala,但可以用 Python改为Spark.欢迎任何建议. DataFrame输入: name animal datajohn mouse aaaaabob mouse bbbbbbob mouse cccccbob dog ddddd 期望的输出: john/mo
我从s3文件输入以下DataFrame,需要将数据转换为以下所需的输出.我使用Spark版本1.5.1和 Scala,但可以用 Python改为Spark.欢迎任何建议.

DataFrame输入:

name    animal   data
john    mouse    aaaaa
bob     mouse    bbbbb
bob     mouse    ccccc
bob     dog      ddddd

期望的输出:

john/mouse/file.csv
bob/mouse/file.csv
bob/dog/file.csv

terminal$cat bob/mouse/file.csv
bbbbb
ccccc

terminal$cat bob/dog/file.csv
ddddd

这是我尝试过的现有Spark Scala代码:

val sc = new SparkContext(new SparkConf())
val sqlc = new org.apache.spark.sql.SQLContext(sc)
val df = sqlc.read.json("raw.gz")
val cols = Seq("name","animal")
df.groupBy(cols.head,cols.tail: _*).count().take(100).foreach(println)

电流输出:

[john,mouse,1]
[bob,2]
[bob,dog,1]

我现有代码的一些问题是groupBy返回一个GroupedData对象,我可能不想对该数据执行count / sum / agg函数.我正在寻找一种更好的技术来分组和输出数据.数据集非常大.

解决方法

这可以使用DataFrameWriter的partitionBy选项来实现.一般语法如下:

df.write.partitionBy("name","animal").format(...).save(...)

不幸的是,支持Spark 1.5中分区的唯一纯文本格式是JSON.

如果您可以将Spark安装更新为:

> 1.6 – 您可以将partitionBy与文本格式一起使用.如果您需要组的单个输出文件(重新分区),则还需要1.6.
> 2.0 – 您可以将partitionBy与csv格式一起使用.

我相信在1.5中你最好的选择是将文件写为JSON并转换单个输出文件.

如果不同名称’,’动物的数量很小,您可以尝试为每个组执行单独的写入:

val dist = df.select("name","animal").rdd.collect.map {
  case Row(name: String,animal: String) => (name,animal)
}

for {
  (name,animal) <- dist
} df.where($"name" === name && $"animal" === animal)
    .select($"data").write.format("csv").save(s"/prefix/$name/$animal")

但是当组合数量增加时,这不会扩展.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读