加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何通过.map在另一个RDD中传递一个RDD

发布时间:2020-12-16 10:00:29 所属栏目:安全 来源:网络整理
导读:我有两个rdd,我想对rdd1的每个项目的RDD2项目进行一些计算.所以,我在用户定义的函数中传递RDD2,如下所示,但我得到的错误就像rdd1不能在另一个rdd中传递.如果我想在两个rdd上执行操作,我可以知道如何实现这个目的吗? 例如: RDD1.map(line = function(line,R
我有两个rdd,我想对rdd1的每个项目的RDD2项目进行一些计算.所以,我在用户定义的函数中传递RDD2,如下所示,但我得到的错误就像rdd1不能在另一个rdd中传递.如果我想在两个rdd上执行操作,我可以知道如何实现这个目的吗?

例如:

RDD1.map(line => function(line,RDD2))

解决方法

错误说明,Spark不支持嵌套RDD.通常你必须通过重新设计算法来绕过它.

如何做到这取决于实际的用例,功能中究竟发生了什么以及它的输出是什么.

有时RDD1.cartesian(RDD2),每个元组执行操作然后按键减少将起作用.有时,如果你有(K,V)类型,两个RDD之间的连接将起作用.

如果RDD2很小,你总是可以在驱动程序中收集它,使它成为一个广播变量,并在函数中使用该变量而不是RDD2.

@编辑:

例如,假设您的RDD持有字符串,函数将计算RDD中给定RDD记录的次数:

def function(line: String,rdd: RDD[String]): (String,Int) = {
   (line,rdd.filter(_ == line).count)
}

这将返回RDD [(String,Int)].

Idea1

您可以尝试使用RDD的笛卡尔方法使用cartesian product.

val cartesianProduct = RDD1.cartesian(RDD2) // creates RDD[(String,String)]
                           .map( (r1,r2) => (r1,function2) ) // creates RDD[(String,Int)]
                           .reduceByKey( (c1,c2) => c1 + c2 ) // final RDD[(String,Int)]

这里function2取r1和r2(它们是字符串),如果相等则返回1,否则返回0.最终的映射将产生一个RDD,它将具有元组,其中键将是来自r1的记录,值将是总计数.

问题1:如果你在RDD1中有重复的字符串,这将不起作用.你必须考虑一下.如果RDD1记录有一些完美的唯一ID.

问题2:这确实创造了很多对(对于两个RDD中的1mln记录,它将创建大约500bln对),会很慢并且很可能导致很多shuffling.

Idea2

我不明白你对RDD2的大小lacs的评论,所以这可能会或可能不会工作:

val rdd2array = sc.broadcast(RDD2.collect())
val result = RDD1.map(line => function(line,rdd2array))

问题:这可能会炸毁你的记忆.在驱动程序上调用collect(),并将rdd2中的所有记录加载到驱动程序节点上的内存中.

Idea3

根据用例,还有其他方法可以解决这个问题,例如brute force algorithm for Similarity Search与您的用例类似(不打算).对此的替代解决方案之一是Locality Sensitive Hashing.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读