scala – How to compute statistics on a streaming DataFrame with columns of different types in a single query
Published: 2020-12-16 18:44:43 | Category: Security | Source: compiled from the web
I have a streaming DataFrame with three columns: time, col1 and col2.
```
+-----------------------+-------------------+--------------------+
|time                   |col1               |col2                |
+-----------------------+-------------------+--------------------+
|2018-01-10 15:27:21.289|0.4988615628926717 |0.1926744113882285  |
|2018-01-10 15:27:22.289|0.5430687338123434 |0.17084552928040175 |
|2018-01-10 15:27:23.289|0.20527770821641478|0.2221980020202523  |
|2018-01-10 15:27:24.289|0.130852802747647  |0.5213147910202641  |
+-----------------------+-------------------+--------------------+
```

The data types of col1 and col2 are not fixed. Each can be either a string or a numeric type.

Solution
Enumerate the cases you want to handle and select the matching aggregates. For example, if the stream is defined as:
```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column

// `spark` is an existing SparkSession (for example, the one provided by spark-shell)
val schema = StructType(Seq(
  StructField("v", TimestampType),
  StructField("x", IntegerType),
  StructField("y", StringType),
  StructField("z", DecimalType(10, 2))
))

val df = spark.readStream.schema(schema).format("csv").load("/tmp/foo")
```

then the statistics query would be:

```scala
// df.dtypes yields (columnName, typeName) pairs, e.g. ("z", "DecimalType(10,2)");
// each case maps one column to the aggregates that make sense for its type.
val stats = df.select(df.dtypes.flatMap {
  // Strings: count non-null ("valid") and null ("invalid") values
  case (c, "StringType") =>
    Seq(count(c) as s"valid_${c}", count("*") - count(c) as s"invalid_${c}")
  // Timestamps and dates: range only
  case (c, t) if Seq("TimestampType", "DateType") contains t =>
    Seq(min(c), max(c))
  // Numbers: range, mean and sample standard deviation
  case (c, t) if (Seq("FloatType", "DoubleType", "IntegerType") contains t) || t.startsWith("DecimalType") =>
    Seq(min(c), max(c), avg(c), stddev(c))
  // Any other type is skipped
  case _ => Seq.empty[Column]
}: _*)

stats.printSchema()
// root
//  |-- min(v): timestamp (nullable = true)
//  |-- max(v): timestamp (nullable = true)
//  |-- min(x): integer (nullable = true)
//  |-- max(x): integer (nullable = true)
//  |-- avg(x): double (nullable = true)
//  |-- stddev_samp(x): double (nullable = true)
//  |-- valid_y: long (nullable = false)
//  |-- invalid_y: long (nullable = false)
//  |-- min(z): decimal(10,2) (nullable = true)
//  |-- max(z): decimal(10,2) (nullable = true)
//  |-- avg(z): decimal(14,6) (nullable = true)
//  |-- stddev_samp(z): double (nullable = true)
```

(Editor: 李大同)
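A variant of the same idea, sketched under the assumption of Spark 3.x running locally, matches on the `DataType` objects in `df.schema` instead of on the type-name strings from `df.dtypes`. One `NumericType` case then also picks up types such as `LongType` or `ShortType`, which the string list in the answer leaves out. The names `StreamingStats`, `statsColumns` and `runOnSample`, the temporary CSV input, and storing `col2` as text are all hypothetical choices made to keep the example self-contained. The aggregation has no `groupBy`, so the streaming query is started in `complete` output mode. It writes to the in-memory sink only so that the single result row can be read back with `spark.table`.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object StreamingStats {

  // Chooses aggregates per column from its Spark DataType (not the dtypes string).
  // NumericType covers Byte/Short/Integer/Long/Float/Double/Decimal in one case.
  def statsColumns(schema: StructType): Seq[Column] =
    schema.fields.toSeq.flatMap { f =>
      val c = col(f.name)
      f.dataType match {
        case _: StringType =>
          // Non-null values count as "valid", nulls as "invalid"
          Seq(count(c).as(s"valid_${f.name}"),
              (count(lit(1)) - count(c)).as(s"invalid_${f.name}"))
        case _: TimestampType | _: DateType =>
          Seq(min(c), max(c))
        case _: NumericType =>
          Seq(min(c), max(c), avg(c), stddev(c))
        case _ =>
          Seq.empty[Column]
      }
    }

  // Hypothetical demo: writes the question's sample rows to a temporary CSV file
  // (col2 declared as a string), runs the stats query once and returns its single row.
  def runOnSample(spark: SparkSession): Row = {
    val dir = Files.createTempDirectory("stream-stats-input")
    val csv = Seq(
      "2018-01-10 15:27:21.289,0.4988615628926717,0.1926744113882285",
      "2018-01-10 15:27:22.289,0.5430687338123434,0.17084552928040175",
      "2018-01-10 15:27:23.289,0.20527770821641478,0.2221980020202523",
      "2018-01-10 15:27:24.289,0.130852802747647,0.5213147910202641"
    ).mkString("\n")
    Files.write(dir.resolve("part-0.csv"), csv.getBytes(StandardCharsets.UTF_8))

    val schema = StructType(Seq(
      StructField("time", TimestampType),
      StructField("col1", DoubleType),
      StructField("col2", StringType)))

    val df = spark.readStream
      .schema(schema)
      .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSS")
      .csv(dir.toString)

    // An ungrouped streaming aggregation cannot use "append" mode without a watermark
    val query = df.select(statsColumns(df.schema): _*)
      .writeStream
      .format("memory")
      .queryName("stream_stats")
      .outputMode("complete")
      .start()
    try {
      query.processAllAvailable() // block until the sample file has been processed
      spark.table("stream_stats").head()
    } finally query.stop()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("stream-stats").getOrCreate()
    try println(runOnSample(spark)) finally spark.stop()
  }
}
```

With the four sample rows, `valid_col2` should be 4 and `invalid_col2` 0, alongside min/max/avg/stddev for `col1` and min/max for `time`. In `complete` mode each trigger re-emits the statistics over all data seen so far. The memory sink is intended for debugging, so a long-running job would write to a real sink instead.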