scala – 使用复杂类型查询Spark SQL DataFrame
|
如何使用复杂类型(如maps / arrays)查询RDD?
例如,当我写这个测试代码时: case class Test(name: String,map: Map[String,String])
val map = Map("hello" -> "world","hey" -> "there")
val map2 = Map("hello" -> "people","hey" -> "you")
val rdd = sc.parallelize(Array(Test("first",map),Test("second",map2)))
我虽然语法会像: sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world")
要么 sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world")
但我得到了
和
分别。 解决方法
这取决于列的类型。让我们从一些虚拟数据开始:
import org.apache.spark.sql.functions.{udf,lit}
import scala.util.Try
case class SubRecord(x: Int)
case class ArrayElement(foo: String,bar: Int,vals: Array[Double])
case class Record(
an_array: Array[Int],a_map: Map[String,String],a_struct: SubRecord,an_array_of_structs: Array[ArrayElement])
val df = sc.parallelize(Seq(
Record(Array(1,2,3),Map("foo" -> "bar"),SubRecord(1),Array(
ArrayElement("foo",1,Array(1.0,2.0)),ArrayElement("bar",Array(3.0,4.0)))),Record(Array(4,5,6),Map("foz" -> "baz"),SubRecord(2),Array(ArrayElement("foz",3,Array(5.0,6.0)),ArrayElement("baz",4,Array(7.0,8.0))))
)).toDF
df.registerTempTable("df")
df.printSchema
// root
// |-- an_array: array (nullable = true)
// | |-- element: integer (containsNull = false)
// |-- a_map: map (nullable = true)
// | |-- key: string
// | |-- value: string (valueContainsNull = true)
// |-- a_struct: struct (nullable = true)
// | |-- x: integer (nullable = false)
// |-- an_array_of_structs: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- foo: string (nullable = true)
// | | |-- bar: integer (nullable = false)
// | | |-- vals: array (nullable = true)
// | | | |-- element: double (containsNull = false)
>数组列: > Column.getItem方法 df.select($"an_array".getItem(1)).show // +-----------+ // |an_array[1]| // +-----------+ // | 2| // | 5| // +-----------+ > Hive方括号语法: sqlContext.sql("SELECT an_array[1] FROM df").show
// +---+
// |_c0|
// +---+
// | 2|
// | 5|
// +---+
> UDF val get_ith = udf((xs: Seq[Int],i: Int) => Try(xs(i)).toOption) df.select(get_ith($"an_array",lit(1))).show // +---------------+ // |UDF(an_array,1)| // +---------------+ // | 2| // | 5| // +---------------+ >地图列 >使用Column.getField方法: df.select($"a_map".getField("foo")).show
// +----------+
// |a_map[foo]|
// +----------+
// | bar|
// | null|
// +----------+
>使用Hive括号语法: sqlContext.sql("SELECT a_map['foz'] FROM df").show
// +----+
// | _c0|
// +----+
// |null|
// | baz|
// +----+
>使用带有点语法的完整路径: df.select($"a_map.foo").show // +----+ // | foo| // +----+ // | bar| // |null| // +----+ >使用UDF val get_field = udf((kvs: Map[String,k: String) => kvs.get(k))
df.select(get_field($"a_map",lit("foo"))).show
// +--------------+
// |UDF(a_map,foo)|
// +--------------+
// | bar|
// | null|
// +--------------+
>使用带有点语法的完整路径的struct列: >使用DataFrame API df.select($"a_struct.x").show // +---+ // | x| // +---+ // | 1| // | 2| // +---+ >使用原始SQL sqlContext.sql("SELECT a_struct.x FROM df").show
// +---+
// | x|
// +---+
// | 1|
// | 2|
// +---+
可以使用点语法,名称和标准列方法访问结构体数组中的字段: df.select($"an_array_of_structs.foo").show
// +----------+
// | foo|
// +----------+
// |[foo,bar]|
// |[foz,baz]|
// +----------+
sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show
// +---+
// |_c0|
// +---+
// |foo|
// |foz|
// +---+
df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show
// +------------------------------+
// |an_array_of_structs.vals[1][1]|
// +------------------------------+
// | 4.0|
// | 8.0|
// +------------------------------+
>可以使用UDF访问用户定义的类型(UDT)字段。详见SparkSQL referencing attributes of UDT。 笔记: >根据Spark版本,这些方法中的一些只能使用HiveContext。 UDF应与标准SQLContext和HiveContext独立于版本。 df.select(explode($"an_array_of_structs")).show // +--------------------+ // | col| // +--------------------+ // |[foo,WrappedArr...| // |[bar,WrappedArr...| // |[foz,WrappedArr...| // |[baz,WrappedArr...| // +--------------------+ >点语法可以与通配符(*)组合以选择(可能是多个)字段,而不必明确指定名称: df.select($"a_struct.*").show // +---+ // | x| // +---+ // | 1| // | 2| // +---+ (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
