scala – Adding a nested column to a Spark DataFrame
How can I add or replace fields of a struct at any nesting level?
This input:

    val rdd = sc.parallelize(Seq(
      """{"a": {"xX": 1, "XX": 2}, "b": {"z": 0}}""",
      """{"a": {"xX": 3}, "b": {"z": 0}}""",
      """{"a": {"XX": 3}, "b": {"z": 0}}""",
      """{"a": {"xx": 4}, "b": {"z": 0}}"""))
    var df = sqlContext.read.json(rdd)

produces the following schema:

    root
     |-- a: struct (nullable = true)
     |    |-- XX: long (nullable = true)
     |    |-- xX: long (nullable = true)
     |    |-- xx: long (nullable = true)
     |-- b: struct (nullable = true)
     |    |-- z: long (nullable = true)

Then I can do:

    import org.apache.spark.sql.functions._
    val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX"))
    df = df
      .withColumn("a_xx", coalesce(overlappingNames: _*))
      .dropNestedColumn("a.xX")
      .dropNestedColumn("a.XX")
      .dropNestedColumn("a.xx")

(dropNestedColumn comes from another answer.) The schema becomes:

    root
     |-- a: struct (nullable = false)
     |-- b: struct (nullable = true)
     |    |-- z: long (nullable = true)
     |-- a_xx: long (nullable = true)

Obviously it does not replace (or add) a.xx; instead it adds the new field a_xx at the root level. I would like to be able to do this instead:

    val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX"))
    df = df
      .withNestedColumn("a.xx", coalesce(overlappingNames: _*))
      .dropNestedColumn("a.xX")
      .dropNestedColumn("a.XX")

so that it results in this schema:

    root
     |-- a: struct (nullable = false)
     |    |-- xx: long (nullable = true)
     |-- b: struct (nullable = true)
     |    |-- z: long (nullable = true)

How can I achieve that? The actual goal here is to be case-insensitive with the column names of the input JSON. The final step is simple: collect all the overlapping column names and apply the coalesce on each of them.
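As a sketch of that final step (the grouping logic and the assumption that the colliding names all live under the top-level struct a are mine, not part of the question), the overlapping names could be collected case-insensitively like this:

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.StructType

    // Group the children of struct "a" by lowercased name; any group with
    // more than one member is a set of case-colliding fields to coalesce.
    val overlapping: Map[String, Seq[Column]] =
      df.schema("a").dataType.asInstanceOf[StructType].fieldNames
        .groupBy(_.toLowerCase)
        .filter { case (_, variants) => variants.length > 1 }
        .map { case (canonical, variants) =>
          canonical -> variants.toSeq.map(v => col(s"a.$v"))
        }

Each entry can then be fed to coalesce(...: _*) and written back with the withNestedColumn described in the solution below.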
Solution

It may not be as elegant or efficient as it could be, but here is what I came up with:
    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.{DataType, StructType}

    object DataFrameUtils {
      // Wrap c so that it stays null whenever parentCol is null.
      private def nullableCol(parentCol: Column, c: Column): Column = {
        when(parentCol.isNotNull, c)
      }

      private def nullableCol(c: Column): Column = {
        nullableCol(c, c)
      }

      // Build the chain of structs for the remaining path segments,
      // with newCol as the innermost value.
      private def createNestedStructs(splitted: Seq[String], newCol: Column): Column = {
        splitted
          .foldRight(newCol) {
            case (colName, nestedStruct) => nullableCol(struct(nestedStruct as colName))
          }
      }

      private def recursiveAddNestedColumn(splitted: Seq[String], col: Column, colType: DataType, nullable: Boolean, newCol: Column): Column = {
        colType match {
          case colType: StructType if splitted.nonEmpty => {
            // Rebuild the struct, descending into the field that matches the path.
            var modifiedFields: Seq[(String, Column)] = colType.fields
              .map(f => {
                var curCol = col.getField(f.name)
                if (f.name == splitted.head) {
                  curCol = recursiveAddNestedColumn(splitted.tail, curCol, f.dataType, f.nullable, newCol)
                }
                (f.name, curCol as f.name)
              })

            // The field does not exist yet: append it as a new nested struct.
            if (!modifiedFields.exists(_._1 == splitted.head)) {
              modifiedFields :+= (splitted.head, nullableCol(col, createNestedStructs(splitted.tail, newCol)) as splitted.head)
            }

            var modifiedStruct: Column = struct(modifiedFields.map(_._2): _*)
            if (nullable) {
              modifiedStruct = nullableCol(col, modifiedStruct)
            }
            modifiedStruct
          }
          case _ => createNestedStructs(splitted, newCol)
        }
      }

      private def addNestedColumn(df: DataFrame, newColName: String, newCol: Column): DataFrame = {
        if (newColName.contains('.')) {
          val splitted = newColName.split('.')

          val modifiedOrAdded: (String, Column) = df.schema.fields
            .find(_.name == splitted.head)
            .map(f => (f.name, recursiveAddNestedColumn(splitted.tail, col(f.name), f.dataType, f.nullable, newCol)))
            .getOrElse {
              (splitted.head, createNestedStructs(splitted.tail, newCol) as splitted.head)
            }

          df.withColumn(modifiedOrAdded._1, modifiedOrAdded._2)
        } else {
          // Top level addition, use spark method as-is
          df.withColumn(newColName, newCol)
        }
      }

      implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
        /**
         * Add nested field to DataFrame
         *
         * @param newColName Dot-separated nested field name
         * @param newCol New column value
         */
        def withNestedColumn(newColName: String, newCol: Column): DataFrame = {
          DataFrameUtils.addNestedColumn(df, newColName, newCol)
        }
      }
    }

Feel free to improve it.

    val data = spark.sparkContext.parallelize(List("""{ "a1": 1, "a3": { "b1": 3, "b2": { "c1": 5, "c2": 6 } } }"""))
    val df: DataFrame = spark.read.json(data)
    val df2 = df.withNestedColumn("a3.b2.c3.d1", $"a3.b2")

should produce:

    assertResult("struct<a1:bigint,a3:struct<b1:bigint,b2:struct<c1:bigint,c2:bigint,c3:struct<d1:struct<c1:bigint,c2:bigint>>>>>")(df2.schema.simpleString)
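Applied back to the DataFrame from the question, the desired snippet should now work as written. A minimal sketch, assuming the dropNestedColumn extension from the answer referenced in the question is also in scope:

    import org.apache.spark.sql.functions.{coalesce, col}
    import DataFrameUtils._

    val overlappingNames = Seq(col("a.xx"), col("a.xX"), col("a.XX"))
    // Merge the case-colliding fields into a single nested a.xx,
    // then drop the leftover variants.
    // dropNestedColumn comes from the separate answer linked in the question.
    val merged = df
      .withNestedColumn("a.xx", coalesce(overlappingNames: _*))
      .dropNestedColumn("a.xX")
      .dropNestedColumn("a.XX")
    merged.printSchema() // a now contains only xx; b is untouched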