
scala – Spark 2.2 cache() causes a driver OutOfMemoryError

I am running Spark 2.2 with Scala on AWS EMR (Zeppelin / spark-shell).

I am trying to run a very simple computation: load, filter, cache, and count a large dataset. My data is 4,500 GB (4.8 TB) of ORC files containing 51,317,951,565 (51 billion) rows.

First, I tried to process it with the following cluster:

1 master node – m4.xlarge – 4 CPU, 16 GB memory

150 core nodes – r3.xlarge – 4 CPU, 29 GB memory

150 task nodes – r3.xlarge – 4 CPU, 29 GB memory

But it failed with an OutOfMemoryError.

When I looked at the Spark UI and Ganglia, I saw that once the application had loaded more than 80% of the data, the driver node became extremely busy while the executors stopped working (their CPU usage was very low), until it eventually crashed.

Ganglia CPU usage for master and worker nodes

Then I tried to run the same process, only increasing the driver node to:

1 master node – m4.2xlarge – 8 CPU, 31 GB memory

And it succeeded.

I don't understand why the driver node's memory usage maxes out before the crash. AFAIK only the executors load and process the tasks, and the data should not be passed to the master node. What could be the reason for this?

1) Ganglia master node usage for the second scenario

2) Spark UI stages

3) Spark UI DAG visualization
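
As a side note (not part of the original post), a quick way to sanity-check how much heap the driver JVM actually received is to query it from inside the spark-shell / Zeppelin session before re-running the job; the sketch below only uses the standard SparkConf and JVM Runtime APIs:

// Sketch: print the configured driver memory and the heap the JVM really got
val configuredDriverMem = spark.sparkContext.getConf.get("spark.driver.memory", "not set")
val maxHeapMb = Runtime.getRuntime.maxMemory / (1024L * 1024L)
println(s"spark.driver.memory = $configuredDriverMem, JVM max heap = $maxHeapMb MB")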

You can find the code below:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset,SaveMode,SparkSession,DataFrame}
import org.apache.spark.sql.functions.{concat_ws,expr,lit,udf}
import org.apache.spark.storage.StorageLevel

// Load the date range from the ORC-backed Hive table, drop the unused columns,
// keep only rows with a GEO4 value, and add a constant is_away column
val df = spark.sql("select * from default.level_1 where date_  >= ('2017-11-08') and date_  <= ('2017-11-27')")
  .drop("carrier","city","connection_type","geo_country","geo_lat","geo_lon","geo_type","ip","keywords","language","lat","lon","store_category","GEO3","GEO4")
  .where("GEO4 is not null")
  .withColumn("is_away",lit(0))

// Cache the serialized rows in memory with spill to disk, then count to materialize the cache
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
df.count()

You can find the error message below –

{"Event":"SparkListenerLogStart","Spark Version":"2.2.0"}
{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"10.44.6.179","Port":44257},"Maximum Memory":6819151872,"Timestamp":1512024674827,"Maximum Onheap Memory":6819151872,"Maximum Offheap Memory":0}
{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.141-1.b16.32.amzn1.x86_64/jre","Java Version":"1.8.0_141 (Oracle Corporation)","Scala Version":"version 2.11.8"},"Spark Properties":{"spark.sql.warehouse.dir":"hdfs:///user/spark/warehouse","spark.yarn.dist.files":"file:/etc/spark/conf/hive-site.xml","spark.executor.extraJavaOptions":"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'","spark.driver.host":"10.44.6.179","spark.history.fs.logDirectory":"hdfs:///var/log/spark/apps","spark.eventLog.enabled":"true","spark.driver.port":"33707","spark.shuffle.service.enabled":"true","spark.driver.extraLibraryPath":"/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native","spark.repl.class.uri":"spark://10.44.6.179:33707/classes","spark.jars":"","spark.yarn.historyServer.address":"ip-10-44-6-179.ec2.internal:18080","spark.stage.attempt.ignoreOnDecommissionFetchFailure":"true","spark.repl.class.outputDir":"/mnt/tmp/spark-52cac1b4-614f-43a5-ab9b-5c60c6c1c5a5/repl-9389c888-603e-4988-9593-86e298d2514a","spark.app.name":"Spark shell","spark.scheduler.mode":"FIFO","spark.driver.memory":"11171M","spark.executor.instances":"200","spark.default.parallelism":"3200","spark.resourceManager.cleanupExpiredHost":"true","spark.executor.id":"driver","spark.yarn.appMasterEnv.SPARK_PUBLIC_DNS":"$(hostname -f)","spark.driver.extraJavaOptions":"-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'","spark.submit.deployMode":"client","spark.master":"yarn","spark.ui.filters":"org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter","spark.blacklist.decommissioning.timeout":"1h","spark.executor.extraLibraryPath":"/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native","spark.sql.hive.metastore.sharedPrefixes":"com.amazonaws.services.dynamodbv2","spark.executor.memory":"20480M","spark.driver.extraClassPath":"/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar","spark.home":"/usr/lib/spark","spark.eventLog.dir":"hdfs:///var/log/spark/apps","spark.dynamicAllocation.enabled":"true","spark.executor.extraClassPath":"/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar","spark.sql.catalogImplementation":"hive","spark.executor.cores":"8","spark.history.ui.port":"18080","spark.driver.appUIAddress":"http://ip-10-44-6-179.ec2.internal:4040","spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS":"ip-10-44-6-

Notes –

1) I tried changing the StorageLevel to cache() and to DISK_ONLY, but it did not affect the result (a sketch of those variants follows these notes).

2) I checked the volume of the scratch space and saw that more than 90% of it was still unused.
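
For reference, the variants mentioned in note 1 would look roughly like the sketch below (my reading, not the asker's exact code); for a Dataset in Spark 2.x, cache() is equivalent to persist(StorageLevel.MEMORY_AND_DISK):

// Variant 1: plain cache(), i.e. persist(StorageLevel.MEMORY_AND_DISK)
df.cache()
df.count()

// Variant 2: drop the cached blocks and keep them on local disk only instead
df.unpersist()
df.persist(StorageLevel.DISK_ONLY)
df.count()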

Thanks!!

Solution

I have a hypothesis: this may be caused by a mechanism inside Spark SQL.

In short, Spark SQL collects all broadcast datasets on the driver side, so when you have a large query the driver must have enough memory to hold the broadcast data.
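
If that is what is happening, one cheap way to test the hypothesis (a sketch, not something verified in the original answer) is to stop Spark SQL from broadcasting anything automatically and re-run the same job; spark.sql.autoBroadcastJoinThreshold is the standard setting for this:

// Sketch: check the current threshold, then disable automatic broadcasting entirely
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
// ...then re-run the load / filter / persist / count pipeline from the question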

