scala – 在Spark中将镶木地板文件加载到case类中的性能
我正在评估在Spark中加载Parquet文件的不同方法的性能,差异是惊人的.
在我们的Parquet文件中,我们有类似的嵌套case类: case class C(/* a dozen of attributes*/) case class B(/* a dozen of attributes*/,cs: Seq[C]) case class A(/* a dozen of attributes*/,bs: Seq[B]) 从Parquet文件加载它们需要一段时间. 以下是我做的基准测试的总结: val df: DataFrame = sqlContext.read.parquet("path/to/file.gz.parquet").persist() df.count() // Spark 1.6 // Play Json // 63.169s df.toJSON.flatMap(s => Try(Json.parse(s).as[A]).toOption) .map(_.fieldToSum).sum() // Direct access to field using Spark Row // 2.811s df.map(row => row.getAs[Long]("fieldToSum")).sum() // Some small library we developed that access fields using Spark Row // 10.401s df.toRDD[A].map(_.fieldToSum).sum() // Dataframe hybrid SQL API // 0.239s df.agg(sum("fieldToSum")).collect().head.getAs[Long](0) // Dataset with RDD-style code // 34.223s df.as[A].map(_.fieldToSum).reduce(_ + _) // Dataset with column selection // 0.176s df.as[A].select($"fieldToSum".as[Long]).reduce(_ + _) // Spark 2.0 // Performance is similar except for: // Direct access to field using Spark Row // 23.168s df.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _) // Some small library we developed that access fields using Spark Row // 32.898s f1DF.toRDD[A].map(_.fieldToSum).sum() 我理解为什么升级到Spark 2.0时使用Spark Row的方法的性能会降低,因为Dataframe现在只是Dataset [Row]的别名. 另一方面,我对数据集的承诺没有得到保留感到非常失望:使用RDD样式编码(map和flatMaps)时的性能比使用类似SQL的DSL的数据集这样的数据集更糟. 基本上,为了获得良好的性能,我们需要放弃类型安全. >用作RDD的数据集和用作数据帧的数据集之间存在这种差异的原因是什么? 解决方法
为了获得一些见解,让我们考虑一下Spark SQL使用的优化.据我所知,有三种类型的改进优于普通RDD: >执行计划优化(投影和选择下推,常数折叠), 现在问题是并非所有这些技术在受限制的编程模型(如SQL)之外都是有用的. 例如,可以下推选择(过滤器),但投影非常有限(你不能真正拥有对象的一部分,可以吗?).类似地,代码生成依赖于定义良好的语义,并且通常不容易应用它(它基本上是一个生成可以通过JVM进一步优化的代码的编译器). 最后sun.misc.Unsafe是一种提高性能的惊人方式,但它不是免费提供的.虽然这里有很多增益,但编码和解码也有很大的开销.
嵌套结构并不完全是第一类公民,并且有一些记录不完的限制,你仍然可以在这里做很多.
虽然有一些性能回归,但这两段代码根本不相同.在2.0.0中,DataFrame.map具有与1.x版本不同的签名.如果你想使这两者相当,你应该首先转换为RDD: df.rdd.map(row => row.getAs[Long]("fieldToSum")).reduce(_ + _) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |