scala – 为什么spark-shell在依靠具有3000列的DataFrame后会打
(使用本地机器官方网站上的spark-2.1.0-bin-hadoop2.7版本)
当我在spark-shell中执行一个简单的spark命令时,它会在抛出错误之前打印出数千行代码.这些“代码”是什么? 我在我的本地机器上运行火花.我运行的命令是一个简单的df.count,其中df是一个DataFrame. 请看下面的截图(代码飞得太快,我只能截取屏幕截图,看看发生了什么).更多细节在图像下方. 更多细节: 我创建了数据框df val df: DataFrame = spark.createDataFrame(rows,schema) // rows: RDD[Row] // schema: StructType // There were about 3000 columns and 700 rows (testing set) of data in df. // The following line ran successfully and returned the correct value rows.count // The following line threw exception after printing out tons of codes as shown in the screenshot above df.count “代码”之后抛出的异常是: ... /* 181897 */ apply_81(i); /* 181898 */ result.setTotalSize(holder.totalSize()); /* 181899 */ return result; /* 181900 */ } /* 181901 */ } at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:889) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:941) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:938) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) ... 29 more Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941) at org.codehaus.janino.CodeContext.write(CodeContext.java:854) at org.codehaus.janino.CodeContext.writeShort(CodeContext.java:959) 编辑:正如@TzachZohar所指出的,这看起来像已知的错误之一(https://issues.apache.org/jira/browse/SPARK-16845)已修复但未从spark项目中释放. 我拉了火花大师,从源头建造它,并重新尝试了我的例子.现在我在生成的代码后面得到了一个新的异常: /* 308608 */ apply_1560(i); /* 308609 */ apply_1561(i); /* 308610 */ result.setTotalSize(holder.totalSize()); /* 308611 */ return result; /* 308612 */ } /* 308613 */ } at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:941) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:998) at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:995) at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) ... 29 more Caused by: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF at org.codehaus.janino.util.ClassFile.addToConstantPool(ClassFile.java:499) 看起来拉请求正在解决第二个问题:https://github.com/apache/spark/pull/16648 解决方法
这是一个错误.它与在JVM上生成的运行时代码有关.因此,Scala团队似乎很难解决. (关于JIRA的讨论很多).
在执行行操作时,我发生了错误.即使是700行的数据帧上的df.head()也会导致异常. 我的解决方法是将数据帧转换为稀疏数据RDD(即RDD [LabeledPoint])并在RDD上运行逐行操作.它更快,内存效率更高. HOwever,它只适用于数字数据.分类变量(因子,目标等)需要转换为Double. 也就是说,我自己是Scala的新手,所以我的代码可能有点业余.但它的确有效. CreateRow @throws(classOf[Exception]) private def convertRowToLabeledPoint(rowIn: Row,fieldNameSeq: Seq[String],label: Int): LabeledPoint = { try { logger.info(s"fieldNameSeq $fieldNameSeq") val values: Map[String,Long] = rowIn.getValuesMap(fieldNameSeq) val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*) //println(s"convertRowToLabeledPoint row values ${sortedValuesMap}") print(".") val rowValuesItr: Iterable[Long] = sortedValuesMap.values var positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]() var valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]() var currentPosition: Int = 0 rowValuesItr.foreach { kv => if (kv > 0) { valuesArray += kv.toDouble; positionsArray += currentPosition; } currentPosition = currentPosition + 1; } new LabeledPoint(label,org.apache.spark.mllib.linalg.Vectors.sparse(positionsArray.size,positionsArray.toArray,valuesArray.toArray)) } catch { case ex: Exception => { throw new Exception(ex) } } } private def castColumnTo(df: DataFrame,cn: String,tpe: DataType): DataFrame = { //println("castColumnTo") df.withColumn(cn,df(cn).cast(tpe) ) } 提供Dataframe并返回RDD LabeledPOint @throws(classOf[Exception]) def convertToLibSvm(spark:SparkSession,mDF : DataFrame,targetColumnName:String): RDD[LabeledPoint] = { try { val fieldSeq: scala.collection.Seq[StructField] = mDF.schema.fields.toSeq.filter(f => f.dataType == IntegerType || f.dataType == LongType) val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name) val indexer = new StringIndexer() .setInputCol(targetColumnName) .setOutputCol(targetColumnName+"_Indexed") val mDFTypedIndexed = indexer.fit(mDF).transform(mDF).drop(targetColumnName) val mDFFinal = castColumnTo(mDFTypedIndexed,targetColumnName+"_Indexed",IntegerType) //mDFFinal.show() //only doubles accepted by sparse vector,so that's what we filter for var positionsArray: ArrayBuffer[LabeledPoint] = ArrayBuffer[LabeledPoint]() mDFFinal.collect().foreach { row => positionsArray += convertRowToLabeledPoint(row,fieldNameSeq,row.getAs(targetColumnName+"_Indexed")); } spark.sparkContext.parallelize(positionsArray.toSeq) } catch { case ex: Exception => { throw new Exception(ex) } } } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |