
scala – Group by column "grp" and compress a DataFrame (take the last non-null value for each column, ordering rows by column "ord" within each group)

Suppose I have the following DataFrame:

+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null|  3|null|  11|
|  2|    null|  2| xxx|  22|
|  1|    null|  1| yyy|null|
|  2|    null|  7|null|  33|
|  1|    null| 12|null|null|
|  2|    null| 19|null|  77|
|  1|    null| 10| s13|null|
|  2|    null| 11| a23|null|
+---+--------+---+----+----+

Here is the same sample DF with annotations, ordered by grp and ord:

scala> df.orderBy("grp","ord").show
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null|  1| yyy|null|
|  1|    null|  3|null|  11|   # grp:1 - last value for `col2` (11)
|  1|    null| 10| s13|null|   # grp:1 - last value for `col1` (s13)
|  1|    null| 12|null|null|   # grp:1 - last values for `null_col`,`ord`
|  2|    null|  2| xxx|  22|   
|  2|    null|  7|null|  33|   
|  2|    null| 11| a23|null|   # grp:2 - last value for `col1` (a23)
|  2|    null| 19|null|  77|   # grp:2 - last values for `null_col`,`ord`,`col2`
+---+--------+---+----+----+

I want to compress it, i.e. group it by column "grp" and, for each group, sort the rows by column "ord" and take the last non-null value in each column (if there is one):

+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null| 12| s13|  11|
|  2|    null| 19| a23|  77|
+---+--------+---+----+----+

I have seen the following similar questions:

> How to select the first row of each group?
> How to find first non-null values in groups? (secondary sorting using dataset api)

But my real DataFrame has more than 250 columns, so I need a solution where I don't have to name all the columns explicitly.

I can't wrap my head around it...

MCVE: how to create the sample DataFrame:

> Create a local file "/tmp/data.txt" and copy-paste the contents of the DataFrame (as posted above) into it
> Define the function readSparkOutput()
> Parse "/tmp/data.txt" into a DataFrame:

val df = readSparkOutput("file:///tmp/data.txt")

Update: I think it should be something like the following SQL:

SELECT
  grp, ord, null_col, col1, col2
FROM (
    SELECT
      grp,
      FIRST(null_col) OVER (PARTITION BY grp ORDER BY ord DESC) as null_col,
      FIRST(col1)     OVER (PARTITION BY grp ORDER BY ord DESC) as col1,
      FIRST(col2)     OVER (PARTITION BY grp ORDER BY ord DESC) as col2,
      ROW_NUMBER()    OVER (PARTITION BY grp ORDER BY ord DESC) as rn
    FROM table_name) as v
WHERE v.rn = 1;

How can we dynamically generate such a Spark query?
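For illustration, here is a minimal sketch of how such a query string could be generated from df.columns (assuming the DataFrame is registered as a temporary view named table_name, as in the SQL above, and adding an explicit full-partition frame so that FIRST scans the whole group rather than the default frame):

// Hypothetical sketch: build the window SQL from df.columns, so that
// none of the 250+ columns has to be written out by hand.
val groupCol = "grp"
val orderCol = "ord"
// Full-partition frame, so FIRST(..., true) sees every row of the group
val frame = "ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING"

val windowed = df.columns
  .filterNot(_ == groupCol)
  .map(c => s"FIRST($c, true) OVER (PARTITION BY $groupCol ORDER BY $orderCol DESC $frame) AS $c")
  .mkString(",\n      ")

val sql =
  s"""SELECT ${df.columns.mkString(", ")}
     |FROM (
     |    SELECT
     |      $groupCol,
     |      $windowed,
     |      ROW_NUMBER() OVER (PARTITION BY $groupCol ORDER BY $orderCol DESC) AS rn
     |    FROM table_name) AS v
     |WHERE v.rn = 1""".stripMargin

df.createOrReplaceTempView("table_name")  // view name taken from the SQL above
spark.sql(sql).show()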

I tried the following simplified approach:

import org.apache.spark.sql.expressions.Window

val win = Window
  .partitionBy("grp")
  .orderBy($"ord".desc)

val cols = df.columns.map(c => first(c,ignoreNulls=true).over(win).as(c))

which produces:

scala> cols
res23: Array[org.apache.spark.sql.Column] = Array(first(grp,true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `grp`,first(null_col,true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `null_col`,first(ord,true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `ord`,first(col1,true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col1`,first(col2,true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col2`)

But I can't pass it to df.select:

scala> df.select(cols.head,cols.tail: _*).show
<console>:34: error: no `: _*' annotation allowed here
(such annotations are only allowed in arguments to *-parameters)
       df.select(cols.head,cols.tail: _*).show

Another attempt:

scala> df.select(cols.map(col): _*).show
<console>:34: error: type mismatch;
 found   : String => org.apache.spark.sql.Column
 required: org.apache.spark.sql.Column => ?
       df.select(cols.map(col): _*).show
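As an aside on the two errors above: cols is already an Array[Column], so it can be passed to the Column* overload of select directly; and if the window is given an explicit full-partition frame, every row of a group carries the same windowed values, so a distinct collapses each group to one row. A minimal sketch along these lines (not the approach taken in the accepted solution below):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first}

val win = Window
  .partitionBy("grp")
  .orderBy(col("ord").desc)
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

// first(..., ignoreNulls = true) over the full frame = last non-null value by "ord"
val cols = df.columns.map(c => first(c, ignoreNulls = true).over(win).as(c))

// cols is already Array[Column]; pass it directly as varargs
df.select(cols: _*).distinct.show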

Solution

I would take the same approach as @LeoC, but I believe there is no need to manipulate the column names as strings, and I would go for a more spark-sql-like answer.

import org.apache.spark.sql.Column
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first, last}

val win = Window.partitionBy("grp").orderBy(col("ord")).rowsBetween(0,Window.unboundedFollowing)

// In case there is more than one group column
val nonAggCols = Seq("grp")

// Select columns to aggregate on
val cols: Seq[String] = df.columns.diff(nonAggCols).toSeq

// Map over the selection and build the aggregation expressions
val aggregations: Seq[Column] = cols.map(c => first(col(c),ignoreNulls = true).as(c))

// I'd rather cache the following step as it might get expensive
val step1 = cols.foldLeft(df)((acc,c) => acc.withColumn(c,last(col(c),ignoreNulls = true).over(win))).cache

// Finally we can aggregate our results as follows
val results = step1.groupBy(nonAggCols.head,nonAggCols.tail: _*).agg(aggregations.head,aggregations.tail: _*)

results.show
// +---+--------+---+----+----+
// |grp|null_col|ord|col1|col2|
// +---+--------+---+----+----+
// |  1|    null| 12| s13|  11|
// |  2|    null| 19| a23|  77|
// +---+--------+---+----+----+

I hope this helps.

Edit: the reason you don't get the same results is that your reader is set up incorrectly.

It reads the nulls in the file as the string "null" rather than as real nulls; i.e.:

scala> df.filter('col1.isNotNull).show
// +---+--------+---+----+----+
// |grp|null_col|ord|col1|col2|
// +---+--------+---+----+----+
// |  1|    null|  3|null|  11|
// |  2|    null|  2| xxx|  22|
// |  1|    null|  1| yyy|null|
// |  2|    null|  7|null|  33|
// |  1|    null| 12|null|null|
// |  2|    null| 19|null|  77|
// |  1|    null| 10| s13|null|
// |  2|    null| 11| a23|null|
// +---+--------+---+----+----+

Here is my version of readSparkOutput:

import org.apache.spark.sql.functions.{col, when}

def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  // Read the pretty-printed table; the "comment" option skips the "+---+..." border lines
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  // Drop the empty "_c..." columns created by the leading and trailing "|" delimiters
  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  // Replace the literal string "null" with a real null in every column
  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}
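With this reader the literal string "null" becomes a real null, so a quick sanity check might look like the following sketch (it should keep only the four rows where col1 actually holds a value):

val df = readSparkOutput("file:///tmp/data.txt")
// only the rows where col1 is genuinely non-null (yyy, xxx, s13, a23) should remain
df.filter(col("col1").isNotNull).show()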
