加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何获取写入的记录数(使用DataFrameWriter的保存操作)

发布时间:2020-12-16 18:09:33 所属栏目:安全 来源:网络整理
导读:使用spark保存记录时,有没有办法获取写入的记录数?虽然我知道目前不符合规范,但我希望能够做到这样的事情: val count = df.write.csv(path) 或者,能够进行步骤结果的内联计数(优选地不使用标准累加器)将(几乎)同样有效.即: dataset.countTo(count_var).fi
使用spark保存记录时,有没有办法获取写入的记录数?虽然我知道目前不符合规范,但我希望能够做到这样的事情:

val count = df.write.csv(path)

或者,能够进行步骤结果的内联计数(优选地不使用标准累加器)将(几乎)同样有效.即:

dataset.countTo(count_var).filter({function}).countTo(filtered_count_var).collect()

有任何想法吗?

解决方法

我使用 SparkListener可以拦截可用于访问任务指标的onTaskEnd或onStageCompleted事件.

任务指标为您提供Spark用于在SQL选项卡中显示指标的累加器(在查询的详细信息中).

web UI / Details for Query

例如,以下查询:

spark.
  read.
  option("header",true).
  csv("../datasets/people.csv").
  limit(10).
  write.
  csv("people")

给出了10个输出行,因此Spark知道它(你也可以).

enter image description here

您还可以探索Spark SQL的QueryExecutionListener:

The interface of query execution listener that can be used to analyze execution metrics.

您可以使用可用作spark.listenerManager的ExecutionListenerManager注册QueryExecutionListener.

scala> :type spark.listenerManager
org.apache.spark.sql.util.ExecutionListenerManager

scala> spark.listenerManager.
clear   clone   register   unregister

我认为它更接近“裸机”,但之前没有使用过.

@D3V(在评论部分中)提到使用结构化查询的QueryExecution访问numOutputRows SQL指标.值得考虑的事情.

scala> :type q
org.apache.spark.sql.DataFrame

scala> :type q.queryExecution.executedPlan.metrics
Map[String,org.apache.spark.sql.execution.metric.SQLMetric]

q.queryExecution.executedPlan.metrics("numOutputRows").value

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读