加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 最简单程序的大任务大小

发布时间:2020-12-16 10:00:23 所属栏目:安全 来源:网络整理
导读:我试图用Spark运行最简单的程序 import org.apache.spark.{SparkContext,SparkConf}object LargeTaskTest { def main(args: Array[String]) { val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]") val sc = new SparkContext(conf) va
我试图用Spark运行最简单的程序

import org.apache.spark.{SparkContext,SparkConf}

object LargeTaskTest {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val dat = (1 to 10000000).toList
    val data = sc.parallelize(dat).cache()
    for(i <- 1 to 100){
      println(data.reduce(_ + _))
    }
  }   
}

每次迭代后,我收到以下错误消息:

WARN TaskSetManager: Stage 0 contains a task of very large size (9767
KB). The maximum recommended task size is 100 KB.

增加数据大小会增加所述任务大小.这告诉我,驱动程序正在向所有执行程序发送“dat”对象,但我不能为我的生活看到原因,因为我的RDD上的唯一操作是reduce,它基本上没有关闭.有任何想法吗 ?

解决方法

因为您首先在本地创建非常大的列表,所以Spark parallelize方法尝试将此列表作为单个单元发送给Spark工作器,作为任务的一部分.因此,您收到警告信息.作为替代方案,您可以并行化一个小得多的列表,然后使用flatMap将其分解为更大的列表.这也有利于并行创建更大的数字集.例如:

import org.apache.spark.{SparkContext,SparkConf}

object LargeTaskTest extends App {

  val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
  val sc = new SparkContext(conf)
  val dat = (0 to 99).toList
  val data = sc.parallelize(dat).cache().flatMap(i => (1 to 1000000).map(j => j * 100 + i))
  println(data.count()) //100000000
  println(data.reduce(_ + _))
  sc.stop()
}

编辑:

最终,并行化的本地集合必须被推送给执行者. parallelize方法创建ParallelCollectionRDD的实例:

def parallelize[T: ClassTag](
      seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this,seq,numSlices,Map[Int,Seq[String]]())
  }

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L730

ParallelCollectionRDD创建了一些等于numSlices的分区:

override def getPartitions: Array[Partition] = {
    val slices = ParallelCollectionRDD.slice(data,numSlices).toArray
    slices.indices.map(i => new ParallelCollectionPartition(id,i,slices(i))).toArray
  }

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L96

numSlices默认为sc.defaultParallelism,它在我的机器上是4.所以即使拆分,每个分区都包含一个非常大的列表,需要将其推送到执行程序.

SparkContext.parallelize包含注释@note Parallelize行为懒惰,ParallelCollectionRDD包含注释;

// TODO: Right now,each split sends along its full data,even if
later down the RDD chain it gets // cached. It might be worthwhile
to write the data to a file in the DFS and read it in the split //
instead.

所以看起来当你调用reduce时会出现问题,因为这是分区被发送到执行程序的重点,但根本原因是你在一个非常大的列表上调用parallelize.在执行器中生成大型列表是一种更好的方法,恕我直言.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读