scala – 如何将RDD [(Key,Value)]转换为Map [Key,RDD [Value]]
发布时间:2020-12-16 18:47:06 所属栏目:安全 来源:网络整理
导读:我搜索了很长一段时间的解决方案,但没有得到任何正确的算法. 在scala中使用Spark RDD,如何将RDD [(Key,Value)]转换为Map [key,RDD [Value]],知道我不能使用collect或其他可能将数据加载到内存中的方法? 事实上,我的最终目标是按键循环Map [Key,RDD [Value]]
我搜索了很长一段时间的解决方案,但没有得到任何正确的算法.
在scala中使用Spark RDD,如何将RDD [(Key,Value)]转换为Map [key,RDD [Value]],知道我不能使用collect或其他可能将数据加载到内存中的方法? 事实上,我的最终目标是按键循环Map [Key,RDD [Value]]并为每个RDD调用saveAsNewAPIHadoopFile [Value] 例如,如果我得到: RDD[("A",1),("A",2),3),("B",4),5),("C",6)] 我想要 : Map[("A" -> RDD[1,2,3]),("B" -> RDD[4,5]),("C" -> RDD[6])] 我想知道在RDD [(Key,Value)]的每个键A,B,C上使用过滤器是否花费不太多,但是我不知道调用过滤器是否有多少次有不同的键将是高效? (当然不是,但可能使用缓存?) 谢谢 解决方法
您应该使用这样的代码(Python):
rdd = sc.parallelize( [("A",6)] ).cache() keys = rdd.keys().distinct().collect() for key in keys: out = rdd.filter(lambda x: x[0] == key).map(lambda (x,y): y) out.saveAsNewAPIHadoopFile (...) 一个RDD不能是另一个RDD的一部分,您无法只收集密钥并将其相关值转换为单独的RDD.在我的示例中,您将迭代缓存的RDD,这是正常的并且可以快速工作 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |