scala – 使用复杂类型查询Spark SQL DataFrame
如何使用复杂类型(如maps / arrays)查询RDD?
例如,当我写这个测试代码时: case class Test(name: String,map: Map[String,String]) val map = Map("hello" -> "world","hey" -> "there") val map2 = Map("hello" -> "people","hey" -> "you") val rdd = sc.parallelize(Array(Test("first",map),Test("second",map2))) 我虽然语法会像: sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world") 要么 sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world") 但我得到了
和
分别。 解决方法
这取决于列的类型。让我们从一些虚拟数据开始:
import org.apache.spark.sql.functions.{udf,lit} import scala.util.Try case class SubRecord(x: Int) case class ArrayElement(foo: String,bar: Int,vals: Array[Double]) case class Record( an_array: Array[Int],a_map: Map[String,String],a_struct: SubRecord,an_array_of_structs: Array[ArrayElement]) val df = sc.parallelize(Seq( Record(Array(1,2,3),Map("foo" -> "bar"),SubRecord(1),Array( ArrayElement("foo",1,Array(1.0,2.0)),ArrayElement("bar",Array(3.0,4.0)))),Record(Array(4,5,6),Map("foz" -> "baz"),SubRecord(2),Array(ArrayElement("foz",3,Array(5.0,6.0)),ArrayElement("baz",4,Array(7.0,8.0)))) )).toDF df.registerTempTable("df") df.printSchema // root // |-- an_array: array (nullable = true) // | |-- element: integer (containsNull = false) // |-- a_map: map (nullable = true) // | |-- key: string // | |-- value: string (valueContainsNull = true) // |-- a_struct: struct (nullable = true) // | |-- x: integer (nullable = false) // |-- an_array_of_structs: array (nullable = true) // | |-- element: struct (containsNull = true) // | | |-- foo: string (nullable = true) // | | |-- bar: integer (nullable = false) // | | |-- vals: array (nullable = true) // | | | |-- element: double (containsNull = false) >数组列: > Column.getItem方法 df.select($"an_array".getItem(1)).show // +-----------+ // |an_array[1]| // +-----------+ // | 2| // | 5| // +-----------+ > Hive方括号语法: sqlContext.sql("SELECT an_array[1] FROM df").show // +---+ // |_c0| // +---+ // | 2| // | 5| // +---+ > UDF val get_ith = udf((xs: Seq[Int],i: Int) => Try(xs(i)).toOption) df.select(get_ith($"an_array",lit(1))).show // +---------------+ // |UDF(an_array,1)| // +---------------+ // | 2| // | 5| // +---------------+ >地图列 >使用Column.getField方法: df.select($"a_map".getField("foo")).show // +----------+ // |a_map[foo]| // +----------+ // | bar| // | null| // +----------+ >使用Hive括号语法: sqlContext.sql("SELECT a_map['foz'] FROM df").show // +----+ // | _c0| // +----+ // |null| // | baz| // +----+ >使用带有点语法的完整路径: df.select($"a_map.foo").show // +----+ // | foo| // +----+ // | bar| // |null| // +----+ >使用UDF val get_field = udf((kvs: Map[String,k: String) => kvs.get(k)) df.select(get_field($"a_map",lit("foo"))).show // +--------------+ // |UDF(a_map,foo)| // +--------------+ // | bar| // | null| // +--------------+ >使用带有点语法的完整路径的struct列: >使用DataFrame API df.select($"a_struct.x").show // +---+ // | x| // +---+ // | 1| // | 2| // +---+ >使用原始SQL sqlContext.sql("SELECT a_struct.x FROM df").show // +---+ // | x| // +---+ // | 1| // | 2| // +---+ 可以使用点语法,名称和标准列方法访问结构体数组中的字段: df.select($"an_array_of_structs.foo").show // +----------+ // | foo| // +----------+ // |[foo,bar]| // |[foz,baz]| // +----------+ sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show // +---+ // |_c0| // +---+ // |foo| // |foz| // +---+ df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show // +------------------------------+ // |an_array_of_structs.vals[1][1]| // +------------------------------+ // | 4.0| // | 8.0| // +------------------------------+ >可以使用UDF访问用户定义的类型(UDT)字段。详见SparkSQL referencing attributes of UDT。 笔记: >根据Spark版本,这些方法中的一些只能使用HiveContext。 UDF应与标准SQLContext和HiveContext独立于版本。 df.select(explode($"an_array_of_structs")).show // +--------------------+ // | col| // +--------------------+ // |[foo,WrappedArr...| // |[bar,WrappedArr...| // |[foz,WrappedArr...| // |[baz,WrappedArr...| // +--------------------+ >点语法可以与通配符(*)组合以选择(可能是多个)字段,而不必明确指定名称: df.select($"a_struct.*").show // +---+ // | x| // +---+ // | 1| // | 2| // +---+ (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |