加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 填补时间序列Spark的空白

发布时间:2020-12-16 18:42:04 所属栏目:安全 来源:网络整理
导读:处理时间序列数据时遇到问题.由于电源故障,数据集中缺少某些时间戳.我需要通过添加行来填补这些空白,然后我可以插入缺失的值. 输入数据: periodstart usage---------------------------------2015-09-11 02:15 23000 2015-09-11 03:15 23344 2015-09-11 03:
处理时间序列数据时遇到问题.由于电源故障,数据集中缺少某些时间戳.我需要通过添加行来填补这些空白,然后我可以插入缺失的值.

输入数据:

periodstart                usage
---------------------------------
2015-09-11 02:15           23000   
2015-09-11 03:15           23344   
2015-09-11 03:30           23283  
2015-09-11 03:45           23786   
2015-09-11 04:00           25039

通缉输出:

periodstart                usage
---------------------------------
2015-09-11 02:15           23000   
2015-09-11 02:30           0   
2015-09-11 02:45           0   
2015-09-11 03:00           0   
2015-09-11 03:15           23344   
2015-09-11 03:30           23283   
2015-09-11 03:45           23786   
2015-09-11 04:00           25039

现在我已在数据集foreach函数中使用while循环修复此问题.问题是我必须首先将数据集收集到驱动程序才能执行while循环.所以这不是Spark的正确方法.

有人能给我一个更好的解决方案吗?

这是我的代码:

MissingMeasurementsDS.collect().foreach(row => {
  // empty list for new generated measurements
  val output = ListBuffer.empty[Measurement]
  // Missing measurements
  val missingMeasurements = row.getAs[Int]("missingmeasurements")
  val lastTimestamp = row.getAs[Timestamp]("previousperiodstart")
  //Generate missing timestamps
  var i = 1
  while (i <= missingMeasurements) {
    //Increment timestamp with 15 minutes (900000 milliseconds)
    val newTimestamp = lastTimestamp.getTime + (900000 * i)
    output += Measurement(new Timestamp(newTimestamp),0))
    i += 1
  }
  //Join interpolated measurements with correct measurements
  completeMeasurementsDS.join(output.toDS())
})
completeMeasurementsDS.show()
println("OutputDF count = " + completeMeasurementsDS.count())

解决方法

如果输入DataFrame具有以下结构:

root
 |-- periodstart: timestamp (nullable = true)
 |-- usage: long (nullable = true)

斯卡拉

确定最小/最大:

val (minp,maxp) = df
  .select(min($"periodstart").cast("bigint"),max($"periodstart".cast("bigint")))
  .as[(Long,Long)]
  .first

设置步骤,例如15分钟:

val step: Long = 15 * 60

生成参考范围:

val reference = spark
  .range((minp / step) * step,((maxp / step) + 1) * step,step)
  .select($"id".cast("timestamp").alias("periodstart"))

加入并填补空白:

reference.join(df,Seq("periodstart"),"leftouter").na.fill(0,Seq("usage"))

Python

同样在PySpark中:

from pyspark.sql.functions import col,min as min_,max as max_

step = 15 * 60

minp,maxp = df.select(
    min_("periodstart").cast("long"),max_("periodstart").cast("long")
).first()

reference = spark.range(
    (minp / step) * step,step
).select(col("id").cast("timestamp").alias("periodstart"))

reference.join(df,["periodstart"],"leftouter")

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读