加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

Pyspark用户定义了列的聚合计算

发布时间:2020-12-14 04:54:22 所属栏目:百科 来源:网络整理
导读:我正在为Pyspark中的分类器准备输入数据.我一直在SparkSQL中使用聚合函数来提取平均值和方差等功能.这些按活动,名称和窗口分组.通过将unix时间戳除以10000来分解为10秒的时间窗来计算窗口. sample = sqlContext.sql("SELECT activity,name,window,avg(acc_x)
我正在为Pyspark中的分类器准备输入数据.我一直在SparkSQL中使用聚合函数来提取平均值和方差等功能.这些按活动,名称和窗口分组.通过将unix时间戳除以10000来分解为10秒的时间窗来计算窗口.

sample = sqlContext.sql("SELECT activity,name,window,avg(acc_x) as avgX,variance(acc_x) as varX FROM data  GROUP BY activity,window ORDER BY activity,window")

结果看起来像

Activity  Name         Window       AvgX       VarX
Walk    accelerometer  95875        2.0          1.0

我现在要做的是计算X中每个点的平均斜率.

为此,我需要时间戳,窗口和X.我已经在Python中实现了逻辑,使用数组,这就是它的样子 – 计算每个点之间的斜率,然后获得平均斜率.理想情况下,我想在UDAF中这样做,Pyspark尚不支持. (它看起来像这样,如果下面的函数被称为斜率.那么在sql中你可以做斜率(时间戳,X)作为avgSlopeX

编辑 – 更改输入,使其更清晰.
所以,我正在做的就是计算每个点之间的斜率,然后返回该窗口中斜率的平均值.所以,当我得到每个窗口的平均值和方差时,我也希望得到平均斜率.

#sample input
timestamp = [1464703425544,1464703426534,1464703427551,1464703428587,1464703429512,1464703430493,1464703431505,1464703432543,1464703433513,1464703434529]

values = [1021.31,1021.26,1021.19,1021.1,1021.05,1021.02]

i = 0; 
slope = 0.0;
totalSlope = 0.0;

while (i < len(timestamp) - 1):
    y2 = values[i+1];
    y1 = values[i];

    x2 = timestamp[i + 1];
    x1 = timestamp[i]; 
    slope = ((y2-y1)/(x2-x1)); 
    totalSlope = totalSlope + slope;
    i=i+1

avgSlope = (totalSlope/len(x_values))

我该如何实现呢?我应该尝试转换为pandas数据帧然后转换为numpy数组吗?如果是这样,我怎样才能确保数据仍然正确映射,同时记住sql查询中的GROUP BY活动,名称窗口.

解决方法

一般来说,这不是UDAF的工作,因为UDAF没有提供任何定义订单的方法.看起来你真正需要的是窗口函数和标准聚合的某种组合.

from pyspark.sql.functions import col,lag,avg
from pyspark.sql.window import Window

df = ... 
## DataFrame[activity: string,name: string,window: bigint,##   timestamp: bigint,value: float]

group = ["activity","name","window"]

w = (Window()
    .partitionBy(*group)
    .orderBy("timestamp"))

v_diff = col("value") - lag("value",1).over(w)
t_diff = col("timestamp") - lag("timestamp",1).over(w)

slope = v_diff / t_diff

df.withColumn("slope",slope).groupBy(*group).agg(avg(col("slope")))

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读