scala – 如何使用Graph.fromEdgeTuples从Array [(Any,Any)]创建
发布时间:2020-12-16 09:25:55 所属栏目:安全 来源:网络整理
导读:我很兴奋,但是我想从Hive表中获得关系图.我发现一个函数应该允许这个而不定义顶点,但我不能让它工作. 我知道这不是一个可重现的例子,但这是我的代码: import org.apache.spark.SparkContextimport org.apache.spark.graphx._import org.apache.spark.rdd.RD
我很兴奋,但是我想从Hive表中获得关系图.我发现一个函数应该允许这个而不定义顶点,但我不能让它工作.
我知道这不是一个可重现的例子,但这是我的代码: import org.apache.spark.SparkContext import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc) val data = sqlContext.sql("select year,trade_flow,reporter_iso,partner_iso,sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year,partner_iso").collect() val data_2010 = data.filter(line => line(0)==2010) val couples = data_2010.map(line=>(line(2),line(3)) //country to country val graph = Graph.fromEdgeTuples(couples,1) 最后一行生成以下错误: val graph = Graph.fromEdgeTuples(sc.parallelize(couples),1) <console>:31: error: type mismatch; found : Array[(Any,Any)] required: Seq[(org.apache.spark.graphx.VertexId,org.apache.spark.graphx.VertexId)] Error occurred in an application involving default arguments. val graph = Graph.fromEdgeTuples(sc.parallelize(couples),1) 情侣看起来像这样: couples: Array[(Any,Any)] = Array((MWI,MOZ),(WSM,AUS),(MDA,CRI),(KNA,HTI),(PER,ERI),(SWE,CUB),(DEU,PRK),(THA,DJI),(BIH,SVK),(RUS,THA),(SGP,BLR),(MEX,TGO),(TUR,ZAF),(ZWE,SYC),(UGA,GHA),(OMN,SVN),(NZL,SYR),(CHE,SLV),(CZE,LUX),(TGO,COM),(TTO,WLF),(NGA,PAN),(FJI,UKR),(BRA,ECU),(EGY,SWE),(ITA,ARG),(MUS,MLT),(MDG,DZA),(ARE,SUR),(CAN,GUY),COG),(NAM,FIN),HMD),CHE),(SDN,NER),(TUN,USA),GMB),(HUN,TTO),(FRA,BEN),(NER,TCD),(CHN,JPN),(DNK,(MLT,(ARM,OMN),(PRT,IDN),(BEN,PER),BRA),(KAZ,SMR),(CPV,""),(ARG,(BLR,TJK),(AZE,STP),IRL),(POL,(PRY,ETH),(HKG,(QAT,GAB),MUS),(PHL,SGS),KHM),KOR),(AUT,(SYR,GBR),(DOM,(CYP,LAO),(USA,LBR) 如何转换为合适的格式? 解决方法
首先,您不能将String用作VertexId,因此您必须将标签映射到Long.然后,我们需要准备从label到id的映射.只要唯一值的数量相对较小,最简单的方法是创建一个广播变量:
val idMap = sc.broadcast(couples // -> Array[(Any,Any)] // Make sure we use String not Any returned from Row.apply // And convert to Seq so we can flatten results .flatMap{case (x: String,y: String) => Seq(x,y)} // -> Array[String] // Get different keys .distinct // -> Array[String] // Create (key,value) pairs .zipWithIndex // -> Array[(String,Int)] // Convert values to Long so we can use it as a VertexId .map{case (k,v) => (k,v.toLong)} // -> Array[(String,Long)] // Create map .toMap) // -> Map[String,Long] 接下来我们可以使用上面的方法来执行映射: val edges: RDD[(VertexId,VertexId)] = sc.parallelize(couples .map{case (x: String,y: String) => (idMap.value(x),idMap.value(y))} ) 最后我们得到一个图表: val graph = Graph.fromEdgeTuples(edges,1) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |