scala – 如何在spark中调用diff文件名的单独逻辑
我的文件夹中有3个日志文件.
喜欢 foldera = emplog,deptlog,companylog folderb = emplog,companylog folderc = emplog,companylog 我有3个diff scala程序文件来从每个文件中提取数据. employee.scala department.scala companylog.scala 他们每个人的代码如下. 我想结合所有这些文件并以并行方式执行它们. package com.sample import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD.rddToPairRDDFunctions import org.apache.spark.sql.SQLContext import org.apache.log4j.{Level,Logger} object logparser { def main(args: Array[String]) = { Logger.getLogger("org").setLevel(Level.OFF) Logger.getLogger("akka").setLevel(Level.OFF) //Start the Spark context val conf = new SparkConf() .setAppName("Parser") .setMaster("local") val sc = new SparkContext(conf) val sqlContext= new SQLContext(sc) val test = sc.wholeTextFiles("C:mkdir**") .map{l => if(l._1.endsWith("emplog.txt")){ empparser(l._2,sc,sqlContext) } l } .foreach{println} } def empparser(record:String,sc:SparkContext,sqlContext:SQLContext) = { val emppattern="""[(](d+)[)]s([ws._]{30})s+""".r import sqlContext.implicits._ val indrecs = emppattern.findAllIn(record) .map{ line => val emppattern(eid,ename) = line (eid,ename) } .toSeq .toDF("eid","ename") .show() } } 我已尝试将代码附加到同一对象中的每个方法. 现在出现2个问题 Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext Serialization stack: - object not serializable (class: org.apache.spark.SparkContext,value: org.apache.spark.SparkContext@6b0615ae) - field (class: com.sample.logparser$$anonfun$1,name: sc$1,type: class org.apache.spark.SparkContext) - object (class com.sample.logparser$$anonfun$1,<function1>) 据我所知(仅限新手)Spark上下文无法序列化.如果我不传递sc作为参数,我得到Nullpointer异常.我该如何解决这个问题? Q2:转换为DF后,我将在empparser方法中插入hive表代码.完成后,我不想在我的主要内容中做任何事情.但是我的地图代码不会执行,除非我之后有动作.这就是为什么我之后有foreacch println.有办法克服这个问题吗? 解决方法
为了尝试回答这个问题,我将假设处理员工或部门的结果会产生相同类型的记录.我希望每种数据都有所不同,所以我要分别处理不同种类的记录,以便进行“现实调整”.
首先,我们为不同类型或记录类型定义记录案例类和解析器. (为了简单起见,我在这里复制相同的impl) case class Record(id:String,name: String) val empParser: String => Option[Record] = { record => val pattern="""^[(](d+)[)]s([ws._]{30})s+$""".r record match { case pattern(eid,ename) => Some(Record(eid,ename)) case _ => None } } val deptParser: String => Option[Record] = { record => val pattern="""^[(](d+)[)]s([ws._]{30})s+$""".r record match { case pattern(eid,ename)) case _ => None } } val companyParser: String => Option[Record] = { record => val pattern="""^[(](d+)[)]s([ws._]{30})s+$""".r record match { case pattern(eid,ename)) case _ => None } } 我们使用wholeFiles加载数据: val dataPath = "/.../data/wholefiles/*/*" val logFiles = sc.wholeTextFiles(dataPath) 然后,我们通过过滤文件来处理不同类型的记录,以获得我们需要的文件类型,并应用我们在上面定义的解析器.请注意我们实际上是如何重复相同的过程.这可以抽象出来. val empLogs = logFiles.filter{case (filename,content) => filename.endsWith("emplog.txt")}.flatMap{case (_,content) => content.split("n").flatMap(line=> empParser(line))} val deptLogs = logFiles.filter{case (filename,content) => filename.endsWith("deptlog.txt")}.flatMap{case (_,content) => content.split("n").flatMap(line=> deptParser(line))} val compLogs = logFiles.filter{case (filename,content) => filename.endsWith("companylog.txt")}.flatMap{case (_,content) => content.split("n").flatMap(line=> companyParser(line))} 我们现在转换为DataFrame val empDF = empLogs.toDF 我们也可以为其他记录类型做同样的事情. 根据我们是否可以在不同数据类型的过程中找到共性,在此过程中有足够的空间来减少代码重复. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |