scala – 在Spark中计算UDF的调用
发布时间:2020-12-16 08:45:34 所属栏目:安全 来源:网络整理
导读:使用Spark 1.6.1我想调用UDF的调用次数.我想这样做是因为我有一个非常昂贵的UDF(每次调用大约1秒),我怀疑UDF被调用的次数比我数据帧中的记录数要多,这使得我的spark工作速度慢于必要. 虽然我无法重现这种情况,但我想出了一个简单的例子,显示对UDF的调用次数
使用Spark 1.6.1我想调用UDF的调用次数.我想这样做是因为我有一个非常昂贵的UDF(每次调用大约1秒),我怀疑UDF被调用的次数比我数据帧中的记录数要多,这使得我的spark工作速度慢于必要.
虽然我无法重现这种情况,但我想出了一个简单的例子,显示对UDF的调用次数似乎与行数不同(此处:更少),这怎么可能? import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf,SparkContext} import org.apache.spark.sql.functions.udf object Demo extends App { val conf = new SparkConf().setMaster("local[4]").setAppName("Demo") val sc = new SparkContext(conf) sc.setLogLevel("WARN") val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val callCounter = sc.accumulator(0) val df= sc.parallelize(1 to 10000,numSlices = 100).toDF("value") println(df.count) // gives 10000 val myudf = udf((d:Int) => {callCounter.add(1);d}) val res = df.withColumn("result",myudf($"value")).cache println(res.select($"result").collect().size) // gives 10000 println(callCounter.value) // gives 9941 } 如果使用累加器不是调用UDF计数的正确方法,我还能怎样做呢? 注意:在我的实际Spark-Job中,获得的呼叫计数大约是实际记录数的1.7倍. 解决方法
Spark应用程序应定义main()方法,而不是扩展scala.App. scala.App的子类可能无法正常工作.
import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf,SparkContext} import org.apache.spark.sql.functions.udf object Demo extends App { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]") val sc = new SparkContext(conf) // [...] } } 这应该可以解决您的问题. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |