scala – 如何在Apache Spark中读取包含多个文件的zip
发布时间:2020-12-16 18:55:25 所属栏目:安全 来源:网络整理
导读:我有一个包含多个文本文件的Zipped文件. 我想读取每个文件并构建一个包含每个文件内容的RDD列表. val test = sc.textFile("/Volumes/work/data/kaggle/dato/test/5.zip") 将只是整个文件,但如何遍历zip的每个内容,然后使用Spark将其保存在RDD中. 我对Scala或
我有一个包含多个文本文件的Zipped文件.
我想读取每个文件并构建一个包含每个文件内容的RDD列表. val test = sc.textFile("/Volumes/work/data/kaggle/dato/test/5.zip") 将只是整个文件,但如何遍历zip的每个内容,然后使用Spark将其保存在RDD中. 我对Scala或Python很好. Python中使用Spark的可能解决方案 – archive = zipfile.ZipFile(archive_path,'r') file_paths = zipfile.ZipFile.namelist(archive) for file_path in file_paths: urls = file_path.split("/") urlId = urls[-1].split('_')[0] 解决方法
Apache Spark默认压缩支持
我在其他答案中写了所有必要的理论,你可能想参考:https://stackoverflow.com/a/45958182/1549135 读取包含多个文件的zip 我遵循了@Herman给出的建议并使用了ZipInputStream.这给了我这个解决方案,它返回了zip内容的RDD [String]. import java.io.{BufferedReader,InputStreamReader} import java.util.zip.ZipInputStream import org.apache.spark.SparkContext import org.apache.spark.input.PortableDataStream import org.apache.spark.rdd.RDD implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal { def readFile(path: String,minPartitions: Int = sc.defaultMinPartitions): RDD[String] = { if (path.endsWith(".zip")) { sc.binaryFiles(path,minPartitions) .flatMap { case (name: String,content: PortableDataStream) => val zis = new ZipInputStream(content.open) Stream.continually(zis.getNextEntry) .takeWhile { case null => zis.close(); false case _ => true } .flatMap { _ => val br = new BufferedReader(new InputStreamReader(zis)) Stream.continually(br.readLine()).takeWhile(_ != null) } } } else { sc.textFile(path,minPartitions) } } } 只需通过导入隐式类并在SparkContext上调用readFile方法来使用它: import com.github.atais.spark.Implicits.ZipSparkContext sc.readFile(path) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |