加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

如何迭代记录火花scala?

发布时间:2020-12-16 09:26:25 所属栏目:安全 来源:网络整理
导读:我有一个变量“myrdd”,它是一个avro文件,通过hadoopfile加载了10条记录. 当我做 myrdd.first_1.datum.getName() 我可以得到这个名字.问题是,我在“myrdd”中有10条记录.当我做: myrdd.map(x = {println(x._1.datum.getName())}) 它不起作用,一次打印出一个
我有一个变量“myrdd”,它是一个avro文件,通过hadoopfile加载了10条记录.

当我做

myrdd.first_1.datum.getName()

我可以得到这个名字.问题是,我在“myrdd”中有10条记录.当我做:

myrdd.map(x => {println(x._1.datum.getName())})

它不起作用,一次打印出一个奇怪的物体.如何迭代所有记录?

解决方法

以下是使用spark-shell和类似场景的会话日志.

特定

scala> persons
res8: org.apache.spark.sql.DataFrame = [name: string,age: int]

scala> persons.first
res7: org.apache.spark.sql.Row = [Justin,19]

你的问题看起来像

scala> persons.map(t => println(t))
res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]

因此map只返回另一个RDD(该函数不立即应用,当你真正迭代结果时,该函数被“懒惰地”应用).

因此,当您实现(使用collect())时,您将获得“正常”集合:

scala> persons.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])

你可以映射到哪个.注意,在这种情况下,你在传递给map(println)的闭包中有副作用,println的结果是Unit:

scala> persons.collect().map(t => println(t))
[Justin,19]
res5: Array[Unit] = Array(())

如果在最后应用收集,结果相同:

scala> persons.map(t => println(t)).collect()
[Justin,19]
res19: Array[Unit] = Array(())

但是,如果您只想打印行,可以将其简化为使用foreach:

scala> persons.foreach(t => println(t))
[Justin,19]

正如@RohanAletty在评论中指出的那样,这适用于本地Spark作业.如果作业在群集中运行,则还需要收集:

persons.collect().foreach(t => println(t))

笔记

>在Iterator类中可以观察到相同的行为.
>上述会话的输出已重新排序

更新

至于过滤:如果你在收集之后应用过滤器,那么收集的位置是“坏”,之前可以应用.

例如,这些表达式给出了相同的结果:

scala> persons.filter("age > 20").collect().foreach(println)
[Michael,29]
[Andy,30]

scala> persons.collect().filter(r => r.getInt(1) >= 20).foreach(println)
[Michael,30]

但第二种情况更糟,因为过滤器可能在收集之前应用.

这同样适用于任何类型的聚合.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读