scala – 如何通过.map在另一个RDD中传递一个RDD
我有两个rdd,我想对rdd1的每个项目的RDD2项目进行一些计算.所以,我在用户定义的函数中传递RDD2,如下所示,但我得到的错误就像rdd1不能在另一个rdd中传递.如果我想在两个rdd上执行操作,我可以知道如何实现这个目的吗?
例如: RDD1.map(line => function(line,RDD2)) 解决方法
错误说明,Spark不支持嵌套RDD.通常你必须通过重新设计算法来绕过它.
如何做到这取决于实际的用例,功能中究竟发生了什么以及它的输出是什么. 有时RDD1.cartesian(RDD2),每个元组执行操作然后按键减少将起作用.有时,如果你有(K,V)类型,两个RDD之间的连接将起作用. 如果RDD2很小,你总是可以在驱动程序中收集它,使它成为一个广播变量,并在函数中使用该变量而不是RDD2. @编辑: 例如,假设您的RDD持有字符串,函数将计算RDD中给定RDD记录的次数: def function(line: String,rdd: RDD[String]): (String,Int) = { (line,rdd.filter(_ == line).count) } 这将返回RDD [(String,Int)]. Idea1 您可以尝试使用RDD的笛卡尔方法使用cartesian product. val cartesianProduct = RDD1.cartesian(RDD2) // creates RDD[(String,String)] .map( (r1,r2) => (r1,function2) ) // creates RDD[(String,Int)] .reduceByKey( (c1,c2) => c1 + c2 ) // final RDD[(String,Int)] 这里function2取r1和r2(它们是字符串),如果相等则返回1,否则返回0.最终的映射将产生一个RDD,它将具有元组,其中键将是来自r1的记录,值将是总计数. 问题1:如果你在RDD1中有重复的字符串,这将不起作用.你必须考虑一下.如果RDD1记录有一些完美的唯一ID. 问题2:这确实创造了很多对(对于两个RDD中的1mln记录,它将创建大约500bln对),会很慢并且很可能导致很多shuffling. Idea2 我不明白你对RDD2的大小lacs的评论,所以这可能会或可能不会工作: val rdd2array = sc.broadcast(RDD2.collect()) val result = RDD1.map(line => function(line,rdd2array)) 问题:这可能会炸毁你的记忆.在驱动程序上调用collect(),并将rdd2中的所有记录加载到驱动程序节点上的内存中. Idea3 根据用例,还有其他方法可以解决这个问题,例如brute force algorithm for Similarity Search与您的用例类似(不打算).对此的替代解决方案之一是Locality Sensitive Hashing. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |