加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 使用复杂类型查询Spark SQL DataFrame

发布时间:2020-12-16 09:33:47 所属栏目:安全 来源:网络整理
导读:如何使用复杂类型(如maps / arrays)查询RDD? 例如,当我写这个测试代码时: case class Test(name: String,map: Map[String,String])val map = Map("hello" - "world","hey" - "there")val map2 = Map("hello" - "people","hey" - "you")val rdd = sc.paral
如何使用复杂类型(如maps / arrays)查询RDD?
例如,当我写这个测试代码时:

case class Test(name: String,map: Map[String,String])
val map = Map("hello" -> "world","hey" -> "there")
val map2 = Map("hello" -> "people","hey" -> "you")
val rdd = sc.parallelize(Array(Test("first",map),Test("second",map2)))

我虽然语法会像:

sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world")

要么

sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world")

但我得到了

Can’t access nested field in type MapType(StringType,StringType,true)

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes

分别。

解决方法

这取决于列的类型。让我们从一些虚拟数据开始:

import org.apache.spark.sql.functions.{udf,lit}
import scala.util.Try

case class SubRecord(x: Int)
case class ArrayElement(foo: String,bar: Int,vals: Array[Double])
case class Record(
  an_array: Array[Int],a_map: Map[String,String],a_struct: SubRecord,an_array_of_structs: Array[ArrayElement])


val df = sc.parallelize(Seq(
  Record(Array(1,2,3),Map("foo" -> "bar"),SubRecord(1),Array(
           ArrayElement("foo",1,Array(1.0,2.0)),ArrayElement("bar",Array(3.0,4.0)))),Record(Array(4,5,6),Map("foz" -> "baz"),SubRecord(2),Array(ArrayElement("foz",3,Array(5.0,6.0)),ArrayElement("baz",4,Array(7.0,8.0))))
)).toDF
df.registerTempTable("df")
df.printSchema

// root
// |-- an_array: array (nullable = true)
// |    |-- element: integer (containsNull = false)
// |-- a_map: map (nullable = true)
// |    |-- key: string
// |    |-- value: string (valueContainsNull = true)
// |-- a_struct: struct (nullable = true)
// |    |-- x: integer (nullable = false)
// |-- an_array_of_structs: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- foo: string (nullable = true)
// |    |    |-- bar: integer (nullable = false)
// |    |    |-- vals: array (nullable = true)
// |    |    |    |-- element: double (containsNull = false)

>数组列:

> Column.getItem方法

df.select($"an_array".getItem(1)).show

// +-----------+
// |an_array[1]|
// +-----------+
// |          2|
// |          5|
// +-----------+

> Hive方括号语法:

sqlContext.sql("SELECT an_array[1] FROM df").show

// +---+
// |_c0|
// +---+
// |  2|
// |  5|
// +---+

> UDF

val get_ith = udf((xs: Seq[Int],i: Int) => Try(xs(i)).toOption)

df.select(get_ith($"an_array",lit(1))).show

// +---------------+
// |UDF(an_array,1)|
// +---------------+
// |              2|
// |              5|
// +---------------+

>地图列

>使用Column.getField方法:

df.select($"a_map".getField("foo")).show

// +----------+
// |a_map[foo]|
// +----------+
// |       bar|
// |      null|
// +----------+

>使用Hive括号语法:

sqlContext.sql("SELECT a_map['foz'] FROM df").show

// +----+
// | _c0|
// +----+
// |null|
// | baz|
// +----+

>使用带有点语法的完整路径:

df.select($"a_map.foo").show

// +----+
// | foo|
// +----+
// | bar|
// |null|
// +----+

>使用UDF

val get_field = udf((kvs: Map[String,k: String) => kvs.get(k))

df.select(get_field($"a_map",lit("foo"))).show

// +--------------+
// |UDF(a_map,foo)|
// +--------------+
// |           bar|
// |          null|
// +--------------+

>使用带有点语法的完整路径的struct列:

>使用DataFrame API

df.select($"a_struct.x").show

// +---+
// |  x|
// +---+
// |  1|
// |  2|
// +---+

>使用原始SQL

sqlContext.sql("SELECT a_struct.x FROM df").show

// +---+
// |  x|
// +---+
// |  1|
// |  2|
// +---+

可以使用点语法,名称和标准列方法访问结构体数组中的字段:

df.select($"an_array_of_structs.foo").show

// +----------+
// |       foo|
// +----------+
// |[foo,bar]|
// |[foz,baz]|
// +----------+

sqlContext.sql("SELECT an_array_of_structs[0].foo FROM df").show

// +---+
// |_c0|
// +---+
// |foo|
// |foz|
// +---+

df.select($"an_array_of_structs.vals".getItem(1).getItem(1)).show

// +------------------------------+
// |an_array_of_structs.vals[1][1]|
// +------------------------------+
// |                           4.0|
// |                           8.0|
// +------------------------------+

>可以使用UDF访问用户定义的类型(UDT)字段。详见SparkSQL referencing attributes of UDT。

笔记:

>根据Spark版本,这些方法中的一些只能使用HiveContext。 UDF应与标准SQLContext和HiveContext独立于版本。
一般来说,嵌套价值观是二等公民。嵌套字段不支持所有典型操作。根据上下文,可以更好地平坦化模式和/或爆炸集合

df.select(explode($"an_array_of_structs")).show

// +--------------------+
// |                 col|
// +--------------------+
// |[foo,WrappedArr...|
// |[bar,WrappedArr...|
// |[foz,WrappedArr...|
// |[baz,WrappedArr...|
// +--------------------+

>点语法可以与通配符(*)组合以选择(可能是多个)字段,而不必明确指定名称:

df.select($"a_struct.*").show
// +---+
// |  x|
// +---+
// |  1|
// |  2|
// +---+

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读