加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

Scala spark通过键减少并找到共同的价值

发布时间:2020-12-16 18:18:38 所属栏目:安全 来源:网络整理
导读:我有一个csv数据文件存储在HDFS上的sequenceFile中,格式为name,zip,country,fav_food1,fav_food2,fav_food3,fav_colour.可能有许多具有相同名称的条目,我需要找出他们最喜欢的食物是什么(即计算所有具有该名称的记录中的所有食物条目并返回最受欢迎的食物.我
我有一个csv数据文件存储在HDFS上的sequenceFile中,格式为name,zip,country,fav_food1,fav_food2,fav_food3,fav_colour.可能有许多具有相同名称的条目,我需要找出他们最喜欢的食物是什么(即计算所有具有该名称的记录中的所有食物条目并返回最受欢迎的食物.我是 Scala和Spark的新手并拥有彻底的多个教程和搜索论坛,但我仍然坚持如何继续.到目前为止,我已经得到了文本到字符串格式的序列文件,然后过滤了条目

以下是文件中一行的示例数据条目

Bob,123,USA,Pizza,Soda,Blue
Bob,456,UK,Chocolate,Cheese,Green
Bob,12,Yellow
Mary,68,Chips,Pasta,Blue

所以输出应该是元组(Bob,Soda),因为苏打在Bob的条目中出现次数最多.

import org.apache.hadoop.io._

var lines  = sc.sequenceFile("path",classOf[LongWritable],classOf[Text]).values.map(x => x.toString())
// converted to string since I could not get filter to run on Text and removing the longwritable

var filtered = lines.filter(_.split(",")(0) == "Bob");
// removed entries with all other users

var f_tuples = filtered.map(line => lines.split(",");
// split all the values

var f_simple = filtered.map(line => (line(0),(line(3),line(4),line(5))
// removed unnecessary fields

我现在的问题是,我认为我有这个[< name,[f,f,f]>]结构,并且不知道如何进行压扁并获得最受欢迎的食物.我需要组合所有条目,所以我有一个带a的条目,然后获取值中最常见的元素.任何帮助,将不胜感激.谢谢

我试过这个让它变得扁平化,但似乎我尝试的越多,数据结构就越复杂.

var f_trial = fpairs.groupBy(_._1).mapValues(_.map(_._2))
// the resulting structure was of type org.apache.spark.rdd.RDD[(String,Interable[(String,String,String)]

这是一个记录的println在f_trial之后的样子

("Bob",List((Pizza,),(Chocolate,Soda)))

括号细分

("Bob",List(

(Pizza,<missing value>),Soda)

) // ends List paren

) // ends first paren

解决方法

我找时间了.建立:

import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf

    val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
    val sc = new SparkContext(conf)

    val data = """   
  Bob,Blue
  Bob,Green
  Bob,Yellow
  Mary,Blue
  """.trim

    val records = sc.parallelize(data.split('n'))

提取食物选择,并为每个人做一个元组((名称,食物),1)

val r2 = records.flatMap { r =>
      val Array(name,id,food1,food2,food3,color) = r.split(',');
      List(((name,food1),1),((name,food2),food3),1))
    }

每个名称/食物组合总计:

val r3 = r2.reduceByKey((x,y) => x + y)

重新映射,以便名称(仅)是关键

val r4 = r3.map { case ((name,food),total) => (name,(food,total)) }

选择每一步计数最多的食物

val res = r4.reduceByKey((x,y) => if (y._2 > x._2) y else x)

我们已经完成了

println(res.collect().mkString)
    //(Mary,(Chips,1))(Bob,(Soda,3))

编辑:要收集所有人数最多的食品,我们只需更改最后两行:

从包含总计的项目列表开始:

val r5 = r3.map { case ((name,(List(food),total)) }

在相同的情况下,将食品项目列表与该分数连接起来

val res2 = r5.reduceByKey((x,y) => if (y._2 > x._2) y 
                                    else if (y._2 < x._2) x
                                    else (y._1:::x._1,y._2))

//(Mary,(List(Chocolate,Chips),1))
//(Bob,(List(Soda),3))

如果你想要top-3,比如说,然后使用aggregateByKey来组合每个人最喜欢的食物列表而不是第二个reduceByKey

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读