scala – 在spark中对多个DataFrame执行连接
发布时间:2020-12-16 09:55:49 所属栏目:安全 来源:网络整理
导读:我有3个不同进程生成的3dataframes. 每个数据框都具有相同名称的列. 我的数据框看起来像这样 id val1 val2 val3 val4 1 null null null null 2 A2 A21 A31 A41id val1 val2 val3 val4 1 B1 B21 B31 B41 2 null null null nullid val1 val2 val3 val4 1 C1 C2
我有3个不同进程生成的3dataframes.
每个数据框都具有相同名称的列. 我的数据框看起来像这样 id val1 val2 val3 val4 1 null null null null 2 A2 A21 A31 A41 id val1 val2 val3 val4 1 B1 B21 B31 B41 2 null null null null id val1 val2 val3 val4 1 C1 C2 C3 C4 2 C11 C12 C13 C14 在这3个数据帧中,我想创建两个数据帧(最终和合并). 如果数据帧1中存在结果(val1!= null),我将该行存储在最终数据帧中. 我的最终结果应该是: id finalVal1 finalVal2 finalVal3 finalVal4 1 B1 B21 B31 B41 2 A2 A21 A31 A41 Consolidated Dataframe将存储所有3个结果. 我怎样才能有效地做到这一点? 解决方法
如果我理解正确的话,对于你想要找出第一个非空值的每一行,首先查看第一个表,然后查看第二个表,然后查看第三个表.
您只需要根据id连接这三个表,然后使用coalesce函数获取第一个非null元素 import org.apache.spark.sql.functions._ val df1 = sc.parallelize(Seq( (1,null,null),(2,"A2","A21","A31","A41")) ).toDF("id","val1","val2","val3","val4") val df2 = sc.parallelize(Seq( (1,"B1","B21","B31","B41"),null)) ).toDF("id","val4") val df3 = sc.parallelize(Seq( (1,"C1","C2","C3","C4"),"C11","C12","C13","C14")) ).toDF("id","val4") val consolidated = df1.join(df2,"id").join(df3,"id").select( df1("id"),coalesce(df1("val1"),df2("val1"),df3("val1")).as("finalVal1"),coalesce(df1("val2"),df2("val2"),df3("val2")).as("finalVal2"),coalesce(df1("val3"),df2("val3"),df3("val3")).as("finalVal3"),coalesce(df1("val4"),df2("val4"),df3("val4")).as("finalVal4") ) 这为您提供了预期的输出 +---+----+----+----+----+ | id|val1|val2|val3|val4| +---+----+----+----+----+ | 1| B1| B21| B31| B41| | 2| A2| A21| A31| A41| +---+----+----+----+----+ (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |