加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Spark RDD默认分区数

发布时间:2020-12-16 18:46:04 所属栏目:安全 来源:网络整理
导读:版本:Spark 1.6.2,Scala 2.10 我正在执行下面的命令在spark-shell中. 我试图查看Spark默认创建的分区数. val rdd1 = sc.parallelize(1 to 10)println(rdd1.getNumPartitions) // == Result is 4//Creating rdd for the local file test1.txt. It is not HDF
版本:Spark 1.6.2,Scala 2.10

我正在执行下面的命令在spark-shell中.
我试图查看Spark默认创建的分区数.

val rdd1 = sc.parallelize(1 to 10)
println(rdd1.getNumPartitions) // ==> Result is 4

//Creating rdd for the local file test1.txt. It is not HDFS.
//File content is just one word "Hello"
val rdd2 = sc.textFile("C:/test1.txt")
println(rdd2.getNumPartitions) // ==> Result is 2

根据Apache Spark documentation,spark.default.parallelism是我的笔记本电脑中的核心数(2核心处理器).

我的问题是:rdd2似乎正在给出2个分区的正确结果,如文档中所述.但是为什么rdd1将结果作为4个分区?

解决方法

最小分区数实际上是SparkContext设置的下限,但由于它在底层使用Hadoop,Hadoop InputFormat仍然是默认行为.

第一种情况应该反映出here所述的defaultParallelism,它可能会有所不同,具体取决于设置和硬件. (核心数量等)

因此,除非您提供切片数量,否则第一种情况将由sc.defaultParallelism描述的数字定义:

scala> sc.defaultParallelism
res0: Int = 6

scala> sc.parallelize(1 to 100).partitions.size
res1: Int = 6

至于第二种情况,使用sc.textFile,默认情况下切片的数量是最小分区数.它在this section of code中可以看到等于2.

您应该考虑以下事项:

> sc.parallelize将采用numSlices或defaultParallelism.
> sc.textFile将取minPartition和基于hadoop输入分割大小除以块大小计算的分割数之间的最大值.

> sc.textFile调用sc.hadoopFile,它创建了一个在引擎盖下使用InputFormat.getSplits的HadoopRDD [Ref. InputFormat documentation].
>

InputSplit[] getSplits(JobConf job,int numSplits) throws IOException : Logically split the set of input files for the job.
Each InputSplit is then assigned to an individual Mapper for processing.
Note: The split is a logical split of the inputs and the input files are not physically split into chunks. For e.g. a split could be tuple. Parameters: job – job configuration.
numSplits – the desired number of splits,a hint. Returns: an array of InputSplits for the job. Throws: IOException.

示范:

让我们创建一些虚拟文本文件:

fallocate -l 241m bigfile.txt
fallocate -l 4G hugefile.txt

这将分别创建大小为241MB和4GB的2个文件.

我们可以看到当我们阅读文件时会发生什么:

scala> val rdd = sc.textFile("bigfile.txt")
// rdd: org.apache.spark.rdd.RDD[String] = bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27

scala> rdd.getNumPartitions
// res0: Int = 8

scala> val rdd2 = sc.textFile("hugefile.txt")
// rdd2: org.apache.spark.rdd.RDD[String] = hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27

scala> rdd2.getNumPartitions
// res1: Int = 128

它们都是HadoopRDDs:

scala> rdd.toDebugString
// res2: String = 
// (8) bigfile.txt MapPartitionsRDD[1] at textFile at <console>:27 []
//  |  bigfile.txt HadoopRDD[0] at textFile at <console>:27 []

scala> rdd2.toDebugString
// res3: String = 
// (128) hugefile.txt MapPartitionsRDD[3] at textFile at <console>:27 []
//   |   hugefile.txt HadoopRDD[2] at textFile at <console>:27 []

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读