scala – 如何使用spark sc.textFile获取文件名?
发布时间:2020-12-16 09:58:30 所属栏目:安全 来源:网络整理
导读:我正在使用以下代码读取文件目录: val data = sc.textFile("/mySource/dir1/*") 现在我的数据rdd包含目录中所有文件的所有行(对吧?) 我现在想要为每行添加一个包含源文件名的列,我该怎么做? 我尝试的其他选项是使用wholeTextFile但我不断出现内存异常. 5
我正在使用以下代码读取文件目录:
val data = sc.textFile("/mySource/dir1/*") 现在我的数据rdd包含目录中所有文件的所有行(对吧?) 我现在想要为每行添加一个包含源文件名的列,我该怎么做? 我尝试的其他选项是使用wholeTextFile但我不断出现内存异常. 解决方法
您可以使用此代码.我用Spark 1.4和1.5进行了测试.
它从inputSplit获取文件名,并使用迭代器使用NewHadoopRDD的mapPartitionsWithInputSplit将其添加到每一行 import org.apache.hadoop.mapreduce.lib.input.{FileSplit,TextInputFormat} import org.apache.spark.rdd.{NewHadoopRDD} import org.apache.spark.{SparkConf,SparkContext} import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text val sc = new SparkContext(new SparkConf().setMaster("local")) val fc = classOf[TextInputFormat] val kc = classOf[LongWritable] val vc = classOf[Text] val path :String = "file:///home/user/test" val text = sc.newAPIHadoopFile(path,fc,kc,vc,sc.hadoopConfiguration) val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable,Text]] .mapPartitionsWithInputSplit((inputSplit,iterator) => { val file = inputSplit.asInstanceOf[FileSplit] iterator.map(tup => (file.getPath,tup._2)) } ) linesWithFileNames.foreach(println) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
相关内容
- shell cp -R -f 目录递归拷贝
- Docker pull无法在127.0.1.1:53上查找index.docker.io(无法
- bash – 在最后一次与sed匹配后附加一行
- vim插件之Vundle
- 1199亿成交额!京东618大捷幕后技术功臣揭秘
- wsdl – 使用svcutil生成WebService代理的问题
- WebService的 HelloWorld : XFire , JAX-WS , JSR181
- angularjs – 如何使用ng-repeat生成的表的UI Bootstrap分页
- Angular2茉莉花单元测试组件与第三方指令
- angularjs – 在元素中设置ng-href