scala – 将Spark DataFrame数据划分为单独的文件
发布时间:2020-12-16 18:09:05 所属栏目:安全 来源:网络整理
导读:我从s3文件输入以下DataFrame,需要将数据转换为以下所需的输出.我使用Spark版本1.5.1和 Scala,但可以用 Python改为Spark.欢迎任何建议. DataFrame输入: name animal datajohn mouse aaaaabob mouse bbbbbbob mouse cccccbob dog ddddd 期望的输出: john/mo
我从s3文件输入以下DataFrame,需要将数据转换为以下所需的输出.我使用Spark版本1.5.1和
Scala,但可以用
Python改为Spark.欢迎任何建议.
DataFrame输入: name animal data john mouse aaaaa bob mouse bbbbb bob mouse ccccc bob dog ddddd 期望的输出: john/mouse/file.csv bob/mouse/file.csv bob/dog/file.csv terminal$cat bob/mouse/file.csv bbbbb ccccc terminal$cat bob/dog/file.csv ddddd 这是我尝试过的现有Spark Scala代码: val sc = new SparkContext(new SparkConf()) val sqlc = new org.apache.spark.sql.SQLContext(sc) val df = sqlc.read.json("raw.gz") val cols = Seq("name","animal") df.groupBy(cols.head,cols.tail: _*).count().take(100).foreach(println) 电流输出: [john,mouse,1] [bob,2] [bob,dog,1] 我现有代码的一些问题是groupBy返回一个GroupedData对象,我可能不想对该数据执行count / sum / agg函数.我正在寻找一种更好的技术来分组和输出数据.数据集非常大. 解决方法
这可以使用DataFrameWriter的partitionBy选项来实现.一般语法如下:
df.write.partitionBy("name","animal").format(...).save(...) 不幸的是,支持Spark 1.5中分区的唯一纯文本格式是JSON. 如果您可以将Spark安装更新为: > 1.6 – 您可以将partitionBy与文本格式一起使用.如果您需要组的单个输出文件(重新分区),则还需要1.6. 我相信在1.5中你最好的选择是将文件写为JSON并转换单个输出文件. 如果不同名称’,’动物的数量很小,您可以尝试为每个组执行单独的写入: val dist = df.select("name","animal").rdd.collect.map { case Row(name: String,animal: String) => (name,animal) } for { (name,animal) <- dist } df.where($"name" === name && $"animal" === animal) .select($"data").write.format("csv").save(s"/prefix/$name/$animal") 但是当组合数量增加时,这不会扩展. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |