加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Spark JSON文本字段到RDD

发布时间:2020-12-16 09:00:36 所属栏目:安全 来源:网络整理
导读:我有一个cassandra表,其中包含一个名为snapshot的文本字段,其中包含 JSON对象: [identifier,timestamp,snapshot] 我知道为了能够使用Spark对该字段进行转换,我需要将该RDD的该字段转换为另一个RDD以对JSON模式进行转换. 那是对的吗?我应该怎么做呢? 编辑
我有一个cassandra表,其中包含一个名为snapshot的文本字段,其中包含 JSON对象:

[identifier,timestamp,snapshot]

我知道为了能够使用Spark对该字段进行转换,我需要将该RDD的该字段转换为另一个RDD以对JSON模式进行转换.

那是对的吗?我应该怎么做呢?

编辑:现在我设法从单个文本字段创建一个RDD:

val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val snapshots = sc.cassandraTable[(String,String,String)]("listener","snapshots")
val first = snapshots.first()
val firstJson = sqlContext.jsonRDD(sc.parallelize(Seq(first._3)))
firstJson.printSchema()

这向我展示了JSON架构.好!

我如何告诉Spark应该在表Snapshots的所有行上应用此模式,以便从每一行获取该快照字段的RDD?

解决方法

几乎就在那里,你只想将你的json传递给你的RDD [String]
jsonRDD方法

val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val snapshots = sc.cassandraTable[(String,"snapshots")
val jsons = snapshots.map(_._3) // Get Third Row Element Json(RDD[String]) 
val jsonSchemaRDD = sqlContext.jsonRDD(jsons) // Pass in RDD directly
jsonSchemaRDD.registerTempTable("testjson")
sqlContext.sql("SELECT * FROM testjson where .... ").collect

一个简单的例子

val stringRDD = sc.parallelize(Seq(""" 
  { "isActive": false,"balance": "$1,431.73","picture": "http://placehold.it/32x32","age": 35,"eyeColor": "blue"
  }""","""{
    "isActive": true,"balance": "$2,515.60","age": 34,"""{
    "isActive": false,"balance": "$3,765.29","age": 26,"eyeColor": "blue"
  }""")
)
sqlContext.jsonRDD(stringRDD).registerTempTable("testjson")
csc.sql("SELECT age from testjson").collect
//res24: Array[org.apache.spark.sql.Row] = Array([35],[34],[26])

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读