加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 用于大数据数据处理的分布式计算

发布时间:2020-12-16 19:13:45 所属栏目:安全 来源:网络整理
导读:我有一个庞大的时间序列数据,我想使用spark的并行处理/分布式计算进行数据处理. 要求是逐行查看数据,以确定下面指定的组在所需的结果部分下,如果没有执行者之间的某种协调,我真的无法获得分配这一点的火花 t- timeseries datetime sample,lat-latitude,long-
我有一个庞大的时间序列数据,我想使用spark的并行处理/分布式计算进行数据处理.
要求是逐行查看数据,以确定下面指定的组在所需的结果部分下,如果没有执行者之间的某种协调,我真的无法获得分配这一点的火花

t- timeseries datetime sample,lat-latitude,long-longitude

例如:采用一小部分样本数据集来解释案例

t   lat long
0   27  28
5   27  28
10  27  28
15  29  49
20  29  49
25  27  28
30  27  28

期望的输出应该是:

Lat-long    interval
(27,28) (0,10)
(29,49) (15,20)
(27,28) (25,30)

我可以使用这段代码获得所需的结果

val spark = SparkSession.builder().master("local").getOrCreate()

import spark.implicits._

 val df = Seq(
  (0,27,28),(5,(10,(15,26,49),(20,(25,(30,28)
).toDF("t","lat","long")

val dfGrouped = df
.withColumn("lat-long",struct($"lat",$"long"))

val wAll = Window.partitionBy().orderBy($"t".asc)

dfGrouped.withColumn("lag",lag("lat-long",1,null).over(wAll))
.orderBy(asc("t")).withColumn("detector",when($"lat-long" === $"lag",0)
    .otherwise(1)).withColumn("runningTotal",sum("detector").over(wAll))
.groupBy("runningTotal","lat-long").agg(struct(min("t"),max("t")).as("interval"))
.drop("runningTotal").show
}

但是,如果数据进入两个执行器,那么数据就像

执行人1中的数据:

t   lat long
0   27  28
5   27  28
10  27  28
15  29  49
20  29  49
25  27  28

执行人2中的数据:

t   lat long
30   27  28

我应该如何获得大量数据的所需输出.必须有更聪明的方法来实现这一点,通过执行器之间的某种协调来分配它以获得该结果.

请指导我一个正确的方向,我已经研究了相同但无法找到解决方案.

PS:这只是一个示例.

解决方法

您可以使用UDAF解决此问题.
首先,您可以添加一列代表在您拥有的许多执行程序中分区的t列.像executorIndex = t%((max(t) – min(t))/ numExecutors)之类的东西.

然后,您可以通过executorIndex应用您的UDAF分组.

你的UDAF需要存储一个带有String键的Map(例如)代表一个lat和long对,以及一个int []表示这个lat-long键的maxT和minT.

请询问您是否需要更广泛的解释.

希望这有帮助……

PS:我总结说同一个纬度和长度之间存在一些时间关系,如果你正在追踪一些运动,这是正常的……

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读