
Comparing the schemas of two DataFrames in Scala

I am trying to write some test cases to validate the data between a source (.csv) file and a target (Hive table). One of the validations is a structure validation of the table.

I have loaded the .csv data (using a defined schema) into one DataFrame and extracted the Hive table data into another DataFrame.
When I now try to compare the schemas of the two DataFrames, the comparison returns false. I am not sure why. Any ideas on this?

Source DataFrame schema:

scala> res39.printSchema
root
 |-- datetime: timestamp (nullable = true)
 |-- load_datetime: timestamp (nullable = true)
 |-- source_bank: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- header_row_count: integer (nullable = true)
 |-- emp_hours: double (nullable = true)

Target DataFrame schema:

scala> targetRawData.printSchema
root
 |-- datetime: timestamp (nullable = true)
 |-- load_datetime: timestamp (nullable = true)
 |-- source_bank: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- header_row_count: integer (nullable = true)
 |-- emp_hours: double (nullable = true)

When I compare them, it returns false:

scala> res39.schema == targetRawData.schema
res47: Boolean = false
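
One way to see what actually differs is to dump each schema as JSON, which includes the per-field metadata that printSchema does not display. StructField is a case class, so schema equality also compares metadata; the sketch below (using a made-up metadata key for illustration) shows two fields that print identically yet compare unequal:

import org.apache.spark.sql.types._

// The JSON form shows everything that participates in schema equality:
// names, types, nullability and metadata.
println(res39.schema.json)
println(targetRawData.schema.json)

// A hypothetical metadata entry is enough to break equality:
val m = new MetadataBuilder().putString("comment", "loaded from csv").build()
val f1 = StructField("datetime", TimestampType, nullable = true)
val f2 = StructField("datetime", TimestampType, nullable = true, metadata = m)
println(f1 == f2)   // false, although both fields print identically in printSchema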

The data in the two DataFrames looks like this:

scala> res39.show
+-------------------+-------------------+-----------+--------+----------------+---------+
|           datetime|      load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03|        RBS| Naveen |             100|    15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03|        RBS| Naveen |             100|   115.78|
|2015-04-02 23:24:25|2015-04-02 23:24:25|        RBS|   Arun |             200|     2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14|        RBS|   Arun |             100|    30.98|
|2018-06-04 10:11:12|2018-06-04 10:11:12|        XZX|   Arun |             400|     12.0|
+-------------------+-------------------+-----------+--------+----------------+---------+


scala> targetRawData.show
+-------------------+-------------------+-----------+--------+----------------+---------+
|           datetime|      load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03|        RBS|  Naveen|             100|    15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03|        RBS|  Naveen|             100|   115.78|
|2015-04-02 23:25:25|2015-04-02 23:25:25|        RBS|    Arun|             200|     2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14|        RBS|    Arun|             100|    30.98|
+-------------------+-------------------+-----------+--------+----------------+---------+

The full code is shown below:

//import org.apache.spark
import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date,to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext

  //val conf = new SparkConf().setAppName("Simple Application")
  //val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)
  val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master","local").getOrCreate()

   // set source and target location
    val sourceDataLocation = "hdfs://localhost:9000/source.txt"
    val targetTableName = "TableA"

    // Extract source data
    println("Extracting SAS source data from csv file location " + sourceDataLocation);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val sourceRawCsvData = sc.textFile(sourceDataLocation)

    println("Extracting target data from hive table " + targetTableName)
    val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count,emp_hours from " + targetTableName)


    // Add the test cases here
    // Test 2 - Validate the Structure
       val headerColumns = sourceRawCsvData.first().split(",").to[List]
       val schema = TableASchema(headerColumns)

       val data = sourceRawCsvData.mapPartitionsWithIndex((index,element) => if (index == 0) element.drop(1) else element)
       .map(_.split(",").toList)
       .map(row)

       val dataFrame = spark.createDataFrame(data,schema)
       val sourceDataFrame = dataFrame.toDF(dataFrame.columns map(_.toLowerCase): _*)
       data.collect
       data.getClass
    // Test 3 - Validate the data
    // Test 4 - Calculate the average and variance of Int or Dec columns
    // Test 5 - Test 5

  def UpdateResult(tableName: String,returnCode: Int,description: String){
    val insertString = "INSERT INTO TestResult VALUES('" + tableName + "'," + returnCode + ",'" + description + "')"
    val a = hc.sql(insertString)

    }


  def TableASchema(columnName: List[String]): StructType = {
    StructType(
      Seq(
        StructField(name = "datetime", dataType = TimestampType, nullable = true),
        StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
        StructField(name = "source_bank", dataType = StringType, nullable = true),
        StructField(name = "emp_name", dataType = StringType, nullable = true),
        StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
        StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
      )
    )
  }

  def row(line: List[String]): Row = {
       Row(convertToTimestamp(line(0).trim),convertToTimestamp(line(1).trim),line(2).trim,line(3).trim,line(4).toInt,line(5).toDouble)
    }


  def convertToTimestamp(s: String) : Timestamp = s match {
     case "" => null
     case _ => {
        val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
        Try(new Timestamp(format.parse(s).getTime)) match {
        case Success(t) => t
        case Failure(_) => null
      }
    }
  }

  }

Solution

Based on @Derek Kaknes's answer, here is the solution I came up with for comparing two schemas, concerned only with column name, data type and nullability, and indifferent to metadata.

// Extract relevant information: name (key), type & nullability (values) of columns
def getCleanedSchema(df: DataFrame): Map[String, (DataType, Boolean)] = {
  df.schema.map { (structField: StructField) =>
    structField.name.toLowerCase -> (structField.dataType, structField.nullable)
  }.toMap
}

// Compare relevant information
def getSchemaDifference(schema1: Map[String, (DataType, Boolean)],
                        schema2: Map[String, (DataType, Boolean)]
                       ): Map[String, (Option[(DataType, Boolean)], Option[(DataType, Boolean)])] = {
  (schema1.keys ++ schema2.keys).
    map(_.toLowerCase).
    toList.distinct.
    flatMap { (columnName: String) =>
      val schema1FieldOpt: Option[(DataType, Boolean)] = schema1.get(columnName)
      val schema2FieldOpt: Option[(DataType, Boolean)] = schema2.get(columnName)

      if (schema1FieldOpt == schema2FieldOpt) None
      else Some(columnName -> (schema1FieldOpt, schema2FieldOpt))
    }.toMap
}

> The getCleanedSchema method extracts the information of interest, namely column data type and nullability, and returns a map from column name to that tuple.
> The getSchemaDifference method returns a map containing only the columns that differ between the two schemas. If a column is absent from one of the two schemas, its corresponding properties will be None.
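
A minimal usage sketch, assuming the two DataFrames from the question (res39 and targetRawData) are in scope:

// Compare only name, data type and nullability; metadata is ignored by design.
val diff = getSchemaDifference(getCleanedSchema(res39), getCleanedSchema(targetRawData))

if (diff.isEmpty)
  println("Schemas match")
else
  diff.foreach { case (column, (source, target)) =>
    println(s"$column differs: source=$source, target=$target")
  }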
