如何使用Pyspark中的时间序列数据滑动窗口转换数据
发布时间:2020-12-20 10:33:28 所属栏目:Python 来源:网络整理
导读:我试图基于时间序列数据的滑动窗口提取功能. 在 Scala中,似乎有一个基于 this post和 the documentation的滑动功能 import org.apache.spark.mllib.rdd.RDDFunctions._sc.parallelize(1 to 100,10) .sliding(3) .map(curSlice = (curSlice.sum / curSlice.si
我试图基于时间序列数据的滑动窗口提取功能.
在 Scala中,似乎有一个基于 this post和 the documentation的滑动功能 import org.apache.spark.mllib.rdd.RDDFunctions._ sc.parallelize(1 to 100,10) .sliding(3) .map(curSlice => (curSlice.sum / curSlice.size)) .collect() 我的问题是PySpark中有类似的功能吗?或者,如果没有这样的功能,我们如何实现类似的滑动窗口转换呢? 解决方法
据我所知,滑动功能不能从Python获得,SlidingRDD是私有类,不能在MLlib外部访问.
如果你在现有的RDD上使用滑动,你可以像这样创建穷人滑动: def sliding(rdd,n): assert n > 0 def gen_window(xi,n): x,i = xi return [(i - offset,(i,x)) for offset in xrange(n)] return ( rdd. zipWithIndex(). # Add index flatMap(lambda xi: gen_window(xi,n)). # Generate pairs with offset groupByKey(). # Group to create windows # Sort values to ensure order inside window and drop indices mapValues(lambda vals: [x for (i,x) in sorted(vals)]). sortByKey(). # Sort to makes sure we keep original order values(). # Get values filter(lambda x: len(x) == n)) # Drop beginning and end 或者你可以尝试这样的东西(在 from toolz.itertoolz import sliding_window,concat def sliding2(rdd,n): assert n > 1 def get_last_el(i,iter): """Return last n - 1 elements from the partition""" return [(i,[x for x in iter][(-n + 1):])] def slide(i,iter): """Prepend previous items and return sliding window""" return sliding_window(n,concat([last_items.value[i - 1],iter])) def clean_last_items(last_items): """Adjust for empty or to small partitions""" clean = {-1: [None] * (n - 1)} for i in range(rdd.getNumPartitions()): clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):] return {k: tuple(v) for k,v in clean.items()} last_items = sc.broadcast(clean_last_items( rdd.mapPartitionsWithIndex(get_last_el).collectAsMap())) return rdd.mapPartitionsWithIndex(slide) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
相关内容
- 【Flask模版语言Jinja2】 -- 2019-08-07 10:37:14
- 资深程序员用Python实现数据驱动的接口自动化测试!
- 文档 – Restructuredtext页面中的非TOC标题
- python比较2列,如果第3列与第2列匹配,则用第1列的值编写第4
- python – Django AttributeError:’InterestsForm’对象没
- python – de-Bazel-ing TensorFlow服务
- 移植旧的fortran程序与python numpy一起工作
- 有没有办法在python中同时读取两个文件? (使用相同的循环?
- Python3.X 线程中信号量的使用方法示例
- 如何为python selenium 3.8.0设置’driver.get’的超时?