scala – 在Spark中导入镶木地板文件时的内存问题
发布时间:2020-12-16 18:54:47 所属栏目:安全 来源:网络整理
导读:我正在尝试从 Scala Spark(1.5)中的镶木地板文件中查询数据,包括200万行的查询(以下代码中的“变体”). val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")val parquetFile = sqlConte
我正在尝试从
Scala Spark(1.5)中的镶木地板文件中查询数据,包括200万行的查询(以下代码中的“变体”).
val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext.sql("SET spark.sql.parquet.binaryAsString=true") val parquetFile = sqlContext.read.parquet(<path>) parquetFile.registerTempTable("tmpTable") sqlContext.cacheTable("tmpTable") val patients = sqlContext.sql("SELECT DISTINCT patient FROM tmpTable ...) val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ... ) 当获取的行数较少时,此运行正常,但在请求大量数据时,“大小超过Integer.MAX_VALUE”错误则失败. User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 43 in stage 1.0 failed 4 times,most recent failure: Lost task 43.3 in stage 1.0 (TID 123,node009): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113) at ... 我能做些什么来完成这项工作? 这看起来像是一个内存问题,但我尝试使用多达100个执行程序,没有区别(无论涉及的执行程序数量多少,失败所用的时间都保持不变).感觉数据没有在节点之间进行分区? 我试图通过天真地替换这条线来强制更高的并行化,但无济于事: val variants = sqlContext.sql("SELECT DISTINCT ... FROM tmpTable ... ).repartition(sc.defaultParallelism*10) 解决方法
我不相信这个问题具有针对性.您正在“击中”Spark中分区的最大大小限制.
Integer.MAX_VALUE检测到您的大小(我相信)大于2GB的分区(需要多于int32来索引它). Joe Widen的评论很有见.您需要对数据进行更多重新分区.尝试1000或更多. 例如., val data = sqlContext.read.parquet("data.parquet").rdd.repartition(1000).toDF (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |