scala – RDD countApprox taking far longer than the requested timeout
To reduce the time spent collecting a DataFrame's row count, RDD.countApprox() is being called. It has the following signature:
    def countApprox(
        timeout: Long,
        confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {

I am trying to limit the output computation to 60 seconds. Note also the very low accuracy requirement of 0.10:

    val waitSecs = 60
    val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean

But the actual time taken was ... 17 minutes? That is almost the same as the time it took to generate the data in the first place (19 minutes)! So, what is this API for: is there any way to get it to actually save some meaningful fraction of the computation time?

TL;DR (see the accepted answer): use initialValue instead of getFinalValue
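As a quick sanity check of that TL;DR, here is a minimal, hedged sketch; it assumes the same inputDf and waitSecs as above, and the timing variables are purely illustrative:

    // Same call as above, but reading initialValue instead of getFinalValue.
    // The value comes back within roughly waitSecs, although Spark keeps
    // counting in the background afterwards (see the answer below for caveats).
    val t0 = System.nanoTime
    val approxCnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue.mean
    val elapsedSecs = (System.nanoTime - t0) / 1e9
    println(f"approximate count: $approxCnt%.0f (returned after $elapsedSecs%.1f s)")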
Solution

Note the return type in the countApprox definition: it is a PartialResult, not a plain count.
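For context, a PartialResult[BoundedDouble] carries more than a single number. A minimal sketch of what it exposes, assuming the inputDf from the question (the names partial and estimate are just illustrative):

    // countApprox blocks for at most the timeout, then hands back a PartialResult.
    val partial = inputDf.rdd.countApprox(timeout = 60 * 1000, confidence = 0.95)

    // initialValue is whatever estimate is available at that point: a BoundedDouble
    // holding a point estimate plus a confidence interval.
    val estimate = partial.initialValue
    println(s"mean=${estimate.mean} low=${estimate.low} high=${estimate.high} confidence=${estimate.confidence}")

    // getFinalValue, by contrast, blocks until the underlying job has fully finished.
    // val exact = partial.getFinalValue.mean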
    def countApprox(
        timeout: Long,
        confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {

Now, note how it is being used:

    val waitSecs = 60
    val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean

According to the Spark Scala doc, getFinalValue is a blocking call that waits until the counting job has fully completed, regardless of the timeout, whereas initialValue returns the estimate that is available once the timeout has elapsed:

    val waitSecs = 60
    val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue.mean

Note the downside of using countApprox(timeout, confidence).initialValue: even after you have obtained the value, Spark will keep counting until it reaches the final count you would have gotten with getFinalValue, and it will hold on to resources until that job completes. With this API, however, the calling code is no longer blocked on the count operation.

Reference: https://mail-archives.apache.org/mod_mbox/spark-user/201505.mbox/%3C747872034.1520543.1431544429083.JavaMail.yahoo@mail.yahoo.com%3E

Now let us verify this non-blocking assumption in spark2-shell. We create a random DataFrame and run a plain count, countApprox with getFinalValue, and countApprox with initialValue:

    scala> val schema = StructType((0 to 10).map(n => StructField(s"column_$n", StringType)))
    schema: org.apache.spark.sql.types.StructType = StructType(StructField(column_0,StringType,true), StructField(column_1,StringType,true), ..., StructField(column_10,StringType,true))

    scala> val rows = spark.sparkContext.parallelize(Seq[Row](), 100).mapPartitions { _ =>
         |   Range(0, 100000).map(m => Row(schema.map(_ => Random.alphanumeric.filter(_.isLower).head.toString).toList: _*)).iterator
         | }
    rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[1] at mapPartitions at <console>:32

    scala> val inputDf = spark.sqlContext.createDataFrame(rows, schema)
    inputDf: org.apache.spark.sql.DataFrame = [column_0: string, column_1: string ... 9 more fields]

    // Please note that cnt is displayed only once all tasks have completed
    scala> val cnt = inputDf.rdd.count
    cnt: Long = 10000000

    scala> val waitSecs = 60
    waitSecs: Int = 60

    // cntApprxFinal is likewise displayed only once all tasks have completed
    scala> val cntApprxFinal = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean
    [Stage 1:======================================================> (98 + 2) / 100]cntApprxFinal: Double = 1.0E7

    scala> val waitSecs = 60
    waitSecs: Int = 60

    // Please note that cntApprxInitial in this case is displayed exactly after the timeout.
    // Here 80 of the 100 tasks had completed within the timeout; the value was printed,
    // and the remaining tasks kept running afterwards.
    scala> val cntApprxInitial = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue.mean
    [Stage 2:============================================> (80 + 4) / 100]cntApprxInitial: Double = 1.0E7
    [Stage 2:=======================================================>(99 + 1) / 100]

Looking at the Spark UI and the spark-shell output for all three operations: cntApprxInitial is available before all the tasks have completed.

Hope this helps!
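As a follow-up, if you also want the exact count eventually without ever blocking, PartialResult exposes an onComplete callback. A rough sketch under the same assumptions (inputDf in scope, and the driver staying alive long enough for the job to finish):

    // Take the estimate available after the timeout, and register a callback that
    // Spark invokes once every partition has been counted.
    val partial = inputDf.rdd.countApprox(60 * 1000, 0.10)
    println(s"approximate count now: ${partial.initialValue.mean}")

    partial.onComplete { bounded =>
      // Called once the full count has finished; until then the job keeps
      // running and holding resources, as noted above.
      println(s"exact count: ${bounded.mean}")
    }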