scala – Filling gaps in a Spark time series
I have a problem with time series data. Due to power failures some timestamps are missing from the dataset. I need to fill these gaps by adding rows, after which I can interpolate the missing values.
Input data:

periodstart         usage
---------------------------------
2015-09-11 02:15    23000
2015-09-11 03:15    23344
2015-09-11 03:30    23283
2015-09-11 03:45    23786
2015-09-11 04:00    25039

Wanted output:

periodstart         usage
---------------------------------
2015-09-11 02:15    23000
2015-09-11 02:30    0
2015-09-11 02:45    0
2015-09-11 03:00    0
2015-09-11 03:15    23344
2015-09-11 03:30    23283
2015-09-11 03:45    23786
2015-09-11 04:00    25039

For now I have fixed this with a while loop inside a dataset foreach function. The problem is that I first have to collect the dataset to the driver in order to run the while loop, so this is not the right approach for Spark. Can someone give me a better solution?

This is my code:

MissingMeasurementsDS.collect().foreach(row => {
  // Empty list for newly generated measurements
  val output = ListBuffer.empty[Measurement]
  // Number of missing measurements
  val missingMeasurements = row.getAs[Int]("missingmeasurements")
  val lastTimestamp = row.getAs[Timestamp]("previousperiodstart")
  // Generate missing timestamps
  var i = 1
  while (i <= missingMeasurements) {
    // Increment timestamp by 15 minutes (900000 milliseconds)
    val newTimestamp = lastTimestamp.getTime + (900000 * i)
    output += Measurement(new Timestamp(newTimestamp), 0)
    i += 1
  }
  // Join interpolated measurements with correct measurements
  completeMeasurementsDS.join(output.toDS())
})

completeMeasurementsDS.show()
println("OutputDF count = " + completeMeasurementsDS.count())
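The generation step itself does not require collect(): the same per-gap loop can run as a distributed flatMap, with the generated rows unioned back onto the measured ones. A minimal sketch, assuming a Measurement(periodstart, usage) case class and a measurementsDS: Dataset[Measurement] holding the original rows (names modeled on the question, not taken from a verified answer):

import java.sql.Timestamp
import spark.implicits._

// Hypothetical case class mirroring the question's Measurement
case class Measurement(periodstart: Timestamp, usage: Long)

// missingMeasurementsDS: one row per gap, with previousperiodstart and
// missingmeasurements columns as in the question's code
val gapRows = missingMeasurementsDS.flatMap { row =>
  val missing = row.getAs[Int]("missingmeasurements")
  val last = row.getAs[Timestamp]("previousperiodstart")
  // One zero-usage measurement per missing 15-minute slot
  (1 to missing).map(i => Measurement(new Timestamp(last.getTime + 900000L * i), 0L))
}

// Union instead of join: the generated rows are simply additional measurements
val completeMeasurementsDS = measurementsDS.union(gapRows).orderBy("periodstart")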
Solution

If the input DataFrame has the following structure:
root
 |-- periodstart: timestamp (nullable = true)
 |-- usage: long (nullable = true)

Scala

Determine the min / max (assuming spark.implicits._ and org.apache.spark.sql.functions.{min, max} are in scope):

val (minp, maxp) = df
  .select(min($"periodstart").cast("bigint"), max($"periodstart").cast("bigint"))
  .as[(Long, Long)]
  .first

Set the step, for example 15 minutes:

val step: Long = 15 * 60

Generate the reference range:

val reference = spark
  .range((minp / step) * step, ((maxp / step) + 1) * step, step)
  .select($"id".cast("timestamp").alias("periodstart"))

Join and fill the gaps:

reference.join(df, Seq("periodstart"), "leftouter").na.fill(0, Seq("usage"))

Python

The same in PySpark:

from pyspark.sql.functions import col, min as min_, max as max_

step = 15 * 60

minp, maxp = df.select(
    min_("periodstart").cast("long"), max_("periodstart").cast("long")
).first()

reference = spark.range(
    (minp // step) * step, ((maxp // step) + 1) * step, step
).select(col("id").cast("timestamp").alias("periodstart"))

reference.join(df, ["periodstart"], "leftouter")
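To see the whole pipeline at once, here is a self-contained run of the Scala answer against the sample data from the question; a minimal sketch, assuming a local SparkSession (the session setup and app name are illustrative additions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{min, max}

val spark = SparkSession.builder.master("local[*]").appName("fill-gaps").getOrCreate()
import spark.implicits._

// Sample usage readings with a gap between 02:15 and 03:15
val df = Seq(
  ("2015-09-11 02:15:00", 23000L),
  ("2015-09-11 03:15:00", 23344L),
  ("2015-09-11 03:30:00", 23283L),
  ("2015-09-11 03:45:00", 23786L),
  ("2015-09-11 04:00:00", 25039L)
).toDF("periodstart", "usage")
  .withColumn("periodstart", $"periodstart".cast("timestamp"))

// Epoch-second bounds of the observed range
val (minp, maxp) = df
  .select(min($"periodstart").cast("bigint"), max($"periodstart").cast("bigint"))
  .as[(Long, Long)]
  .first

val step: Long = 15 * 60 // 15-minute grid, in seconds

// Complete grid of timestamps, left-joined to the data; gaps become usage = 0
val reference = spark
  .range((minp / step) * step, ((maxp / step) + 1) * step, step)
  .select($"id".cast("timestamp").alias("periodstart"))

val filled = reference
  .join(df, Seq("periodstart"), "leftouter")
  .na.fill(0, Seq("usage"))
  .orderBy("periodstart")

filled.show() // 02:30, 02:45 and 03:00 appear with usage = 0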