RDD filter in Scala Spark
Published: 2020-12-16 19:19:07 | Category: Security | Source: compiled from the web
I have a dataset, and I want to extract the reviews (review/text) whose review/time lies between x and y, for example 1183334400 < time < 1185926400. Here is part of my data:
    product/productId: B000278ADA
    product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
    product/price: 46.34
    review/userId: A17KXW1PCUAIIN
    review/profileName: Mark Anthony "Mark"
    review/helpfulness: 4/4
    review/score: 5.0
    review/time: 1174435200
    review/summary: Jobst UltraSheer Knee High Stockings
    review/text: Does a very good job of relieving fatigue.

    product/productId: B000278ADB
    product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
    product/price: 46.34
    review/userId: A9Q3932GX4FX8
    review/profileName: Trina Wehle
    review/helpfulness: 1/1
    review/score: 3.0
    review/time: 1352505600
    review/summary: Delivery was very long wait.....
    review/text: It took almost 3 weeks to recieve the two pairs of stockings .

    product/productId: B000278ADB
    product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
    product/price: 46.34
    review/userId: AUIZ1GNBTG5OB
    review/profileName: dgodoy
    review/helpfulness: 1/1
    review/score: 2.0
    review/time: 1287014400
    review/summary: sizes recomended in the size chart are not real
    review/text: sizes are much smaller than what is recomended in the chart. I tried to put it and sheer it!.

My Spark-Scala code:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.{SparkConf, SparkContext}

    object test1 {
      def main(args: Array[String]): Unit = {
        val conf1 = new SparkConf().setAppName("golabi1").setMaster("local")
        val sc = new SparkContext(conf1)
        val conf: Configuration = new Configuration
        conf.set("textinputformat.record.delimiter", "product/title:")
        val input1 = sc.newAPIHadoopFile("data/Electronics.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
        val lines = input1.map { text => text._2 }
        val filt = lines.filter(text => (text.toString.contains(tt => tt in (startdate until enddate))))
        filt.saveAsTextFile("data/filter1")
      }
    }

But my code does not work. How can I filter these records?

Solution
It is much simpler than that. Try this:
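    import org.apache.spark.{SparkConf, SparkContext}

    object test1 {
      def main(args: Array[String]): Unit = {
        val conf1 = new SparkConf().setAppName("golabi1").setMaster("local")
        val sc = new SparkContext(conf1)

        // Boundary dates: the example range from the question (Unix epoch seconds).
        val startDate = 1183334400L
        val endDate = 1185926400L

        // True when the record's review/time lies strictly between the bounds.
        def extractDateAndCompare(line: String): Boolean = {
          val tag = "review/time: "
          val from = line.indexOf(tag) + tag.length
          // Read only the digits after the tag: review/summary sits between
          // review/time and review/text, so cutting at review/text would fail.
          val date = line.substring(from).takeWhile(_.isDigit).toLong
          date > startDate && date < endDate
        }

        // textFile yields one element per line, so this assumes each review
        // record sits on a single line.
        sc.textFile("data/Electronics.txt")
          .filter(extractDateAndCompare)
          .saveAsTextFile("data/filter1")
      }
    }

I often find that intermediate helper methods like this make things clearer. Of course, this assumes that the boundary dates (startDate and endDate, set here to the range from the question) are defined somewhere and that the input file has no formatting problems. I kept it that way on purpose for simplicity, but wrapping the parse in a Try, returning an Option, and using flatMap() can help you avoid errors if you have them.

Also, your raw text format is a bit cumbersome. You may want to explore JSON, TSV files, or some other simpler alternative format.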
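The Try / Option / flatMap() suggestion above could look roughly like the sketch below. It is not the answerer's code, only one way to read the suggestion. The object name SafeReviewFilter, the helpers reviewTime and keepIfInRange, and the sample records are invented for illustration, and the bounds are again the example range from the question. It runs on a plain Scala Seq so it can be tried without a cluster.

```scala
import scala.util.Try

object SafeReviewFilter {
  // Assumed bounds: the example range from the question (Unix epoch seconds).
  val startDate = 1183334400L
  val endDate = 1185926400L

  // Matches the digits that follow the review/time tag anywhere in the record.
  private val TimeField = """review/time:\s*(\d+)""".r

  // Some(time) when the field is present and fits in a Long, None otherwise.
  def reviewTime(record: String): Option[Long] =
    TimeField.findFirstMatchIn(record).flatMap(m => Try(m.group(1).toLong).toOption)

  // Some(record) when the time is strictly inside the range;
  // None for out-of-range or malformed records.
  def keepIfInRange(record: String): Option[String] =
    reviewTime(record).filter(t => t > startDate && t < endDate).map(_ => record)

  def main(args: Array[String]): Unit = {
    // Made-up sample records, shortened for the example.
    val records = Seq(
      "review/score: 5.0 review/time: 1174435200 review/text: too early",
      "review/score: 4.0 review/time: 1184000000 review/text: in range",
      "review/score: 3.0 review/time: not-a-number review/text: malformed",
      "no time field at all"
    )
    // flatMap drops every None, so malformed records are skipped instead of throwing.
    val kept = records.flatMap(r => keepIfInRange(r))
    kept.foreach(println)
  }
}
```

On an RDD the same call shape should apply, for example sc.textFile(path).flatMap(r => keepIfInRange(r)), since RDD.flatMap accepts a function that returns a collection and an Option is treated as a collection of zero or one element. Because the regular expression searches the whole string, the helper does not care whether the fields of a record are separated by spaces or by line breaks, as long as each element holds one complete record.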