scala – Spark:如何将RDD的Seq转换为RDD
发布时间:2020-12-16 09:50:39 所属栏目:安全 来源:网络整理
导读:我刚刚开始使用Spark斯卡拉 我有一个包含多个文件的目录 我成功加载它们 sc.wholeTextFiles(directory) 现在我想升级一级.我实际上有一个目录,其中包含包含文件的子目录.我的目标是获得一个RDD [(String,String)],这样我就可以继续前进,其中RDD代表文件的名
我刚刚开始使用Spark&斯卡拉
我有一个包含多个文件的目录 sc.wholeTextFiles(directory) 现在我想升级一级.我实际上有一个目录,其中包含包含文件的子目录.我的目标是获得一个RDD [(String,String)],这样我就可以继续前进,其中RDD代表文件的名称和内容. 我尝试了以下方法: val listOfFolders = getListOfSubDirectories(rootFolder) val input = listOfFolders.map(directory => sc.wholeTextFiles(directory)) 但是我得到了一个Seq [RDD [(String,String)]] 或者也许我做得不对,我应该尝试不同的方法? 编辑:添加代码 // HADOOP VERSION val rootFolderHDFS = "hdfs://****/" val hdfsURI = "hdfs://****/**/" // returns a list of folders (currently about 800) val listOfFoldersHDFS = ListDirectoryContents.list(hdfsURI,rootFolderHDFS) val inputHDFS = listOfFoldersHDFS.map(directory => sc.wholeTextFiles(directory)) // RDD[(String,String)] // val inputHDFS2 = inputHDFS.reduceRight((rdd1,rdd2) => rdd2 ++ rdd1) val init = sc.parallelize(Array[(String,String)]()) val inputHDFS2 = inputHDFS.foldRight(init)((rdd1,rdd2) => rdd2 ++ rdd1) // returns org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError println(inputHDFS2.count) 解决方法
您可以只使用路径通配符将所有目录加载到单个RDD中,而不是将每个目录加载到单独的RDD中吗?
给定以下目录树… $tree test/spark/so test/spark/so ├── a │?? ├── text1.txt │?? └── text2.txt └── b ├── text1.txt └── text2.txt 使用目录的通配符创建RDD. scala> val rdd = sc.wholeTextFiles("test/spark/so/*/*") rdd: org.apache.spark.rdd.RDD[(String,String)] = test/spark/so/*/ WholeTextFileRDD[16] at wholeTextFiles at <console>:37 如你所料,伯爵是4. scala> rdd.count res9: Long = 4 scala> rdd.collect res10: Array[(String,String)] = Array((test/spark/so/a/text1.txt,a1 a2 a3),(test/spark/so/a/text2.txt,a3 a4 a5),(test/spark/so/b/text1.txt,b1 b2 b3),(test/spark/so/b/text2.txt,b3 b4 b5)) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |