python – 在PySpark中的cogroup
发布时间:2020-12-20 13:43:20 所属栏目:Python 来源:网络整理
导读:教程建议如下: x = sc.parallelize([("a",1),("b",4)]) y = sc.parallelize([("a",2)]) sorted(x.cogroup(y).collect())[('a',([1],[2])),('b',([4],[]))] 但是,在运行时,我得到以下输出: ('a',(pyspark.resultiterable.ResultIterable object at 0x1d8b19
教程建议如下:
>>> x = sc.parallelize([("a",1),("b",4)]) >>> y = sc.parallelize([("a",2)]) >>> sorted(x.cogroup(y).collect()) [('a',([1],[2])),('b',([4],[]))] 但是,在运行时,我得到以下输出: ('a',(<pyspark.resultiterable.ResultIterable object at 0x1d8b190>,<pyspark.resultiterable.ResultIterable object at 0x1d8b150>)) ('b',(<pyspark.resultiterable.ResultIterable object at 0x1d8b210>,<pyspark.resultiterable.ResultIterable object at 0x1d8b1d0>)) 这有3级嵌套,如果我将输出存储在’r’并执行此操作: for i in r: for j in i[1]: print list(j) 我得到了正确的cogrouped数字: 1)为什么Cogroup不在PySpark中返回像rightjoin / leftouterjoin等数字? 解决方法
简单的答案,因为这是cogroup应该返回的
/** * For each key k in `this` or `other`,return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: RDD[(K,W)]): RDD[(K,(Iterable[V],Iterable[W]))] spark中的连接实际上是用cogroup实现的,基本上连接只是将cogroup中的iterables分解为元组.这是火花加入的植入. def join[W](other: RDD[(K,W)],partitioner: Partitioner): RDD[(K,(V,W))] = { this.cogroup(other,partitioner).flatMapValues { case (vs,ws) => for (v <- vs; w <- ws) yield (v,w) } } 至于解释器输出的细微差别(请记住输出是相同的,除了pyspark Iterable没有显示它的内容),除非我看到教程,否则我无法确定.教程可能会显示输出更清晰,即使实际上并不是出现的.还有一件事我在scala shell中运行了一个类似的脚本,它显示了所有的输出. Array((a,(ArrayBuffer(1),ArrayBuffer(2))),(b,(ArrayBuffer(4),ArrayBuffer()))) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |