加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala spark中的RDD过滤器

发布时间:2020-12-16 19:19:07 所属栏目:安全 来源:网络整理
导读:我有一个数据集,我想提取那些(审查/文本)在x和y之间有(审查/时间),例如(1183334400 time 1185926400),这是我数据的一部分: product/productId: B000278ADAproduct/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Largeproduct/price: 46.34review/us
我有一个数据集,我想提取那些(审查/文本)在x和y之间有(审查/时间),例如(1183334400< time< 1185926400),这是我数据的一部分:

product/productId: B000278ADA
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: A17KXW1PCUAIIN
review/profileName: Mark Anthony "Mark"
review/helpfulness: 4/4
review/score: 5.0
review/time: 1174435200
review/summary: Jobst UltraSheer Knee High Stockings
review/text: Does a very good job of relieving fatigue.

product/productId: B000278ADB
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: A9Q3932GX4FX8
review/profileName: Trina Wehle
review/helpfulness: 1/1
review/score: 3.0
review/time: 1352505600
review/summary: Delivery was very long wait.....
review/text: It took almost 3 weeks to recieve the two pairs of stockings .

product/productId: B000278ADB
product/title: Jobst Ultrasheer 15-20 Knee-High Silky Beige Large
product/price: 46.34
review/userId: AUIZ1GNBTG5OB
review/profileName: dgodoy
review/helpfulness: 1/1
review/score: 2.0
review/time: 1287014400
review/summary: sizes recomended in the size chart are not real
review/text: sizes are much smaller than what is recomended in the chart. I tried to put it and sheer it!.

我的Spark-Scala代码:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable,Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf,SparkContext}

object test1 {
  def main(args: Array[String]): Unit = {
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local")
    val sc = new SparkContext(conf1)
    val conf: Configuration = new Configuration
    conf.set("textinputformat.record.delimiter","product/title:")
    val input1=sc.newAPIHadoopFile("data/Electronics.txt",classOf[TextInputFormat],classOf[LongWritable],classOf[Text],conf)
    val lines = input1.map { text => text._2}
    val filt = lines.filter(text=>(text.toString.contains(tt => tt in (startdate until enddate))))
    filt.saveAsTextFile("data/filter1")
  }
}

但是我的代码效果不好,

我怎样才能过滤这些线?

解决方法

比那简单得多.试试这个:

object test1 
{
  def main(args: Array[String]): Unit = 
  {
    val conf1 = new SparkConf().setAppName("golabi1").setMaster("local")
    val sc = new SparkContext(conf1)

    def extractDateAndCompare(line: String): Boolean=
    {
        val from = line.indexOf("/time: ") + 7
        val to = line.indexOf("review/text: ") -1
        val date = line.substring(from,to).toLong
        date > startDate && date < endDate
    }

    sc.textFile("data/Electronics.txt")
        .filter(extractDateAndCompare)
        .saveAsTextFile("data/filter1")
  }
}

我经常找到那些中间辅助方法来使事情更加清晰.当然,这假定边界日期在某处定义,输入文件包含格式问题.我故意这样做是为了保持这个简单,但添加一个try,返回一个Option子句并使用flatMap()可以帮助你避免错误,如果你有它们.

此外,您的原始文本有点麻烦,您可能想要探索Json,TSV文件或其他一些替代的,更简单的格式.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读