python – 在PySpark中的cogroup
发布时间:2020-12-20 13:43:20 所属栏目:Python 来源:网络整理
导读:教程建议如下: x = sc.parallelize([("a",1),("b",4)]) y = sc.parallelize([("a",2)]) sorted(x.cogroup(y).collect())[('a',([1],[2])),('b',([4],[]))] 但是,在运行时,我得到以下输出: ('a',(pyspark.resultiterable.ResultIterable object at 0x1d8b19
|
教程建议如下:
>>> x = sc.parallelize([("a",1),("b",4)])
>>> y = sc.parallelize([("a",2)])
>>> sorted(x.cogroup(y).collect())
[('a',([1],[2])),('b',([4],[]))]
但是,在运行时,我得到以下输出: ('a',(<pyspark.resultiterable.ResultIterable object at 0x1d8b190>,<pyspark.resultiterable.ResultIterable object at 0x1d8b150>))
('b',(<pyspark.resultiterable.ResultIterable object at 0x1d8b210>,<pyspark.resultiterable.ResultIterable object at 0x1d8b1d0>))
这有3级嵌套,如果我将输出存储在’r’并执行此操作: for i in r:
for j in i[1]:
print list(j)
我得到了正确的cogrouped数字: 1)为什么Cogroup不在PySpark中返回像rightjoin / leftouterjoin等数字? 解决方法
简单的答案,因为这是cogroup应该返回的
/** * For each key k in `this` or `other`,return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: RDD[(K,W)]): RDD[(K,(Iterable[V],Iterable[W]))] spark中的连接实际上是用cogroup实现的,基本上连接只是将cogroup中的iterables分解为元组.这是火花加入的植入. def join[W](other: RDD[(K,W)],partitioner: Partitioner): RDD[(K,(V,W))] = {
this.cogroup(other,partitioner).flatMapValues { case (vs,ws) =>
for (v <- vs; w <- ws) yield (v,w)
}
}
至于解释器输出的细微差别(请记住输出是相同的,除了pyspark Iterable没有显示它的内容),除非我看到教程,否则我无法确定.教程可能会显示输出更清晰,即使实际上并不是出现的.还有一件事我在scala shell中运行了一个类似的脚本,它显示了所有的输出. Array((a,(ArrayBuffer(1),ArrayBuffer(2))),(b,(ArrayBuffer(4),ArrayBuffer()))) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
