加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何将RDD [(Key,Value)]转换为Map [Key,RDD [Value]]

发布时间:2020-12-16 18:47:06 所属栏目:安全 来源:网络整理
导读:我搜索了很长一段时间的解决方案,但没有得到任何正确的算法. 在scala中使用Spark RDD,如何将RDD [(Key,Value)]转换为Map [key,RDD [Value]],知道我不能使用collect或其他可能将数据加载到内存中的方法? 事实上,我的最终目标是按键循环Map [Key,RDD [Value]]
我搜索了很长一段时间的解决方案,但没有得到任何正确的算法.

在scala中使用Spark RDD,如何将RDD [(Key,Value)]转换为Map [key,RDD [Value]],知道我不能使用collect或其他可能将数据加载到内存中的方法?

事实上,我的最终目标是按键循环Map [Key,RDD [Value]]并为每个RDD调用saveAsNewAPIHadoopFile [Value]

例如,如果我得到:

RDD[("A",1),("A",2),3),("B",4),5),("C",6)]

我想要 :

Map[("A" -> RDD[1,2,3]),("B" -> RDD[4,5]),("C" -> RDD[6])]

我想知道在RDD [(Key,Value)]的每个键A,B,C上使用过滤器是否花费不太多,但是我不知道调用过滤器是否有多少次有不同的键将是高效? (当然不是,但可能使用缓存?)

谢谢

解决方法

您应该使用这样的代码(Python):

rdd = sc.parallelize( [("A",6)] ).cache()
keys = rdd.keys().distinct().collect()
for key in keys:
    out = rdd.filter(lambda x: x[0] == key).map(lambda (x,y): y)
    out.saveAsNewAPIHadoopFile (...)

一个RDD不能是另一个RDD的一部分,您无法只收集密钥并将其相关值转换为单独的RDD.在我的示例中,您将迭代缓存的RDD,这是正常的并且可以快速工作

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读