
scala – Spark: Size exceeds Integer.MAX_VALUE when joining 2 large DFs

Hi folks,

I am running into this issue while joining two large DataFrames (100 GB each) in Spark, with a single key identifier per row.

I am using Spark 1.6 on EMR, and this is what I am doing:

val df1 = sqlContext.read.json("hdfs:///df1/")
val df2 = sqlContext.read.json("hdfs:///df2/")

// clean up and filter steps later 

df1.registerTempTable("df1")
df2.registerTempTable("df2")

val df3 = sqlContext.sql("select df1.*, df2.col1 from df1 left join df2 on df1.col3 = df2.col4")

df3.write.json("hdfs:///df3/")

That is basically the gist of what I am doing, along with the other cleanup and filter steps that happen between reading the data and the final join of df1 and df2.

The error I am seeing is:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)
    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420)
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Configuration and reference:
I am using a 13-node cluster with 60 GB per node, with executor and driver memory plus the corresponding overheads set accordingly. Things I have tried tuning (see the sketch after this list):

> spark.sql.broadcastTimeout
> spark.sql.shuffle.partitions
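
For reference, a minimal sketch of how those two settings can be adjusted on a Spark 1.6 SQLContext (the values below are placeholders for illustration, not the ones I actually used):

// Hypothetical values; tune for your own data volume and cluster size.
sqlContext.setConf("spark.sql.broadcastTimeout", "1200")    // seconds to wait for a broadcast before failing
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")  // number of partitions used by joins/aggregations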

I also tried using a larger cluster, but it did not help. This link suggests that this error is thrown when a shuffle partition exceeds 2 GB, but I have already tried increasing the number of partitions to a very high value and still had no luck.

I suspect this may be related to lazy evaluation. When I perform 10 operations on a DF, they are only executed at the last step. I tried adding .persist() on the DFs with various storage levels, but still no success. I also tried dropping the temp tables and clearing out all of the earlier DFs.
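
Roughly what that persist attempt looked like (a sketch; DISK_ONLY is just one of the storage levels I tried, and the variable names are illustrative):

import org.apache.spark.storage.StorageLevel

// Materialize the cleaned DataFrames before the join instead of letting
// the whole lineage be recomputed lazily at the final write.
val df1Cached = df1.persist(StorageLevel.DISK_ONLY)
val df2Cached = df2.persist(StorageLevel.DISK_ONLY)
df1Cached.count()   // actions to force the persist to actually take effect
df2Cached.count()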

However, the code works fine if I break it into two parts: write the final intermediate data (the two DataFrames) to disk, then restart and do only the join of the two DFs.
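
A rough sketch of that two-stage workaround (the intermediate paths are hypothetical):

// Job 1: run the cleanup/filter steps and write the intermediate DataFrames out.
df1.write.json("hdfs:///tmp/df1_clean/")
df2.write.json("hdfs:///tmp/df2_clean/")

// Job 2: a fresh application that only reads them back and performs the join.
val left = sqlContext.read.json("hdfs:///tmp/df1_clean/")
val right = sqlContext.read.json("hdfs:///tmp/df2_clean/")
left.registerTempTable("df1")
right.registerTempTable("df2")
val df3 = sqlContext.sql("select df1.*, df2.col1 from df1 left join df2 on df1.col3 = df2.col4")
df3.write.json("hdfs:///df3/")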

I was previously getting this error:

Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin.doExecute(BroadcastHashOuterJoin.scala:113)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.DataFrame.toJSON(DataFrame.scala:1724)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

But once I tuned spark.sql.broadcastTimeout, I started getting the first error instead.

Any help in this situation is much appreciated. I can add more information if needed.

Solution

In Spark you cannot have shuffle blocks larger than 2 GB. This is because Spark stores each shuffle block as a ByteBuffer, which is allocated like this:

ByteBuffer.allocate(int capacity)

Since ByteBuffer is limited to Integer.MAX_VALUE (2 GB), so are shuffle blocks! The solution is to increase the number of partitions, using spark.sql.shuffle.partitions in Spark SQL or rdd.repartition() / rdd.coalesce() for RDDs, so that every partition stays <= 2 GB.

You mentioned that you tried increasing the number of partitions and it still failed. Can you check whether any partition is still larger than 2 GB? Just make sure the number of partitions you specify is large enough that each block ends up well under 2 GB.
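
As a very rough sizing sketch (the numbers are illustrative, not measured from your job): with roughly 200 GB flowing through the shuffle, the default 200 shuffle partitions would put each partition around 1 GB before accounting for skew, so a count in the low thousands leaves comfortable headroom:

// Illustrative only: pick a partition count that keeps every shuffle block well under 2 GB,
// e.g. ~200 GB of shuffled data / 2000 partitions ≈ 100 MB per partition.
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")

// For plain RDDs the equivalent is to repartition before the wide operation:
// someRdd.repartition(2000)   // or coalesce(n) to reduce partitions without a full shuffle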
