scala – 用于大数据数据处理的分布式计算
发布时间:2020-12-16 19:13:45 所属栏目:安全 来源:网络整理
导读:我有一个庞大的时间序列数据,我想使用spark的并行处理/分布式计算进行数据处理. 要求是逐行查看数据,以确定下面指定的组在所需的结果部分下,如果没有执行者之间的某种协调,我真的无法获得分配这一点的火花 t- timeseries datetime sample,lat-latitude,long-
我有一个庞大的时间序列数据,我想使用spark的并行处理/分布式计算进行数据处理.
要求是逐行查看数据,以确定下面指定的组在所需的结果部分下,如果没有执行者之间的某种协调,我真的无法获得分配这一点的火花 t- timeseries datetime sample,lat-latitude,long-longitude 例如:采用一小部分样本数据集来解释案例 t lat long 0 27 28 5 27 28 10 27 28 15 29 49 20 29 49 25 27 28 30 27 28 期望的输出应该是: Lat-long interval (27,28) (0,10) (29,49) (15,20) (27,28) (25,30) 我可以使用这段代码获得所需的结果 val spark = SparkSession.builder().master("local").getOrCreate() import spark.implicits._ val df = Seq( (0,27,28),(5,(10,(15,26,49),(20,(25,(30,28) ).toDF("t","lat","long") val dfGrouped = df .withColumn("lat-long",struct($"lat",$"long")) val wAll = Window.partitionBy().orderBy($"t".asc) dfGrouped.withColumn("lag",lag("lat-long",1,null).over(wAll)) .orderBy(asc("t")).withColumn("detector",when($"lat-long" === $"lag",0) .otherwise(1)).withColumn("runningTotal",sum("detector").over(wAll)) .groupBy("runningTotal","lat-long").agg(struct(min("t"),max("t")).as("interval")) .drop("runningTotal").show } 但是,如果数据进入两个执行器,那么数据就像 执行人1中的数据: t lat long 0 27 28 5 27 28 10 27 28 15 29 49 20 29 49 25 27 28 执行人2中的数据: t lat long 30 27 28 我应该如何获得大量数据的所需输出.必须有更聪明的方法来实现这一点,通过执行器之间的某种协调来分配它以获得该结果. 请指导我一个正确的方向,我已经研究了相同但无法找到解决方案. PS:这只是一个示例. 解决方法
您可以使用UDAF解决此问题.
首先,您可以添加一列代表在您拥有的许多执行程序中分区的t列.像executorIndex = t%((max(t) – min(t))/ numExecutors)之类的东西. 然后,您可以通过executorIndex应用您的UDAF分组. 你的UDAF需要存储一个带有String键的Map(例如)代表一个lat和long对,以及一个int []表示这个lat-long键的maxT和minT. 请询问您是否需要更广泛的解释. 希望这有帮助…… PS:我总结说同一个纬度和长度之间存在一些时间关系,如果你正在追踪一些运动,这是正常的…… (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
相关内容
- unix – 使用sudo运行时,在$shell脚本中获取$USER?
- XFire开发WebService服务器端
- Angular学习笔记之集成三方UI框架、控件
- webservice(二) cxf介绍和用ant发布项目
- Scala中的Ad hoc多态性
- angular – 如何在ngFor(嵌套formArray)中访问ngFor中的for
- bash – 使用xargs将stdin分配给变量
- Bootstrap 基于HTML,CSS,JAVASCRIPT的简洁灵活的流行前端
- angular – 无法在服务中正确设置POST的RequestOptionsArgs
- angularJs异步的问题及promise使用