Comparing the schemas of two DataFrames in Scala
I am trying to write some test cases to validate the data between a source (.csv) file and a target (Hive table). One of these validations is a validation of the table's structure.
I have loaded the .csv data (using a defined schema) into one DataFrame and extracted the Hive table data into another DataFrame.

Source DataFrame schema:

scala> res39.printSchema
root
 |-- datetime: timestamp (nullable = true)
 |-- load_datetime: timestamp (nullable = true)
 |-- source_bank: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- header_row_count: integer (nullable = true)
 |-- emp_hours: double (nullable = true)

Target DataFrame schema:

scala> targetRawData.printSchema
root
 |-- datetime: timestamp (nullable = true)
 |-- load_datetime: timestamp (nullable = true)
 |-- source_bank: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- header_row_count: integer (nullable = true)
 |-- emp_hours: double (nullable = true)

When I now try to compare the schemas of the two DataFrames, the comparison returns false, and I am not sure why:

scala> res39.schema == targetRawData.schema
res47: Boolean = false

The data in the two DataFrames looks like this:

scala> res39.show
+-------------------+-------------------+-----------+--------+----------------+---------+
|           datetime|      load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03|        RBS| Naveen |             100|    15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03|        RBS| Naveen |             100|   115.78|
|2015-04-02 23:24:25|2015-04-02 23:24:25|        RBS|   Arun |             200|     2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14|        RBS|   Arun |             100|    30.98|
|2018-06-04 10:11:12|2018-06-04 10:11:12|        XZX|   Arun |             400|     12.0|
+-------------------+-------------------+-----------+--------+----------------+---------+

scala> targetRawData.show
+-------------------+-------------------+-----------+--------+----------------+---------+
|           datetime|      load_datetime|source_bank|emp_name|header_row_count|emp_hours|
+-------------------+-------------------+-----------+--------+----------------+---------+
|2017-01-01 01:02:03|2017-01-01 01:02:03|        RBS|  Naveen|             100|    15.23|
|2017-03-15 01:02:03|2017-03-15 01:02:03|        RBS|  Naveen|             100|   115.78|
|2015-04-02 23:25:25|2015-04-02 23:25:25|        RBS|    Arun|             200|     2.09|
|2010-05-28 12:13:14|2010-05-28 12:13:14|        RBS|    Arun|             100|    30.98|
+-------------------+-------------------+-----------+--------+----------------+---------+

The complete code is shown below:

//import org.apache.spark
import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext

//val conf = new SparkConf().setAppName("Simple Application")
//val sc = new SparkContext(conf)
val hc = new HiveContext(sc)
val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()

// set source and target location
val sourceDataLocation = "hdfs://localhost:9000/source.txt"
val targetTableName = "TableA"

// Extract source data
println("Extracting SAS source data from csv file location " + sourceDataLocation)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val sourceRawCsvData = sc.textFile(sourceDataLocation)

println("Extracting target data from hive table " + targetTableName)
val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count,emp_hours from " + targetTableName)

// Add the test cases here
// Test 2 - Validate the Structure
val headerColumns = sourceRawCsvData.first().split(",").to[List]
val schema = TableASchema(headerColumns)

val data = sourceRawCsvData.mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
  .map(_.split(",").toList)
  .map(row)

val dataFrame = spark.createDataFrame(data, schema)
val sourceDataFrame = dataFrame.toDF(dataFrame.columns map(_.toLowerCase): _*)
data.collect
data.getClass

// Test 3 - Validate the data
// Test 4 - Calculate the average and variance of Int or Dec columns
// Test 5 - Test 5

def UpdateResult(tableName: String, returnCode: Int, description: String) {
  val insertString = "INSERT INTO TestResult VALUES('" + tableName + "'," + returnCode + ",'" + description + "')"
  val a = hc.sql(insertString)
}

def TableASchema(columnName: List[String]): StructType = {
  StructType(
    Seq(
      StructField(name = "datetime", dataType = TimestampType, nullable = true),
      StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
      StructField(name = "source_bank", dataType = StringType, nullable = true),
      StructField(name = "emp_name", dataType = StringType, nullable = true),
      StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
      StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
    )
  )
}

def row(line: List[String]): Row = {
  Row(convertToTimestamp(line(0).trim), convertToTimestamp(line(1).trim), line(2).trim, line(3).trim, line(4).toInt, line(5).toDouble)
}

def convertToTimestamp(s: String): Timestamp = s match {
  case "" => null
  case _ => {
    val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
    Try(new Timestamp(format.parse(s).getTime)) match {
      case Success(t) => t
      case Failure(_) => null
    }
  }
}
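The comparison most likely fails on something printSchema does not display: a StructField is compared on its name, data type, nullability and metadata, and only the first three are printed, so extra metadata attached to the Hive-sourced columns (a column comment, for instance) is enough to make two schemas that print identically compare as unequal. A minimal diagnostic sketch, assuming the res39 and targetRawData DataFrames from above, that prints any field-level difference including the metadata:

// Compare the two schemas field by field, including the metadata that
// printSchema omits, to see exactly where == breaks down.
// Assumes both schemas list their columns in the same order.
res39.schema.fields.zip(targetRawData.schema.fields).foreach { case (src, tgt) =>
  if (src != tgt) {
    println(s"Field '${src.name}' differs:")
    println(s"  source: type=${src.dataType}, nullable=${src.nullable}, metadata=${src.metadata}")
    println(s"  target: type=${tgt.dataType}, nullable=${tgt.nullable}, metadata=${tgt.metadata}")
  }
}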
Solution

Based on @Derek Kaknes's answer, here is the solution I came up with for comparing schemas, concerned only with column name, data type and nullability, and indifferent to metadata:
// Extract relevant information: name (key), type & nullability (values) of columns
def getCleanedSchema(df: DataFrame): Map[String, (DataType, Boolean)] = {
  df.schema.map { (structField: StructField) =>
    structField.name.toLowerCase -> (structField.dataType, structField.nullable)
  }.toMap
}

// Compare relevant information
def getSchemaDifference(schema1: Map[String, (DataType, Boolean)],
                        schema2: Map[String, (DataType, Boolean)]
                       ): Map[String, (Option[(DataType, Boolean)], Option[(DataType, Boolean)])] = {
  (schema1.keys ++ schema2.keys).
    map(_.toLowerCase).
    toList.distinct.
    flatMap { (columnName: String) =>
      val schema1FieldOpt: Option[(DataType, Boolean)] = schema1.get(columnName)
      val schema2FieldOpt: Option[(DataType, Boolean)] = schema2.get(columnName)

      if (schema1FieldOpt == schema2FieldOpt) None
      else Some(columnName -> (schema1FieldOpt, schema2FieldOpt))
    }.toMap
}

> The getCleanedSchema method extracts the information of interest - column data type & nullability - and returns a map from column name to that tuple
> The getSchemaDifference method returns a map containing only the columns that differ between the two schemas. If a column is missing from one of the two schemas, its corresponding properties will be None
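As a usage sketch, tying these helpers back to the test harness from the question (res39, targetRawData, targetTableName and UpdateResult are the names defined above; the 0/1 return codes are just an assumed convention):

// Build the simplified name -> (dataType, nullable) view of each schema
val sourceSchemaMap = getCleanedSchema(res39)
val targetSchemaMap = getCleanedSchema(targetRawData)

// Columns that differ in type or nullability, or that exist on only one side
val schemaDiff = getSchemaDifference(sourceSchemaMap, targetSchemaMap)

if (schemaDiff.isEmpty) {
  UpdateResult(targetTableName, 0, "Structure validation passed")
} else {
  schemaDiff.foreach { case (column, (sourceField, targetField)) =>
    println(s"Column '$column' differs: source=$sourceField, target=$targetField")
  }
  UpdateResult(targetTableName, 1, "Structure validation failed for: " + schemaDiff.keys.mkString(","))
}

The difference map is empty exactly when the two schemas agree on column names, data types and nullability, which is the condition the structure test is meant to check.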