scala – SparkError:XXXX任务的序列化结果总大小(2.0 GB)大于s
错误:
ERROR TaskSetManager: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB) 目标:获取使用该模型的所有用户的建议,并与每个用户测试数据重叠并生成重叠率. 我使用spark mllib构建了一个推荐模型.我评估每个用户的测试数据和每个用户的推荐项目的重叠比率,并生成平均重叠率. def overlapRatio(model: MatrixFactorizationModel,test_data: org.apache.spark.rdd.RDD[Rating]): Double = { val testData: RDD[(Int,Iterable[Int])] = test_data.map(r => (r.user,r.product)).groupByKey val n = testData.count val recommendations: RDD[(Int,Array[Int])] = model.recommendProductsForUsers(20) .mapValues(_.map(r => r.product)) val overlaps = testData.join(recommendations).map(x => { val moviesPerUserInRecs = x._2._2.toSet val moviesPerUserInTest = x._2._1.toSet val localHitRatio = moviesPerUserInRecs.intersect(moviesPerUserInTest) if(localHitRatio.size > 0) 1 else 0 }).filter(x => x != 0).count var r = 0.0 if (overlaps != 0) r = overlaps / n return r } 但问题是它最终会抛出maxResultSize错误.在我的spark配置中,我做了以下操作以增加maxResultSize. val conf = new SparkConf() conf.set("spark.driver.maxResultSize","6g") 但这并没有解决问题,我几乎接近我分配驱动程序内存的数量,但问题没有得到解决.虽然代码正在执行,但我仍然关注我的火花工作,我看到的有点令人费解. [Stage 281:==> (47807 + 100) / 1000000]15/12/01 12:27:03 ERROR TaskSetManager: Total size of serialized results of 47809 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB) 在上面的阶段代码是在第277行的spark-mllib recommendedForAll中执行MatrixFactorization代码(不完全确定行号). private def recommendForAll( rank: Int,srcFeatures: RDD[(Int,Array[Double])],dstFeatures: RDD[(Int,num: Int): RDD[(Int,Array[(Int,Double)])] = { val srcBlocks = blockify(rank,srcFeatures) val dstBlocks = blockify(rank,dstFeatures) val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case ((srcIds,srcFactors),(dstIds,dstFactors)) => val m = srcIds.length val n = dstIds.length val ratings = srcFactors.transpose.multiply(dstFactors) val output = new Array[(Int,(Int,Double))](m * n) var k = 0 ratings.foreachActive { (i,j,r) => output(k) = (srcIds(i),(dstIds(j),r)) k += 1 } output.toSeq } ratings.topByKey(num)(Ordering.by(_._2)) } suggestForAll方法从suggestProductsForUsers方法调用. 但看起来这种方法正在剥离1M任务.获得的数据来自2000个部分文件,所以我很困惑它开始吐出1M任务,我认为这可能是问题所在. 我的问题是如何才能真正解决这个问题.如果不使用这种方法,很难计算重叠率或召回@K.这是火花1.5(cloudera 5.5) 解决方法
2GB问题对Spark社区来说并不陌生:
https://issues.apache.org/jira/browse/SPARK-6235
重新/分区大小大于2GB,尝试重新分区(myRdd.repartition(parallelism))你的RDD到更多的分区(w / r / t /你当前的并行级别),从而减少每个单独分区的大小. Re /旋转的任务数量(因此创建了分区),我的假设是它可能来自srcBlocks.cartesian(dstBlocks)API调用,它产生的输出RDD由(z = srcBlocks的分区数* dstBlocks的数量组成)分区). 在这种情况下,您可以考虑利用myRdd.coalesce(并行)API而不是重新分区来避免混乱(并分区与分离的相关问题). (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |