scala – How to filter one Spark DataFrame against another DataFrame
I'm trying to filter one DataFrame against another:
scala> val df1 = sc.parallelize((1 to 100).map(a => (s"user $a", a * 0.123, a))).toDF("name", "score", "user_id")
scala> val df2 = sc.parallelize(List(2, 3, 4, 5, 6)).toDF("valid_id")

Now I want to filter df1 and get back a DataFrame containing all the rows of df1 whose user_id appears in df2("valid_id"). In other words, I want every row of df1 where user_id is 2, 3, 4, 5, or 6.

scala> df1.select("user_id").filter($"user_id" in df2("valid_id"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
org.apache.spark.sql.AnalysisException: resolved attribute(s) valid_id#20 missing from user_id#18 in operator !Filter user_id#18 IN (valid_id#20);

On the other hand, when I filter with an ordinary expression, everything works fine:

scala> df1.select("user_id").filter(($"user_id" % 2) === 0)
res1: org.apache.spark.sql.DataFrame = [user_id: int]

Why do I get this error? Is something wrong with my syntax?

Following a suggestion in the comments, I tried a left outer join:

scala> df1.show
+-------+------------------+-------+
|   name|             score|user_id|
+-------+------------------+-------+
| user 1|             0.123|      1|
| user 2|             0.246|      2|
| user 3|             0.369|      3|
| user 4|             0.492|      4|
| user 5|             0.615|      5|
| user 6|             0.738|      6|
| user 7|             0.861|      7|
| user 8|             0.984|      8|
| user 9|             1.107|      9|
|user 10|              1.23|     10|
|user 11|             1.353|     11|
|user 12|             1.476|     12|
|user 13|             1.599|     13|
|user 14|             1.722|     14|
|user 15|             1.845|     15|
|user 16|             1.968|     16|
|user 17|             2.091|     17|
|user 18|             2.214|     18|
|user 19|2.3369999999999997|     19|
|user 20|              2.46|     20|
+-------+------------------+-------+
only showing top 20 rows

scala> df2.show
+--------+
|valid_id|
+--------+
|       2|
|       3|
|       4|
|       5|
|       6|
+--------+

scala> df1.join(df2, df1("user_id") === df2("valid_id"))
res6: org.apache.spark.sql.DataFrame = [name: string, score: double, user_id: int, valid_id: int]

scala> res6.collect
res7: Array[org.apache.spark.sql.Row] = Array()

scala> df1.join(df2, df1("user_id") === df2("valid_id"), "left_outer")
res8: org.apache.spark.sql.DataFrame = [name: string, score: double, user_id: int, valid_id: int]

scala> res8.count
res9: Long = 0

I'm running Spark 1.5.0 with Scala 2.10.5.

Solution
You want a (regular) inner join, not an outer join :)
df1.join(df2, df1("user_id") === df2("valid_id"))
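The original filter fails because a predicate passed to filter can only reference columns of the DataFrame being filtered; df2("valid_id") belongs to a different DataFrame, so the analyzer cannot resolve it, hence the AnalysisException. If you want only df1's columns back (the inner join above also carries df2's valid_id along), two common alternatives are a left semi join, or, when df2 is small, collecting the valid ids to the driver and filtering with Column.isin (available since Spark 1.5.0). A minimal sketch against the df1 and df2 from the question, assuming the same spark-shell session (so the $ column syntax is in scope); the val names are just illustrative:

// Left semi join: returns the rows of df1 whose user_id matches some
// valid_id in df2, and keeps only df1's columns.
val bySemiJoin = df1.join(df2, df1("user_id") === df2("valid_id"), "leftsemi")

// Alternative when df2 is small: collect its ids to the driver and use isin.
val validIds = df2.collect().map(_.getInt(0))   // Array(2, 3, 4, 5, 6)
val byIsin   = df1.filter($"user_id".isin(validIds: _*))

Both should return the five matching rows (user_id 2 through 6); the semi join keeps the work distributed, so it also scales to a valid_id table that is too large to collect.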