加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Spark GraphX聚合求和

发布时间:2020-12-16 18:51:34 所属栏目:安全 来源:网络整理
导读:我正在尝试计算spark graphx图中节点值的总和.简而言之,图形是一棵树,顶部节点(根)应该是所有孩子和他们的孩子的总和.我的图实际上是一棵看起来像这样的树,预期的总和值应该是1850: +----+ +--------------- | VertexID 14 | | | Value: 1000 +---+--+ +---
我正在尝试计算spark graphx图中节点值的总和.简而言之,图形是一棵树,顶部节点(根)应该是所有孩子和他们的孩子的总和.我的图实际上是一棵看起来像这样的树,预期的总和值应该是1850:

+----+
                     +--------------->    |  VertexID 14
                     |               |    |  Value: 1000
                 +---+--+            +----+
    +------------>      | VertexId 11
    |            |      | Value:     +----+
    |            +------+ Sum of 14 & 24  |  VertexId 24
+---++                +-------------->    |  Value: 550
|    | VertexId 20                   +----+
|    | Value:
+----++Sum of 11 & 911
      |
      |           +-----+
      +----------->     | VertexId 911
                  |     | Value: 300
                  +-----+

对此的第一次尝试如下:

val vertices: RDD[(VertexId,Int)] =
      sc.parallelize(Array((20L,0),(11L,(14L,1000),(24L,550),(911L,300)
      ))

  //note that the last value in the edge is for factor (positive or negative)
    val edges: RDD[Edge[Int]] =
      sc.parallelize(Array(
        Edge(14L,11L,1),Edge(24L,Edge(11L,20L,Edge(911L,1)
      ))

    val dataItemGraph = Graph(vertices,edges)


    val sum: VertexRDD[(Int,BigDecimal,Int)] = dataItemGraph.aggregateMessages[(Int,Int)](
      sendMsg = { triplet => triplet.sendToDst(1,triplet.srcAttr,1) },mergeMsg = { (a,b) => (a._1,a._2 * a._3 + b._2 * b._3,1) }
    )

    sum.collect.foreach(println)

这将返回以下内容:

(20,(1,300,1))
(11,1550,1))

它正在为顶点11做总和,但它没有滚动到根节点(顶点20).我错过了什么,或者有更好的方法吗?当然,树可以是任意大小的,并且每个顶点可以具有任意数量的子边.

解决方法

鉴于图形是有针对性的(如你所示的那样)应该可以编写一个Pregel程序来完成你所要求的:

val result = 
 dataItemGraph.pregel(0,activeDirection = EdgeDirection.Out)(
  (_,vd,msg) => msg + vd,t => Iterator((t.dstId,t.srcAttr)),(x,y) => x + y
 )

 result.vertices.collect().foreach(println)

// Output is:
// (24,550)
// (20,1850)
// (14,1000)
// (11,1550)
// (911,300)

我正在使用EdgeDirection.Out,以便消息仅从下到上发送(否则我们将进入无限循环).

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读