加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何使用spark sc.textFile获取文件名?

发布时间:2020-12-16 09:58:30 所属栏目:安全 来源:网络整理
导读:我正在使用以下代码读取文件目录: val data = sc.textFile("/mySource/dir1/*") 现在我的数据rdd包含目录中所有文件的所有行(对吧?) 我现在想要为每行添加一个包含源文件名的列,我该怎么做? 我尝试的其他选项是使用wholeTextFile但我不断出现内存异常. 5
我正在使用以下代码读取文件目录:

val data = sc.textFile("/mySource/dir1/*")

现在我的数据rdd包含目录中所有文件的所有行(对吧?)

我现在想要为每行添加一个包含源文件名的列,我该怎么做?

我尝试的其他选项是使用wholeTextFile但我不断出现内存异常.
5台服务器24核24 GB(执行器 – 核心5执行器 – 内存5G)
有任何想法吗?

解决方法

您可以使用此代码.我用Spark 1.4和1.5进行了测试.

它从inputSplit获取文件名,并使用迭代器使用NewHadoopRDD的mapPartitionsWithInputSplit将其添加到每一行

import org.apache.hadoop.mapreduce.lib.input.{FileSplit,TextInputFormat}
import org.apache.spark.rdd.{NewHadoopRDD}
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text

val sc = new SparkContext(new SparkConf().setMaster("local"))

val fc = classOf[TextInputFormat]
val kc = classOf[LongWritable]
val vc = classOf[Text]

val path :String = "file:///home/user/test"
val text = sc.newAPIHadoopFile(path,fc,kc,vc,sc.hadoopConfiguration)

val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable,Text]]
           .mapPartitionsWithInputSplit((inputSplit,iterator) => {
  val file = inputSplit.asInstanceOf[FileSplit]
  iterator.map(tup => (file.getPath,tup._2))
  }
)

linesWithFileNames.foreach(println)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读