加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何在Spark 2.1中保存分区的镶木地板文件?

发布时间:2020-12-16 19:18:42 所属栏目:安全 来源:网络整理
导读:我试图测试如何使用Spark 2.1在HDFS 2.7中写入数据.我的数据是一个简单的虚拟值序列,输出应该由属性:id和key分区. // Simple case class to cast the data case class SimpleTest(id:String,value1:Int,value2:Float,key:Int) // Actual data to be stored
我试图测试如何使用Spark 2.1在HDFS 2.7中写入数据.我的数据是一个简单的虚拟值序列,输出应该由属性:id和key分区.

// Simple case class to cast the data
 case class SimpleTest(id:String,value1:Int,value2:Float,key:Int)

 // Actual data to be stored
 val testData = Seq(
    SimpleTest("test",12,13.5.toFloat,1),SimpleTest("test",2),3),SimpleTest("simple",3)
 )

 // Spark's workflow to distribute,partition and store
 // sc and sql are the SparkContext and SparkSession,respectively
 val testDataP = sc.parallelize(testData,6)
 val testDf = sql.createDataFrame(testDataP).toDF("id","value1","value2","key")
 testDf.write.partitionBy("id","key").parquet("/path/to/file")

我期望在HDFS中获得以下树结构:

- /path/to/file
   |- /id=test/key=1/part-01.parquet
   |- /id=test/key=2/part-02.parquet
   |- /id=test/key=3/part-03.parquet
   |- /id=simple/key=1/part-04.parquet
   |- /id=simple/key=2/part-05.parquet
   |- /id=simple/key=3/part-06.parquet

但是当我运行前面的代码时,我得到以下输出:

/path/to/file/id=/key=24/
 |-/part-01.parquet
 |-/part-02.parquet
 |-/part-03.parquet
 |-/part-04.parquet
 |-/part-05.parquet
 |-/part-06.parquet

我不知道代码中是否有什么问题,或者Spark有没有其他的东西.

我正在执行spark-submit,如下所示:

spark-submit –name APP –master local –driver-memory 30G –executor-memory 30G –executor-cores 8 –num-executors 8 –conf spark.io.compression.codec=lzf –conf spark.akka.frameSize=1024 –conf spark.driver.maxResultSize=1g –conf spark.sql.orc.compression.codec=uncompressed –conf spark.sql.parquet.filterPushdown=true –class myClass myFatJar.jar

解决方法

有趣的是……好吧……“它对我有用”.

当您使用Spark 2.1中的SimpleTest案例类描述数据集时,您将导入spark.implicits._ away以获得类型化的数据集.

就我而言,spark是sql.

换句话说,您不必创建testDataP和testDf(使用sql.createDataFrame).

import spark.implicits._
...
val testDf = testData.toDS
testDf.write.partitionBy("id","key").parquet("/path/to/file")

在另一个终端(保存到/ tmp / testDf目录后):

$tree /tmp/testDf/
/tmp/testDf/
├── _SUCCESS
├── id=simple
│?? ├── key=1
│?? │?? └── part-00003-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
│?? ├── key=2
│?? │?? └── part-00004-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
│?? └── key=3
│??     └── part-00005-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
└── id=test
    ├── key=1
    │?? └── part-00000-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
    ├── key=2
    │?? └── part-00001-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
    └── key=3
        └── part-00002-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet

8 directories,7 files

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读