加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 加入多个rdds

发布时间:2020-12-16 08:44:53 所属栏目:安全 来源:网络整理
导读:我有4DD类型的RDD :((int,int,int),values)和我的rdds是 rdd1: ((a,b,c),value) rdd2:((a,d,e),valueA) rdd3:((f,g),valueB)rdd4:((h,i,valueC) 我如何加入rdd,如rdd1加入rdd2 on“a”rdd1 join rdd2 on“b”and rdd1 join rdd3 on on“c” 所以在Scala中输
我有4DD类型的RDD :((int,int,int),values)和我的rdds是

rdd1: ((a,b,c),value) 
rdd2:((a,d,e),valueA) 
rdd3:((f,g),valueB)
rdd4:((h,i,valueC)

我如何加入rdd,如rdd1加入rdd2 on“a”rdd1 join rdd2 on“b”and rdd1 join rdd3 on on“c”

所以在Scala中输出是finalRdd:((a,valueA,valueB,valueC,value))?

我尝试用collectAsMap做这个,但它没有很好地工作并抛出异常

代码仅适用于rdd1 join rdd2

val newrdd2=rdd2.map{case( (a,d)=>(a,d)}.collectAsMap
val joined=rdd1.map{case( (a,d)=>(newrdd2.get(a).get,c,d)}

rdd1: ((1,2,3),animals)
rdd2:((1,anyInt,anyInt),cat)
rdd3:((anyInt,cow )
rdd 4: ((anyInt,parrot)

输出应该是((1,动物,猫,牛,鹦鹉)

解决方法

在RDD上有一个方便的连接方法,但您需要使用特定的连接键来键入它,这是Spark用于分区和混洗的方法.

从the docs开始:

join(otherDataset,[numTasks]) : When called on datasets of type (K,V) and (K,W),returns a dataset of (K,(V,W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin,rightOuterJoin,and fullOuterJoin.

我不能编译我在哪里,但手动它是这样的:

val rdd1KeyA = rdd1.map(x => (x._1._1,(x._1._2,x._1._3. x._2) // RDD(a,(b,value))
val rdd2KeyA = rdd2.map(x => (x._1._1,x._2) // RDD(a,valueA)
val joined1 = rdd1KeyA.join(rdd2KeyA) // RDD(a,((b,value),valueA))

val rdd3KeyB = rdd3.map(x => (x._1._2,x._2) // RDD(b,valueB)
val joined1KeyB = joined1.map(x => (x._2._1._1,(x._1,x._2._1._2,x._2._1._3. x._2._2) // RDD(b,(a,value,valueA))
val joined2 = joined1KeyB.join(rdd3keyB) // RDD(b,((a,valueA),valueB))

…等等

避免收集*函数,因为它们不使用数据的分布式特性,并且容易在大负载上失败,它们将RDD上的所有数据混合到主节点上的内存中集合,可能会使一切都搞砸.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读