加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何在spark中调用diff文件名的单独逻辑

发布时间:2020-12-16 18:31:53 所属栏目:安全 来源:网络整理
导读:我的文件夹中有3个日志文件. 喜欢 foldera = emplog,deptlog,companylogfolderb = emplog,companylogfolderc = emplog,companylog 我有3个diff scala程序文件来从每个文件中提取数据. employee.scaladepartment.scalacompanylog.scala 他们每个人的代码如下.
我的文件夹中有3个日志文件.
喜欢

foldera = emplog,deptlog,companylog
folderb = emplog,companylog
folderc = emplog,companylog

我有3个diff scala程序文件来从每个文件中提取数据.

employee.scala
department.scala
companylog.scala

他们每个人的代码如下.

我想结合所有这些文件并以并行方式执行它们.

package com.sample
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.SQLContext
import org.apache.log4j.{Level,Logger}

object logparser {
  def main(args: Array[String]) = {

      Logger.getLogger("org").setLevel(Level.OFF)     
      Logger.getLogger("akka").setLevel(Level.OFF)
    //Start the Spark context
    val conf = new SparkConf()
      .setAppName("Parser")
      .setMaster("local")

      val sc = new SparkContext(conf)
      val sqlContext= new SQLContext(sc)

      val test = sc.wholeTextFiles("C:mkdir**")
      .map{l =>
             if(l._1.endsWith("emplog.txt")){ 
             empparser(l._2,sc,sqlContext)
               }

             l
        }
      .foreach{println}
  }

  def empparser(record:String,sc:SparkContext,sqlContext:SQLContext) = {
     val emppattern="""[(](d+)[)]s([ws._]{30})s+""".r

      import sqlContext.implicits._
     val indrecs = emppattern.findAllIn(record)
    .map{ line => 
      val emppattern(eid,ename) = line

     (eid,ename)
    }
     .toSeq
   .toDF("eid","ename")

   .show() 


  }
}

我已尝试将代码附加到同一对象中的每个方法.

现在出现2个问题
Q1.当我编译我得到

Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext,value: org.apache.spark.SparkContext@6b0615ae)
    - field (class: com.sample.logparser$$anonfun$1,name: sc$1,type: class org.apache.spark.SparkContext)
    - object (class com.sample.logparser$$anonfun$1,<function1>)

据我所知(仅限新手)Spark上下文无法序列化.如果我不传递sc作为参数,我得到Nullpointer异常.我该如何解决这个问题?

Q2:转换为DF后,我将在empparser方法中插入hive表代码.完成后,我不想在我的主要内容中做任何事情.但是我的地图代码不会执行,除非我之后有动作.这就是为什么我之后有foreacch println.有办法克服这个问题吗?

解决方法

为了尝试回答这个问题,我将假设处理员工或部门的结果会产生相同类型的记录.我希望每种数据都有所不同,所以我要分别处理不同种类的记录,以便进行“现实调整”.

首先,我们为不同类型或记录类型定义记录案例类和解析器. (为了简单起见,我在这里复制相同的impl)

case class Record(id:String,name: String)

val empParser: String =>  Option[Record] = { record => 
  val pattern="""^[(](d+)[)]s([ws._]{30})s+$""".r
  record match {
    case pattern(eid,ename) => Some(Record(eid,ename))
    case _ => None
  }
}

val deptParser: String =>  Option[Record] = { record => 
  val pattern="""^[(](d+)[)]s([ws._]{30})s+$""".r
  record match {
    case pattern(eid,ename))
    case _ => None
  }
}

val companyParser: String =>  Option[Record] = { record => 
  val pattern="""^[(](d+)[)]s([ws._]{30})s+$""".r
  record match {
    case pattern(eid,ename))
    case _ => None
  }
}

我们使用wholeFiles加载数据:

val dataPath = "/.../data/wholefiles/*/*"
val logFiles =  sc.wholeTextFiles(dataPath)

然后,我们通过过滤文件来处理不同类型的记录,以获得我们需要的文件类型,并应用我们在上面定义的解析器.请注意我们实际上是如何重复相同的过程.这可以抽象出来.

val empLogs = logFiles.filter{case (filename,content) => filename.endsWith("emplog.txt")}.flatMap{case (_,content) => content.split("n").flatMap(line=> empParser(line))}
val deptLogs = logFiles.filter{case (filename,content) => filename.endsWith("deptlog.txt")}.flatMap{case (_,content) => content.split("n").flatMap(line=> deptParser(line))}
val compLogs = logFiles.filter{case (filename,content) => filename.endsWith("companylog.txt")}.flatMap{case (_,content) => content.split("n").flatMap(line=> companyParser(line))}

我们现在转换为DataFrame

val empDF  = empLogs.toDF

我们也可以为其他记录类型做同样的事情.

根据我们是否可以在不同数据类型的过程中找到共性,在此过程中有足够的空间来减少代码重复.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读