scala – Spark: Size exceeds Integer.MAX_VALUE when joining 2 large DFs
Hi folks,
I'm running into this issue when trying to join two large data frames (about 100GB each) in Spark, keyed on a single identifier per row. I'm using Spark 1.6 on EMR, and this is what I'm doing:

```scala
val df1 = sqlContext.read.json("hdfs:///df1/")
val df2 = sqlContext.read.json("hdfs:///df2/")
// clean up and filter steps later
df1.registerTempTable("df1")
df2.registerTempTable("df2")
val df3 = sql("select df1.*, df2.col1 from df1 left join df2 on df1.col3 = df2.col4")
df3.write.json("hdfs:///df3/")
```

That is basically the gist of what I'm doing, along with other cleanup and filter steps between the reads and the final join of df1 and df2. The error I see is:

```
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)
    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420)
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
```

Configuration and reference:

> spark.sql.broadcastTimeout

I also tried using a larger cluster, but that didn't help. This link says the error is thrown when a shuffle partition exceeds 2GB, but I have already tried increasing the number of partitions to a very high value, still with no luck.

I suspect this may have something to do with lazy evaluation: when I perform 10 operations on a DF, they only execute at the last step. I tried adding .persist() with various storage levels on the DFs, but still no success. I also tried dropping the temp tables and unpersisting all the earlier DFs to clean up.

However, the code works fine if I break it into two parts: write the final intermediate data (the two data frames) to disk, then start a new job that only joins the two DFs.
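In sketch form, that working two-phase variant looks roughly like the following (the intermediate paths df1_clean/ and df2_clean/ are placeholders, not the actual paths I use):

```scala
// Phase 1: do the cleanup/filtering and persist the intermediates to HDFS.
// "hdfs:///df1_clean/" and "hdfs:///df2_clean/" are placeholder paths.
val df1 = sqlContext.read.json("hdfs:///df1/")
val df2 = sqlContext.read.json("hdfs:///df2/")
// ... clean up and filter steps ...
df1.write.json("hdfs:///df1_clean/")
df2.write.json("hdfs:///df2_clean/")

// Phase 2: submitted as a separate job, it only reads the intermediates
// back and performs the join, nothing else.
val left  = sqlContext.read.json("hdfs:///df1_clean/")
val right = sqlContext.read.json("hdfs:///df2_clean/")
left.registerTempTable("df1")
right.registerTempTable("df2")
val df3 = sqlContext.sql(
  "select df1.*, df2.col1 from df1 left join df2 on df1.col3 = df2.col4")
df3.write.json("hdfs:///df3/")
```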
I was getting this error earlier:

```
Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin.doExecute(BroadcastHashOuterJoin.scala:113)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.DataFrame.toJSON(DataFrame.scala:1724)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

But once I tuned spark.sql.broadcastTimeout, I started getting the first error instead.

Any help with this situation would be greatly appreciated. I can add more information if needed.
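For reference, this is roughly how a broadcast timeout like that gets raised in Spark 1.6; the value below is only an illustrative example, not the one I actually used:

```scala
// Sketch: raise the broadcast-join timeout (in seconds; the Spark 1.6 default is 300).
// 1200 is an arbitrary example value.
sqlContext.setConf("spark.sql.broadcastTimeout", "1200")

// The same setting can also be passed at submit time:
//   spark-submit --conf spark.sql.broadcastTimeout=1200 ...
```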
Solution

In Spark you cannot have a shuffle block larger than 2GB. This is because
Spark stores a shuffle block as a ByteBuffer, and this is how it gets allocated: ByteBuffer.allocate(int capacity). Since a ByteBuffer is limited to Integer.MAX_VALUE (2GB), so is a shuffle block! The fix is to increase the number of partitions, either with spark.sql.shuffle.partitions in Spark SQL or with rdd.repartition() / rdd.coalesce() for RDDs, so that every partition stays <= 2GB.

You mentioned that you tried increasing the number of partitions and it still failed. Can you check whether any partition is still > 2GB? Just make sure the number of partitions you specify is high enough that every block size ends up < 2GB.
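A minimal sketch of both approaches on Spark 1.6 (the partition counts below are only illustrative; pick them so that every shuffle block stays well under 2GB):

```scala
// Spark SQL: raise the number of shuffle partitions used for joins and
// aggregations (the default is 200). 2000 is just an example value.
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")

// RDD API: change the partition count explicitly.
// repartition() performs a full shuffle and can increase the count;
// coalesce() is normally used to reduce it without a full shuffle.
val rdd = sc.textFile("hdfs:///some/input/")   // placeholder input path
val morePartitions  = rdd.repartition(2000)    // spread data over more, smaller blocks
val fewerPartitions = morePartitions.coalesce(500)
```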