加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 当Spark尝试发送GetMapOutputStatuses时,为什么会报告

发布时间:2020-12-16 09:17:21 所属栏目:安全 来源:网络整理
导读:我正在使用Spark 1.3对大量数据进行聚合.该工作包括4个步骤: 读取大(1TB)序列文件(对应于1天的数据) 过滤掉大部分内容,得到大约1GB的随机写入 关键客户 aggregateByKey()到构建该客户的配置文件的自定义结构,对应于每个客户的HashMap [Long,Float].长按键是
我正在使用Spark 1.3对大量数据进行聚合.该工作包括4个步骤:

>读取大(1TB)序列文件(对应于1天的数据)
>过滤掉大部分内容,得到大约1GB的随机写入
>关键客户
> aggregateByKey()到构建该客户的配置文件的自定义结构,对应于每个客户的HashMap [Long,Float].长按键是独一无二的,绝对不能超过50K个不同的条目.

我正在使用此配置运行:

--name geo-extract-$1-askTimeout 
--executor-cores 8 
--num-executors 100 
--executor-memory 40g 
--driver-memory 4g 
--driver-cores 8 
--conf 'spark.storage.memoryFraction=0.25' 
--conf 'spark.shuffle.memoryFraction=0.35' 
--conf 'spark.kryoserializer.buffer.max.mb=1024' 
--conf 'spark.akka.frameSize=1024' 
--conf 'spark.akka.timeout=200' 
--conf 'spark.akka.askTimeout=111' 
--master yarn-cluster 

并得到这个错误:

org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
        ...
    Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(0)]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
        ... 21 more
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)

工作和逻辑已被证明可以使用一个小的测试集,我甚至可以运行这个工作一些日期,但不是为其他人.我已经google了,发现提示“与MapOutputTracker通信时出错”与内部Spark消息有关,但是我已经增加了“spark.akka.frameSize”,“spark.akka.timeout”和“spark.akka.askTimeout”(这最后一个甚至不会出现在Spark文档中,但在Spark邮件列表中提到),没有用. 30秒还有一些超时时间,我不知道如何识别或修复.

由于过滤操作和aggregateByKey执行本地部分聚合的事实应该足以解决数据大小,所以我没有理由因数据大小而失败.任务数量为16K(从原始输入自动),远远超过运行这个的800个内核,在100个执行程序上,因此它不像通常的“增量分区”提示那么简单.任何线索都将不胜感激!谢谢!

解决方法

我有一个类似的问题,我的工作将适用于较小的数据集,但会失败与较大的数据集.

经过大量配置更改后,我发现更改驱动程序内存设置比更改执行程序内存设置有更多的影响.
也使用新的垃圾收集器有助于很多.我正在为3的集群使用以下配置,每个集群为40个核心.希望以下配置有助于:

spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -  
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g 
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions

spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g   
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions


spark.driver.memory=8g
spark.driver.cores=10
spark.driver.maxResultSize=8g

spark.executor.memory=16g
spark.executor.cores=25

spark.default.parallelism=50
spark.eventLog.dir=hdfs://mars02-db01/opt/spark/logs
spark.eventLog.enabled=true

spark.kryoserializer.buffer=512m
spark.kryoserializer.buffer.max=1536m

spark.rdd.compress=true
spark.storage.memoryFraction=0.15
spark.storage.MemoryStore=12g

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读