
Reusing a Schema from JSON within a Spark DataFrame using Scala

I have some JSON data like this:

{"gid":"111","createHour":"2014-10-20 01:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 01:40:37.0"},{"revId":"4","modDate":"2014-11-20 01:40:40.0"}],"comments":[],"replies":[]}
{"gid":"222","createHour":"2014-12-20 01:00:00.0","modDate":"2014-11-20 01:39:31.0"},"modDate":"2014-11-20 01:39:34.0"}],"replies":[]}
{"gid":"333","createHour":"2015-01-21 00:00:00.0","revisions":[{"revId":"25","modDate":"2014-11-21 00:34:53.0"},{"revId":"110","modDate":"2014-11-21 00:47:10.0"}],"comments":[{"comId":"4432","content":"How are you?"}],"replies":[{"repId":"4441","content":"I am good."}]}
{"gid":"444","createHour":"2015-09-20 23:00:00.0","modDate":"2014-11-20 23:23:47.0"}],"replies":[]}
{"gid":"555","createHour":"2016-01-21 01:00:00.0","revisions":[{"revId":"135","modDate":"2014-11-21 01:01:58.0"}],"replies":[]}
{"gid":"666","createHour":"2016-04-23 19:00:00.0","revisions":[{"revId":"136","modDate":"2014-11-23 19:50:51.0"}],"replies":[]}

I can read it in with:

val df = sqlContext.read.json("./data/full.json")
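(As an aside: this uses the Spark 1.x SQLContext entry point. A minimal sketch of the equivalent read, assuming Spark 2.x or later is available, where SparkSession replaces SQLContext:)

import org.apache.spark.sql.SparkSession

// Sketch assuming Spark 2.x+: read.json behaves the same way through a
// SparkSession and infers the same schema from the input lines.
val spark = SparkSession.builder().appName("schema-reuse").getOrCreate()
val df = spark.read.json("./data/full.json")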

I can print the schema with df.printSchema:

root
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comId: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- repId: string (nullable = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

I can show the data with df.show(10,false):

+---------------------+---------------------+---+-------------------+--------------------------------------------------------+
|comments             |createHour           |gid|replies            |revisions                                               |
+---------------------+---------------------+---+-------------------+--------------------------------------------------------+
|[]                   |2014-10-20 01:00:00.0|111|[]                 |[[2014-11-20 01:40:37.0,2],[2014-11-20 01:40:40.0,4]]   |
|[]                   |2014-12-20 01:00:00.0|222|[]                 |[[2014-11-20 01:39:31.0,2],[2014-11-20 01:39:34.0,4]]   |
|[[4432,How are you?]]|2015-01-21 00:00:00.0|333|[[I am good.,4441]]|[[2014-11-21 00:34:53.0,25],[2014-11-21 00:47:10.0,110]]|
|[]                   |2015-09-20 23:00:00.0|444|[]                 |[[2014-11-20 23:23:47.0,2]]                             |
|[]                   |2016-01-21 01:00:00.0|555|[]                 |[[2014-11-21 01:01:58.0,135]]                           |
|[]                   |2016-04-23 19:00:00.0|666|[]                 |[[2014-11-23 19:50:51.0,136]]                           |
+---------------------+---------------------+---+-------------------+--------------------------------------------------------+

I can capture the schema with val dfSc = df.schema and print it:

StructType(StructField(comments,ArrayType(StructType(StructField(comId,StringType,true),StructField(content,StringType,true)),true),true),StructField(createHour,StringType,true),StructField(gid,StringType,true),StructField(replies,ArrayType(StructType(StructField(content,StringType,true),StructField(repId,StringType,true)),true),true),StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true),StructField(revId,StringType,true)),true),true))

I can print it out more readably:

println(df.schema.fields.mkString(",\n"))

StructField(comments,ArrayType(StructType(StructField(comId,StringType,true),StructField(content,StringType,true)),true),true),
StructField(createHour,StringType,true),
StructField(gid,StringType,true),
StructField(replies,ArrayType(StructType(StructField(content,StringType,true),StructField(repId,StringType,true)),true),true),
StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true),StructField(revId,StringType,true)),true),true)
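As a side note, a StructType can also render itself as JSON, which is easier to store and diff than the toString form above; json and prettyJson are standard methods on Spark's DataType. A minimal sketch:

// prettyJson serializes the schema itself (not the data) as indented JSON;
// df.schema.json gives the same thing as a compact one-line string.
println(df.schema.prettyJson)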

Now, if I read a version of the same file with the comments and replies removed, using val df2 = sqlContext.read.json("./data/partialRevOnly.json") against a file that simply has those parts deleted, I get something like this with printSchema:

root
 |-- comments: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

I don't like that, so I use:

val df3 = sqlContext.read.
  schema(dfSc).
  json("./data/partialRevOnly.json")

where dfSc is the original schema. So now I get the schema I had before the data was removed:

root
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comId: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- repId: string (nullable = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)
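As a quick sanity check, the two schemas can be compared directly; StructType compares structurally, so this should print true. A minimal sketch:

// Sketch: confirm df3 picked up exactly the schema captured in dfSc.
println(df3.schema == df.schema)  // expected: true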

This is perfect... almost. I would like to assign this schema to a variable, something like this:

val textSc = StructType(StructField(comments,ArrayType(StructType(StructField(comId,StringType,true),StructField(content,StringType,true)),true),true),StructField(createHour,StringType,true),StructField(gid,StringType,true),StructField(replies,ArrayType(StructType(StructField(content,StringType,true),StructField(repId,StringType,true)),true),true),StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true),StructField(revId,StringType,true)),true),true))

OK, that won't work, due to the double quotes and 'some other structural' stuff, so try this instead (which has an error):

import org.apache.spark.sql.types._

val textSc = StructType(Array(
    StructField("comments",ArrayType(StructType(StructField("comId",StringType,true),StructField("content",StringType,true)),true),true),
    StructField("createHour",StringType,true),
    StructField("gid",StringType,true),
    StructField("replies",ArrayType(StructType(StructField("content",StringType,true),StructField("repId",StringType,true)),true),true),
    StructField("revisions",ArrayType(StructType(StructField("modDate",StringType,true),StructField("revId",StringType,true)),true),true)
))

Name: Compile Error
Message: <console>:78: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.sql.types.StructField,org.apache.spark.sql.types.StructField)
           StructField("comments",

... Without this error (which I haven't been able to get past quickly), I would like to use textSc in place of dfSc to read the JSON data with an enforced schema.
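For what it's worth, the compile error above comes from passing StructFields directly to the inner StructType calls, which (per the listed overloads) expect an Array, Seq, or java List of fields rather than bare fields. A sketch of a version that should compile, with each inner field list wrapped in Array(...):

import org.apache.spark.sql.types._

// Sketch: the same schema as above, but every StructType now receives an
// Array[StructField], matching one of the apply overloads from the error.
val textSc = StructType(Array(
  StructField("comments", ArrayType(StructType(Array(
    StructField("comId", StringType, true),
    StructField("content", StringType, true))), true), true),
  StructField("createHour", StringType, true),
  StructField("gid", StringType, true),
  StructField("replies", ArrayType(StructType(Array(
    StructField("content", StringType, true),
    StructField("repId", StringType, true))), true), true),
  StructField("revisions", ArrayType(StructType(Array(
    StructField("modDate", StringType, true),
    StructField("revId", StringType, true))), true), true)
))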

I can't find a way, via println or anything else, to get the schema printed one-to-one in a syntactically acceptable form like the above. I suppose some string munging with case matching could put the double quotes back, but I'm still not clear what rules are required to get the exact schema out of the test fixture so that I can reuse it in my regular production (as opposed to test-fixture) code. Is there a way to get this schema printed exactly as I would code it?

Note: that includes the double quotes and all the proper StructFields/Types and so forth, so that it's compatible as code.

As a sidebar, I thought about saving a fully-formed golden JSON file to read at the start of the Spark job, but I would ultimately like to use date fields and other more concise types than strings in the applicable structure locations.

How can I get the DataFrame information from my test harness (which uses a fully-formed JSON input row with comments and replies) to the point where I can drop the schema, as source code, into my production Scala Spark job?
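One possibility, sketched under the assumption that a pasted string constant (rather than hand-written StructType source) is acceptable in production: print the schema from the test harness with StructType.json, paste that string into the production job, and rebuild the schema there with DataType.fromJson, which sidesteps the quoting problem entirely:

import org.apache.spark.sql.types.{DataType, StructType}

// In the test harness: print the schema as a compact JSON string.
println(df.schema.json)

// In production: rebuild the StructType from the pasted string.
// schemaJson is a hypothetical constant; paste the real println output here.
val schemaJson = """{"type":"struct","fields":[...]}"""
val prodSchema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
val dfProd = sqlContext.read.schema(prodSchema).json("./data/partialRevOnly.json")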

Note: the best answer is some coding means, but an explanation that lets me trudge, plod, toil, wade, plow and slog through the coding is helpful too.
