加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何从嵌套的struct元素数组创建Spark DataFrame?

发布时间:2020-12-16 19:11:33 所属栏目:安全 来源:网络整理
导读:我在Spark中读过一个 JSON文件.该文件具有以下结构: scala tweetBlob.printSchemaroot |-- related: struct (nullable = true) | |-- next: struct (nullable = true) | | |-- href: string (nullable = true) |-- search: struct (nullable = true) | |--
我在Spark中读过一个 JSON文件.该文件具有以下结构:

scala> tweetBlob.printSchema
root
 |-- related: struct (nullable = true)
 |    |-- next: struct (nullable = true)
 |    |    |-- href: string (nullable = true)
 |-- search: struct (nullable = true)
 |    |-- current: long (nullable = true)
 |    |-- results: long (nullable = true)
 |-- tweets: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cde: struct (nullable = true)
...
...
 |    |    |-- cdeInternal: struct (nullable = true)
...
...
 |    |    |-- message: struct (nullable = true)
...
...

我理想的是一个DataFrame,列有“cde”,“cdeInternal”,“message”……如下图所示

root
|-- cde: struct (nullable = true)
...
...
|-- cdeInternal: struct (nullable = true)
...
...
|-- message: struct (nullable = true)
...
...

我已设法使用“explode”将“tweets”数组中的元素提取到名为“tweets”的列中

scala> val tweets = tweetBlob.select(explode($"tweets").as("tweets"))
tweets: org.apache.spark.sql.DataFrame = [tweets: struct<cde:struct<author:struct<gender:string,location:struct<city:string,country:string,state:string>,maritalStatus:struct<evidence:string,isMarried:string>,parenthood:struct<evidence:string,isParent:string>>,content:struct<sentiment:struct<evidence:array<struct<polarity:string,sentimentTerm:string>>,polarity:string>>>,cdeInternal:struct<compliance:struct<isActive:boolean,userProtected:boolean>,tracks:array<struct<id:string>>>,message:struct<actor:struct<displayName:string,favoritesCount:bigint,followersCount:bigint,friendsCount:bigint,id:string,image:string,languages:array<string>,link:string,links:array<struct<href:string,rel:string>>,listedCount:bigint,location:struct<displayName:string,objectType:string>,objectType:string,postedTime...
scala> tweets.printSchema
root
 |-- tweets: struct (nullable = true)
 |    |-- cde: struct (nullable = true)
...
...
 |    |-- cdeInternal: struct (nullable = true)
...
...
 |    |-- message: struct (nullable = true)
...
...

如何选择结构中的所有列并从中创建一个DataFrame?如果我的理解是正确的,爆炸对结构不起作用.

任何帮助表示赞赏.

解决方法

处理此问题的一种可能方法是从架构中提取所需信息.让我们从一些虚拟数据开始:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._


case class Bar(x: Int,y: String)
case class Foo(bar: Bar)

val df = sc.parallelize(Seq(Foo(Bar(1,"first")),Foo(Bar(2,"second")))).toDF

df.printSchema

// root
//  |-- bar: struct (nullable = true)
//  |    |-- x: integer (nullable = false)
//  |    |-- y: string (nullable = true)

和辅助函数:

def children(colname: String,df: DataFrame) = {
  val parent = df.schema.fields.filter(_.name == colname).head
  val fields = parent.dataType match {
    case x: StructType => x.fields
    case _ => Array.empty[StructField]
  }
  fields.map(x => col(s"$colname.${x.name}"))
}

最后结果如下:

df.select(children("bar",df): _*).printSchema

// root
// |-- x: integer (nullable = true)
// |-- y: string (nullable = true)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读