scala – 如何从graphx中的元组构建图形并在之后标记节点?
有些上下文可以找到
here,我的想法是我已经从Hive表上的请求中收集的元组创建了一个图形.这些对应于国家之间的贸易关系.
以这种方式构建图形后,顶点未标记.我想学习学位分布并获得最相关国家的名字.我尝试了两个选项: >首先:我尝试使用函数idMap中的顶点的字符串名称映射顶点的索引,该函数在收集和打印十个顶部连接度的函数内. 在这两种情况下,我都会收到以下错误:任务不可序列化 全球代码: import org.apache.spark.SparkContext import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc) val data = sqlContext.sql("select year,trade_flow,reporter_iso,partner_iso,sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year,partner_iso").collect() val data_2010 = data.filter(line => line(0)==2010) val couples = data_2010.map(line=>(line(2),line(3))) //pays->pays 情侣看起来像这样:数组[(任何,任何)] =数组((MWI,MOZ),(WSM,AUS),(MDA,CRI),(KNA,HTI),(PER,ERI),(SWE,CUB) ),… val idMap = sc.broadcast(couples .flatMap{case (x: String,y: String) => Seq(x,y)} .distinct .zipWithIndex .map{case (k,v) => (k,v.toLong)} .toMap) val edges: RDD[(VertexId,VertexId)] = sc.parallelize(couples .map{case (x: String,y: String) => (idMap.value(x),idMap.value(y))}) val graph = Graph.fromEdgeTuples(edges,1) 以这种方式构建,顶点看起来像(68,1) val degrees: VertexRDD[Int] = graph.degrees.cache() //Most connected vertices def topNamesAndDegrees(degrees: VertexRDD[Int],graph: Graph[Int,Int]): Array[(Int,Int)] = { val namesAndDegrees = degrees.innerJoin(graph.vertices) { (id,degree,k) => (id.toInt,degree)} val ord = Ordering.by[(Int,Int),Int](_._2) namesAndDegrees.map(_._2).top(10)(ord)} topNamesAndDegrees(degrees,graph).foreach(println) 我们得到:(79,1016),(64,912),(55,889)…… 检索名称的第一个选项: val idMapbis = sc.parallelize(couples .flatMap{case (x: String,y)} .distinct .zipWithIndex .map{case (k,v) => (v,k)} .toMap) def topNamesAndDegrees(degrees: VertexRDD[Int],Int]): Array[(String,name) => (idMapbis.value(id.toInt),degree)} val ord = Ordering.by[(String,graph).foreach(println) 该任务不可序列化,但函数idMapbis正在工作,因为idMapbis.value没有错误(graph.vertices.take(1)(0)._ 1.toInt) 选项2: graph.vertices.map{case (k,idMapbis.value(k.toInt))} 该任务不再可序列化(对于上下文,这里是如何修改topNamesAndDegrees以获取此选项中连接最多的顶点的名称) def topNamesAndDegrees(degrees: VertexRDD[Int],Int]): Array[(String,name) => (name,graph).foreach(println) 我有兴趣了解如何改进这个选项之一,如果有人知道如何,也许两者都有. 解决方法
您尝试的问题是idMapbis是一个RDD.由于我们已经知道您的数据适合内存,您可以像以前一样使用广播变量:
val idMapRev = sc.broadcast(idMap.value.map{case (k,k)}.toMap) graph.mapVertices{case (id,_) => idMapRev.value(id)} 或者,您可以从头开始使用正确的标签: val countries: RDD[(VertexId,String)] = sc .parallelize(idMap.value.map(_.swap).toSeq) val relationships: RDD[Edge[Int]] = sc.parallelize(couples .map{case (x: String,y: String) => Edge(idMap.value(x),idMap.value(y),1)} ) val graph = Graph(countries,relationships) 第二种方法有一个重要的优点 – 如果图形很大,你可以相对容易地用连接替换广播变量. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |