scala – Importing a TSV file into Spark
I'm new here, so please forgive a basic question. I'm trying to import my TSV file into Spark, but I'm not sure whether it is working correctly.
val lines = sc.textFile("/home/cloudera/Desktop/Test/test.tsv")
val split_lines = lines.map(_.split("\t"))
split_lines.first()

Everything seems to run fine. Is there a way I can check whether the TSV file was loaded correctly?

Sample data (all fields separated by tabs):

hastag 200904 24 blackcat
hastag 200908 1 oaddisco
hastag 200904 1 blah
hastag 200910 3 mydda

Solution
Your code looks fine to me so far. If you print the first line to the console, do you see the data you expect?
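As a quick sanity check along those lines, a minimal sketch such as the following (reusing the path and variable names from the question; take, mkString and the four-column assumption are mine) prints a few parsed rows and flags any line that did not split into four tab-separated fields:

val lines = sc.textFile("/home/cloudera/Desktop/Test/test.tsv")
val split_lines = lines.map(_.split("\t"))

// Show the first few parsed records; each should be an Array of four fields
split_lines.take(3).foreach(row => println(row.mkString(" | ")))

// Count lines that did not split into exactly four columns (0 means the tabs parsed as expected)
println(split_lines.filter(_.length != 4).count())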
To explore the Spark API, the best thing to use is spark-shell, a Scala REPL enriched with Spark-specific features that builds a default Spark context for you. It makes it much easier to poke around in your data.

Here is an example loading a ~65k-line CSV file; I guess your usage is similar:

$ <spark_dir>/bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.0.0-SNAPSHOT
      /_/

scala> val lines = sc.textFile("/home/user/playground/ts-data.csv")
lines: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

scala> val csv = lines.map(_.split(";"))
csv: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[2] at map at <console>:14

scala> csv.count
(... spark processing ...)
res0: Long = 67538

// let's have a look at the first record
scala> csv.first
14/06/01 12:22:17 INFO SparkContext: Starting job: first at <console>:17
14/06/01 12:22:17 INFO DAGScheduler: Got job 1 (first at <console>:17) with 1 output partitions (allowLocal=true)
14/06/01 12:22:17 INFO DAGScheduler: Final stage: Stage 1 (first at <console>:17)
14/06/01 12:22:17 INFO DAGScheduler: Parents of final stage: List()
14/06/01 12:22:17 INFO DAGScheduler: Missing parents: List()
14/06/01 12:22:17 INFO DAGScheduler: Computing the requested partition locally
14/06/01 12:22:17 INFO HadoopRDD: Input split: file:/home/user/playground/ts-data.csv:0+1932934
14/06/01 12:22:17 INFO SparkContext: Job finished: first at <console>:17, took 0.003210457 s
res1: Array[String] = Array(20140127, 0000df, d063b4, ***, ***-Service, app180000m, 49)

// group by id - count unique
scala> csv.groupBy(_(4)).count
(... Spark processing ...)
res2: Long = 37668

// records per day
scala> csv.map(record => record(0) -> 1).reduceByKey(_ + _).collect
(... more Spark processing ...)
res8: Array[(String, Int)] = Array((20140117,1854), (20140120,2028), (20140124,3398), (20140131,6084), (20140122,5076), (20140128,8310), (20140123,8476), (20140127,1932), (20140130,8482), (20140129,8488), (20140118,5100), (20140109,3488), (20140110,4822))

Edit, using the data added to the question:

val rawData = """hastag 200904 24 blackcat
hastag 200908 1 oaddisco
hastag 200904 1 blah
hastag 200910 3 mydda"""

// split lines
val data = rawData.split("\n")
val rdd = sc.parallelize(data)
// Split using space as separator
val byId = rdd.map(_.split(" ")).groupBy(_(1))
byId.count
res11: Long = 3
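Applied back to the original file, which is genuinely tab-separated (the spaces above only exist because the tabs were lost when the sample was pasted into the question), the same grouping idea might look like the sketch below. The path is the one from the question; the interpretation of the columns (tag, yyyyMM month, count, label) is my assumption from the sample rows:

// Read the real TSV and split on tab characters
val tsv = sc.textFile("/home/cloudera/Desktop/Test/test.tsv")
val rows = tsv.map(_.split("\t"))

// Group by the second column (assumed to be a yyyyMM month) and count distinct months
val byMonth = rows.groupBy(_(1))
println(byMonth.count())   // 3 for the four sample rows

// Sum the third column (assumed to be a count) per label in the fourth column
val perLabel = rows.map(r => r(3) -> r(2).toInt).reduceByKey(_ + _)
perLabel.collect().foreach(println)

As in the answer above, collect() pulls everything back to the driver, which is fine for a small test file but not something to do on a large dataset.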