加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Spark和SparkSQL:如何模仿窗口功能?

发布时间:2020-12-16 09:25:45 所属栏目:安全 来源:网络整理
导读:描述 给定数据帧df id | date--------------- 1 | 2015-09-01 2 | 2015-09-01 1 | 2015-09-03 1 | 2015-09-04 2 | 2015-09-04 我想创建一个运行计数器或索引, 按相同的ID分组 按该组中的日期排序, 从而 id | date | counter-------------------------- 1 | 2
描述

给定数据帧df

id |       date
---------------
 1 | 2015-09-01
 2 | 2015-09-01
 1 | 2015-09-03
 1 | 2015-09-04
 2 | 2015-09-04

我想创建一个运行计数器或索引,

>按相同的ID分组
>按该组中的日期排序,

从而

id |       date |  counter
--------------------------
 1 | 2015-09-01 |        1
 1 | 2015-09-03 |        2
 1 | 2015-09-04 |        3
 2 | 2015-09-01 |        1
 2 | 2015-09-04 |        2

这是我可以用窗函数实现的,例如

val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select( df("id"),rowNumber().over(w) )

不幸的是,Spark 1.4.1不支持常规数据帧的窗口函数:

org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that,using window functions currently requires a HiveContext;

问题

>如何在不使用窗口函数的情况下在当前Spark 1.4.1上实现上述计算?
> Spark何时支持常规数据帧的窗口函数?

谢谢!

解决方法

您可以使用RDD执行此操作.就个人而言,我发现RDD的API更有意义 – 我并不总是希望我的数据像数据帧一样“平坦”.

val df = sqlContext.sql("select 1,'2015-09-01'"
    ).unionAll(sqlContext.sql("select 2,'2015-09-01'")
    ).unionAll(sqlContext.sql("select 1,'2015-09-03'")
    ).unionAll(sqlContext.sql("select 1,'2015-09-04'")
    ).unionAll(sqlContext.sql("select 2,'2015-09-04'"))

// dataframe as an RDD (of Row objects)
df.rdd 
  // grouping by the first column of the row
  .groupBy(r => r(0)) 
  // map each group - an Iterable[Row] - to a list and sort by the second column
  .map(g => g._2.toList.sortBy(row => row(1).toString))     
  .collect()

以上结果如下:

Array[List[org.apache.spark.sql.Row]] = 
Array(
  List([1,2015-09-01],[1,2015-09-03],2015-09-04]),List([2,[2,2015-09-04]))

如果你想要在’组’中的位置,你可以使用zipWithIndex.

df.rdd.groupBy(r => r(0)).map(g => 
    g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect()

Array[List[(org.apache.spark.sql.Row,Int)]] = Array(
  List(([1,0),([1,1),2015-09-04],2)),List(([2,([2,1)))

您可以使用FlatMap将其展平回一个简单的Row对象列表/数组,但是如果您需要在“组”上执行任何不是一个好主意的事情.

像这样使用RDD的缺点是,从DataFrame转换为RDD并再次返回是很繁琐的.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读