加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

如何使用Pyspark中的时间序列数据滑动窗口转换数据

发布时间:2020-12-20 10:33:28 所属栏目:Python 来源:网络整理
导读:我试图基于时间序列数据的滑动窗口提取功能. 在 Scala中,似乎有一个基于 this post和 the documentation的滑动功能 import org.apache.spark.mllib.rdd.RDDFunctions._sc.parallelize(1 to 100,10) .sliding(3) .map(curSlice = (curSlice.sum / curSlice.si
我试图基于时间序列数据的滑动窗口提取功能.
在 Scala中,似乎有一个基于 this post和 the documentation的滑动功能

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100,10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()

我的问题是PySpark中有类似的功能吗?或者,如果没有这样的功能,我们如何实现类似的滑动窗口转换呢?

解决方法

据我所知,滑动功能不能从Python获得,SlidingRDD是私有类,不能在MLlib外部访问.

如果你在现有的RDD上使用滑动,你可以像这样创建穷人滑动:

def sliding(rdd,n):
    assert n > 0
    def gen_window(xi,n):
        x,i = xi
        return [(i - offset,(i,x)) for offset in xrange(n)]

    return (
        rdd.
        zipWithIndex(). # Add index
        flatMap(lambda xi: gen_window(xi,n)). # Generate pairs with offset
        groupByKey(). # Group to create windows
        # Sort values to ensure order inside window and drop indices
        mapValues(lambda vals: [x for (i,x) in sorted(vals)]).
        sortByKey(). # Sort to makes sure we keep original order
        values(). # Get values
        filter(lambda x: len(x) == n)) # Drop beginning and end

或者你可以尝试这样的东西(在toolz的小帮助下)

from toolz.itertoolz import sliding_window,concat

def sliding2(rdd,n):
    assert n > 1

    def get_last_el(i,iter):
        """Return last n - 1 elements from the partition"""
        return  [(i,[x for x in iter][(-n + 1):])]

    def slide(i,iter):
        """Prepend previous items and return sliding window"""
        return sliding_window(n,concat([last_items.value[i - 1],iter]))

    def clean_last_items(last_items):
        """Adjust for empty or to small partitions"""
        clean = {-1: [None] * (n - 1)}
        for i in range(rdd.getNumPartitions()):
            clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):]
        return {k: tuple(v) for k,v in clean.items()}

    last_items = sc.broadcast(clean_last_items(
        rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))

    return rdd.mapPartitionsWithIndex(slide)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读