加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

如何在Spark中的map函数中使用数据帧?

发布时间:2020-12-16 22:52:58 所属栏目:Python 来源:网络整理
导读:定义: sampleDF是样本数据框,具有用于查找目的的列表记录. sampleDS是一个RDD,其中包含元素列表. mappingFunction是在sampleDF中查找sampleDS的元素,如果它们存在于sampleDF中则映射到1,如果不存在则映射到0. 我有一个映射函数如下: def mappingFunction(e

定义:

> sampleDF是样本数据框,具有用于查找目的的列表记录.
> sampleDS是一个RDD,其中包含元素列表.
> mappingFunction是在sampleDF中查找sampleDS的元素,如果它们存在于sampleDF中则映射到1,如果不存在则映射到0.

我有一个映射函数如下:

def mappingFunction(element):
    # The dataframe lookup!
    lookupResult = sampleDF.filter(sampleDF[0] == element).collect()
    if len(lookupResult) > 0:
        print lookupResult
        return 1
    return 0

问题:

在映射函数之外访问sampleDF工作得非常好但是只要我在函数内部使用它,我就会收到以下错误:

py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:744)

我还尝试了什么:

我确实尝试保存一个临时表并在map函数中使用sqlContext select仍然无法使其工作.这是我得到的错误:

  File "/usr/lib64/python2.6/pickle.py",line 286,in save
    f(self,obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.6/pickle.py",line 649,in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.6/pickle.py",line 686,in _batch_setitems
    save(v)
  File "/usr/lib64/python2.6/pickle.py",line 331,in save
    self.save_reduce(obj=obj,*rv)
  File "/opt/spark/python/pyspark/cloudpickle.py",line 542,in save_reduce
    save(state)
  File "/usr/lib64/python2.6/pickle.py",line 681,line 306,in save
    rv = reduce(self.proto)
TypeError: 'JavaPackage' object is not callable

我要的是:

我试图通过简单的例子来简化我的问题.任何有关如何在地图功能中使用数据框的帮助都非常受欢迎.

最佳答案
这不可能. Spark不支持对分布式数据结构(RDD,DataFrame,Datasets)的嵌套操作.即使它确实执行了大量的工作也不是一个好主意.根据您显示的代码,您可能希望将RDD转换为DataFrame并执行join wit

(rdd.map(x => (x,)).toDF(["element"])
  .join(sampleDF,sampleDF[0] == df[0])
  .groupBy("element")
  .agg(count("element") > 0))

在侧面说明打印内部地图是完全没用的,不提及它增加额外的IO开销.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读