加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何使用Graph.fromEdgeTuples从Array [(Any,Any)]创建

发布时间:2020-12-16 09:25:55 所属栏目:安全 来源:网络整理
导读:我很兴奋,但是我想从Hive表中获得关系图.我发现一个函数应该允许这个而不定义顶点,但我不能让它工作. 我知道这不是一个可重现的例子,但这是我的代码: import org.apache.spark.SparkContextimport org.apache.spark.graphx._import org.apache.spark.rdd.RD
我很兴奋,但是我想从Hive表中获得关系图.我发现一个函数应该允许这个而不定义顶点,但我不能让它工作.

我知道这不是一个可重现的例子,但这是我的代码:

import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc)
val data = sqlContext.sql("select year,trade_flow,reporter_iso,partner_iso,sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year,partner_iso").collect()
val data_2010 = data.filter(line => line(0)==2010)
val couples = data_2010.map(line=>(line(2),line(3)) //country to country 

val graph = Graph.fromEdgeTuples(couples,1)

最后一行生成以下错误:

val graph = Graph.fromEdgeTuples(sc.parallelize(couples),1)
<console>:31: error: type mismatch;
found   : Array[(Any,Any)]
required: Seq[(org.apache.spark.graphx.VertexId,org.apache.spark.graphx.VertexId)]
Error occurred in an application involving default arguments.
val graph = Graph.fromEdgeTuples(sc.parallelize(couples),1)

情侣看起来像这样:

couples: Array[(Any,Any)] = Array((MWI,MOZ),(WSM,AUS),(MDA,CRI),(KNA,HTI),(PER,ERI),(SWE,CUB),(DEU,PRK),(THA,DJI),(BIH,SVK),(RUS,THA),(SGP,BLR),(MEX,TGO),(TUR,ZAF),(ZWE,SYC),(UGA,GHA),(OMN,SVN),(NZL,SYR),(CHE,SLV),(CZE,LUX),(TGO,COM),(TTO,WLF),(NGA,PAN),(FJI,UKR),(BRA,ECU),(EGY,SWE),(ITA,ARG),(MUS,MLT),(MDG,DZA),(ARE,SUR),(CAN,GUY),COG),(NAM,FIN),HMD),CHE),(SDN,NER),(TUN,USA),GMB),(HUN,TTO),(FRA,BEN),(NER,TCD),(CHN,JPN),(DNK,(MLT,(ARM,OMN),(PRT,IDN),(BEN,PER),BRA),(KAZ,SMR),(CPV,""),(ARG,(BLR,TJK),(AZE,STP),IRL),(POL,(PRY,ETH),(HKG,(QAT,GAB),MUS),(PHL,SGS),KHM),KOR),(AUT,(SYR,GBR),(DOM,(CYP,LAO),(USA,LBR)

如何转换为合适的格式?

解决方法

首先,您不能将String用作VertexId,因此您必须将标签映射到Long.然后,我们需要准备从label到id的映射.只要唯一值的数量相对较小,最简单的方法是创建一个广播变量:

val idMap = sc.broadcast(couples // -> Array[(Any,Any)]
  // Make sure we use String not Any returned from Row.apply
  // And convert to Seq so we can flatten results
  .flatMap{case (x: String,y: String) => Seq(x,y)} // -> Array[String]
  // Get different keys
  .distinct // -> Array[String]
  // Create (key,value) pairs
  .zipWithIndex  // -> Array[(String,Int)]
  // Convert values to Long so we can use it as a VertexId
  .map{case (k,v) => (k,v.toLong)}  // -> Array[(String,Long)]
  // Create map
  .toMap) // -> Map[String,Long]

接下来我们可以使用上面的方法来执行映射:

val edges: RDD[(VertexId,VertexId)] = sc.parallelize(couples
  .map{case (x: String,y: String) => (idMap.value(x),idMap.value(y))}
)

最后我们得到一个图表:

val graph = Graph.fromEdgeTuples(edges,1)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读