加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

如何将动态命名的列连接到字典中?

发布时间:2020-12-17 17:39:08 所属栏目:Python 来源:网络整理
导读:给定这些数据帧: IncomingCount-------------------------Venue|Date | 08 | 10 |-------------------------Hotel|20190101| 15 | 03 |Beach|20190101| 93 | 45 |OutgoingCount-------------------------Venue|Date | 07 | 10 | -------------------------B

给定这些数据帧:

IncomingCount
-------------------------
Venue|Date    | 08 | 10 |
-------------------------
Hotel|20190101| 15 | 03 |
Beach|20190101| 93 | 45 |

OutgoingCount
-------------------------
Venue|Date    | 07 | 10 | 
-------------------------
Beach|20190101| 30 | 5  |
Hotel|20190103| 05 | 15 |

我如何可以合并(完全联接)两个表,从而得到如下结果而不必手动遍历两个表的每一行?

Dictionary:
[
 {"Venue":"Hotel","Date":"20190101","08":{ "IncomingCount":15 },"10":{ "IncomingCount":03 } },{"Venue":"Beach","07":{ "OutgoingCount":30 },"08":{ "IncomingCount":93 },"10":{ "IncomingCount":45,"OutgoingCount":15 } },{"Venue":"Hotel","Date":"20190103","07":{ "OutgoingCount":05 },"10":{ "OutgoingCount":15 } }
]

条件是:

>“地点”和“日期”列的作用类似于加入条件.
>其他以数字表示的列是动态创建的.
>如果动态列不存在,则将其排除(或以None作为值包含在内).

最佳答案
这很奇怪,但是可以通过使用spark中的create_map函数来完成.

基本上将列分为四组:键(地点,日期),通用键(10),仅传入键(08),仅传出键(07).

然后按组(键除外)创建映射器,仅映射每个组可用的映射器.应用映射,删除旧列,然后将映射的列重命名为旧名称.

最后将所有行转换为dict(来自df的rdd)并收集.

from pyspark.sql import SparkSession
from pyspark.sql.functions import create_map,col,lit

spark = SparkSession.builder.appName('hotels_and_beaches').getOrCreate()

incoming_counts = spark.createDataFrame([('Hotel',20190101,15,3),('Beach',93,45)],['Venue','Date','08','10']).alias('inc')
outgoing_counts = spark.createDataFrame([('Beach',30,5),('Hotel',20190103,5,15)],'07','10']).alias('out')

df = incoming_counts.join(outgoing_counts,on=['Venue','Date'],how='full')

outgoing_cols = {c for c in outgoing_counts.columns if c not in {'Venue','Date'}}
incoming_cols = {c for c in incoming_counts.columns if c not in {'Venue','Date'}}

common_cols = outgoing_cols.intersection(incoming_cols)

outgoing_cols = outgoing_cols.difference(common_cols)
incoming_cols = incoming_cols.difference(common_cols)

for c in common_cols:
    df = df.withColumn(
        c + '_new',create_map(
            lit('IncomingCount'),col('inc.{}'.format(c)),lit('OutgoingCount'),col('out.{}'.format(c)),)
    ).drop(c).withColumnRenamed(c + '_new',c)

for c in incoming_cols:
    df = df.withColumn(
        c + '_new',c)

for c in outgoing_cols:
    df = df.withColumn(
        c + '_new',create_map(
            lit('OutgoingCount'),c)

result = df.coalesce(1).rdd.map(lambda r: r.asDict()).collect()
print(result)

结果:

[{'Venue': 'Hotel','Date': 20190101,'10': {'OutgoingCount': None,'IncomingCount': 3},'08': {'IncomingCount': 15},'07': {'OutgoingCount': None}},{'Venue': 'Hotel','Date': 20190103,'10': {'OutgoingCount': 15,'IncomingCount': None},'08': {'IncomingCount': None},'07': {'OutgoingCount': 5}},{'Venue': 'Beach','10': {'OutgoingCount': 5,'IncomingCount': 45},'08': {'IncomingCount': 93},'07': {'OutgoingCount': 30}}]

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读