scala – 如何使用带有两个RDD的键或filter()来使用spark inters
发布时间:2020-12-16 10:06:00 所属栏目:安全 来源:网络整理
导读:我想在spark中使用keys()by key或filter(). 但我真的不知道如何按键使用intersection(). 所以我尝试使用filter(),但它没有用. 示例 – 这是两个RDD: data1 //RDD[(String,Int)] = Array(("a",1),("a",2),("b",3),("c",1))data2 //RDD[(String,5))val data3
我想在spark中使用keys()by key或filter().
但我真的不知道如何按键使用intersection(). 所以我尝试使用filter(),但它没有用. 示例 – 这是两个RDD: data1 //RDD[(String,Int)] = Array(("a",1),("a",2),("b",3),("c",1)) data2 //RDD[(String,5)) val data3 = data2.map{_._1} data1.filter{_._1 == data3}.collect //Array[(String,Int] = Array() 我想根据data2所拥有的密钥获得一个(key,value)对与data1相同的密钥. 数组((“a”,(“a”,(“b”,3))是我想要的结果. 有没有一种方法可以通过key或filter()使用intersection()来解决这个问题? 解决方法
1. filter()中的广播变量 – 需要提高可扩展性 val data1 = sc.parallelize(Seq(("a",1))) val data2 = sc.parallelize(Seq(("a",5))) // broadcast data2 key list to use in filter method,which runs in executor nodes val bcast = sc.broadcast(data2.map(_._1).collect()) val result = data1.filter(r => bcast.value.contains(r._1)) println(result.collect().toList) //Output List((a,(a,(b,3)) 2. cogroup(类似于按键分组) val data1 = sc.parallelize(Seq(("a",5))) val cogroupRdd: RDD[(String,(Iterable[Int],Iterable[Int]))] = data1.cogroup(data2) /* List( (a,(CompactBuffer(1,CompactBuffer(3))),(CompactBuffer(2,CompactBuffer(5))),(c,(CompactBuffer(1),CompactBuffer())) ) */ //Now filter keys which have two non empty CompactBuffer. You can do that with //filter(row => row._2._1.nonEmpty && row._2._2.nonEmpty) also. val filterRdd = cogroupRdd.filter { case (k,(v1,v2)) => v1.nonEmpty && v2.nonEmpty } /* List( (a,CompactBuffer(5))) ) */ //As we care about first data only,lets pick first compact buffer only // by doing v1.map(val1 => (k,val1)) val result = filterRdd.flatMap { case (k,v2)) => v1.map(val1 => (k,val1)) } //List((a,3)) 3.使用内连接 val resultRdd = data1.join(data2).map(r => (r._1,r._2._1)).distinct() //List((b,1)) 这里data1.join(data2)保存带有公共键的对(内连接) //List((a,(1,3)),(2,5)),1)),(3,1))) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |