加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何使用带有两个RDD的键或filter()来使用spark inters

发布时间:2020-12-16 10:06:00 所属栏目:安全 来源:网络整理
导读:我想在spark中使用keys()by key或filter(). 但我真的不知道如何按键使用intersection(). 所以我尝试使用filter(),但它没有用. 示例 – 这是两个RDD: data1 //RDD[(String,Int)] = Array(("a",1),("a",2),("b",3),("c",1))data2 //RDD[(String,5))val data3
我想在spark中使用keys()by key或filter().

但我真的不知道如何按键使用intersection().

所以我尝试使用filter(),但它没有用.

示例 – 这是两个RDD:

data1 //RDD[(String,Int)] = Array(("a",1),("a",2),("b",3),("c",1))
data2 //RDD[(String,5))

val data3 = data2.map{_._1}

data1.filter{_._1 == data3}.collect //Array[(String,Int] = Array()

我想根据data2所拥有的密钥获得一个(key,value)对与data1相同的密钥.

数组((“a”,(“a”,(“b”,3))是我想要的结果.

有没有一种方法可以通过key或filter()使用intersection()来解决这个问题?

解决方法

This can be achieved in different ways

1. filter()中的广播变量 – 需要提高可扩展性

val data1 = sc.parallelize(Seq(("a",1)))
val data2 = sc.parallelize(Seq(("a",5)))

// broadcast data2 key list to use in filter method,which runs in executor nodes
val bcast = sc.broadcast(data2.map(_._1).collect())

val result = data1.filter(r => bcast.value.contains(r._1))


println(result.collect().toList)
//Output
List((a,(a,(b,3))

2. cogroup(类似于按键分组)

val data1 = sc.parallelize(Seq(("a",5)))

val cogroupRdd: RDD[(String,(Iterable[Int],Iterable[Int]))] = data1.cogroup(data2)
/* List(
  (a,(CompactBuffer(1,CompactBuffer(3))),(CompactBuffer(2,CompactBuffer(5))),(c,(CompactBuffer(1),CompactBuffer()))
) */

//Now filter keys which have two non empty CompactBuffer. You can do that with 
//filter(row => row._2._1.nonEmpty && row._2._2.nonEmpty) also. 
val filterRdd = cogroupRdd.filter { case (k,(v1,v2)) => v1.nonEmpty && v2.nonEmpty } 
/* List(
  (a,CompactBuffer(5)))
) */

//As we care about first data only,lets pick first compact buffer only 
// by doing v1.map(val1 => (k,val1))
val result = filterRdd.flatMap { case (k,v2)) => v1.map(val1 => (k,val1)) }
//List((a,3))

3.使用内连接

val resultRdd = data1.join(data2).map(r => (r._1,r._2._1)).distinct()
//List((b,1))

这里data1.join(data2)保存带有公共键的对(内连接)

//List((a,(1,3)),(2,5)),1)),(3,1)))

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读