加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

python – 在PySpark中的cogroup

发布时间:2020-12-20 13:43:20 所属栏目:Python 来源:网络整理
导读:教程建议如下: x = sc.parallelize([("a",1),("b",4)]) y = sc.parallelize([("a",2)]) sorted(x.cogroup(y).collect())[('a',([1],[2])),('b',([4],[]))] 但是,在运行时,我得到以下输出: ('a',(pyspark.resultiterable.ResultIterable object at 0x1d8b19
教程建议如下:

>>> x = sc.parallelize([("a",1),("b",4)])
>>> y = sc.parallelize([("a",2)])
>>> sorted(x.cogroup(y).collect())
[('a',([1],[2])),('b',([4],[]))]

但是,在运行时,我得到以下输出:

('a',(<pyspark.resultiterable.ResultIterable object at 0x1d8b190>,<pyspark.resultiterable.ResultIterable object at 0x1d8b150>))
('b',(<pyspark.resultiterable.ResultIterable object at 0x1d8b210>,<pyspark.resultiterable.ResultIterable object at 0x1d8b1d0>))

这有3级嵌套,如果我将输出存储在’r’并执行此操作:

for i in r:
     for j in i[1]:
         print list(j)

我得到了正确的cogrouped数字:

1)为什么Cogroup不在PySpark中返回像rightjoin / leftouterjoin等数字?
2)为什么我不能在PySpark shell上复制这个例子?

解决方法

简单的答案,因为这是cogroup应该返回的

/**
   * For each key k in `this` or `other`,return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: RDD[(K,W)]): RDD[(K,(Iterable[V],Iterable[W]))]

spark中的连接实际上是用cogroup实现的,基本上连接只是将cogroup中的iterables分解为元组.这是火花加入的植入.

def join[W](other: RDD[(K,W)],partitioner: Partitioner): RDD[(K,(V,W))] = {
    this.cogroup(other,partitioner).flatMapValues { case (vs,ws) =>
      for (v <- vs; w <- ws) yield (v,w)
    }
  }

至于解释器输出的细微差别(请记住输出是相同的,除了pyspark Iterable没有显示它的内容),除非我看到教程,否则我无法确定.教程可能会显示输出更清晰,即使实际上并不是出现的.还有一件事我在scala shell中运行了一个类似的脚本,它显示了所有的输出.

Array((a,(ArrayBuffer(1),ArrayBuffer(2))),(b,(ArrayBuffer(4),ArrayBuffer())))

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读