scala – Spark GraphX聚合求和
发布时间:2020-12-16 18:51:34 所属栏目:安全 来源:网络整理
导读:我正在尝试计算spark graphx图中节点值的总和.简而言之,图形是一棵树,顶部节点(根)应该是所有孩子和他们的孩子的总和.我的图实际上是一棵看起来像这样的树,预期的总和值应该是1850: +----+ +--------------- | VertexID 14 | | | Value: 1000 +---+--+ +---
我正在尝试计算spark graphx图中节点值的总和.简而言之,图形是一棵树,顶部节点(根)应该是所有孩子和他们的孩子的总和.我的图实际上是一棵看起来像这样的树,预期的总和值应该是1850:
+----+ +---------------> | VertexID 14 | | | Value: 1000 +---+--+ +----+ +------------> | VertexId 11 | | | Value: +----+ | +------+ Sum of 14 & 24 | VertexId 24 +---++ +--------------> | Value: 550 | | VertexId 20 +----+ | | Value: +----++Sum of 11 & 911 | | +-----+ +-----------> | VertexId 911 | | Value: 300 +-----+ 对此的第一次尝试如下: val vertices: RDD[(VertexId,Int)] = sc.parallelize(Array((20L,0),(11L,(14L,1000),(24L,550),(911L,300) )) //note that the last value in the edge is for factor (positive or negative) val edges: RDD[Edge[Int]] = sc.parallelize(Array( Edge(14L,11L,1),Edge(24L,Edge(11L,20L,Edge(911L,1) )) val dataItemGraph = Graph(vertices,edges) val sum: VertexRDD[(Int,BigDecimal,Int)] = dataItemGraph.aggregateMessages[(Int,Int)]( sendMsg = { triplet => triplet.sendToDst(1,triplet.srcAttr,1) },mergeMsg = { (a,b) => (a._1,a._2 * a._3 + b._2 * b._3,1) } ) sum.collect.foreach(println) 这将返回以下内容: (20,(1,300,1)) (11,1550,1)) 它正在为顶点11做总和,但它没有滚动到根节点(顶点20).我错过了什么,或者有更好的方法吗?当然,树可以是任意大小的,并且每个顶点可以具有任意数量的子边. 解决方法
鉴于图形是有针对性的(如你所示的那样)应该可以编写一个Pregel程序来完成你所要求的:
val result = dataItemGraph.pregel(0,activeDirection = EdgeDirection.Out)( (_,vd,msg) => msg + vd,t => Iterator((t.dstId,t.srcAttr)),(x,y) => x + y ) result.vertices.collect().foreach(println) // Output is: // (24,550) // (20,1850) // (14,1000) // (11,1550) // (911,300) 我正在使用EdgeDirection.Out,以便消息仅从下到上发送(否则我们将进入无限循环). (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |