scala – 当DF包含太多列时,Spark UDF会在每条记录中多次调用
发布时间:2020-12-16 09:27:26 所属栏目:安全 来源:网络整理
导读:我正在使用Spark 1.6.1并遇到一个奇怪的行为:我在包含一些输入数据的数据帧上运行一个带有一些繁重计算(物理模拟)的UDF,并构建一个包含许多列的结果-Dataframe(~40 ). 奇怪的是,在这种情况下,我的输入数据帧的每个记录不止一次调用我的UDF(经常是1.6倍),我
我正在使用Spark 1.6.1并遇到一个奇怪的行为:我在包含一些输入数据的数据帧上运行一个带有一些繁重计算(物理模拟)的UDF,并构建一个包含许多列的结果-Dataframe(~40 ).
奇怪的是,在这种情况下,我的输入数据帧的每个记录不止一次调用我的UDF(经常是1.6倍),我发现这是不可接受的,因为它非常昂贵.如果我减少列数(例如减少到20),则此行为将消失. 我设法写下一个小脚本来证明这一点: import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf,SparkContext} import org.apache.spark.sql.functions.udf object Demo { case class Result(a: Double) def main(args: Array[String]): Unit = { val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[*]")) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val numRuns = sc.accumulator(0) // to count the number of udf calls val myUdf = udf((i:Int) => {numRuns.add(1);Result(i.toDouble)}) val data = sc.parallelize((1 to 100),numSlices = 5).toDF("id") // get results of UDF var results = data .withColumn("tmp",myUdf($"id")) .withColumn("result",$"tmp.a") // add many columns to dataframe (must depend on the UDF's result) for (i <- 1 to 42) { results=results.withColumn(s"col_$i",$"result") } // trigger action val res = results.collect() println(res.size) // prints 100 println(numRuns.value) // prints 160 } } 现在,有没有办法在不减少列数的情况下解决这个问题? 解决方法
我无法解释这种行为 – 但很明显,查询计划会以某种方式选择一些路径,其中一些记录会被计算两次.这意味着如果我们缓存中间结果(在应用UDF之后),我们可能会“强制”Spark不重新计算UDF.实际上,一旦添加了缓存,它就会按预期运行 – UDF被称为100次:
// get results of UDF var results = data .withColumn("tmp",myUdf($"id")) .withColumn("result",$"tmp.a").cache() 当然,缓存有其自身的成本(内存……),但如果它保存了许多UDF调用,它可能最终会对你的情况有所帮助. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |