scala – Spark JSON文本字段到RDD
发布时间:2020-12-16 09:00:36 所属栏目:安全 来源:网络整理
导读:我有一个cassandra表,其中包含一个名为snapshot的文本字段,其中包含 JSON对象: [identifier,timestamp,snapshot] 我知道为了能够使用Spark对该字段进行转换,我需要将该RDD的该字段转换为另一个RDD以对JSON模式进行转换. 那是对的吗?我应该怎么做呢? 编辑
我有一个cassandra表,其中包含一个名为snapshot的文本字段,其中包含
JSON对象:
[identifier,timestamp,snapshot] 我知道为了能够使用Spark对该字段进行转换,我需要将该RDD的该字段转换为另一个RDD以对JSON模式进行转换. 那是对的吗?我应该怎么做呢? 编辑:现在我设法从单个文本字段创建一个RDD: val conf = new SparkConf().setAppName("signal-aggregation") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val snapshots = sc.cassandraTable[(String,String,String)]("listener","snapshots") val first = snapshots.first() val firstJson = sqlContext.jsonRDD(sc.parallelize(Seq(first._3))) firstJson.printSchema() 这向我展示了JSON架构.好! 我如何告诉Spark应该在表Snapshots的所有行上应用此模式,以便从每一行获取该快照字段的RDD? 解决方法
几乎就在那里,你只想将你的json传递给你的RDD [String]
jsonRDD方法 val conf = new SparkConf().setAppName("signal-aggregation") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val snapshots = sc.cassandraTable[(String,"snapshots") val jsons = snapshots.map(_._3) // Get Third Row Element Json(RDD[String]) val jsonSchemaRDD = sqlContext.jsonRDD(jsons) // Pass in RDD directly jsonSchemaRDD.registerTempTable("testjson") sqlContext.sql("SELECT * FROM testjson where .... ").collect 一个简单的例子 val stringRDD = sc.parallelize(Seq(""" { "isActive": false,"balance": "$1,431.73","picture": "http://placehold.it/32x32","age": 35,"eyeColor": "blue" }""","""{ "isActive": true,"balance": "$2,515.60","age": 34,"""{ "isActive": false,"balance": "$3,765.29","age": 26,"eyeColor": "blue" }""") ) sqlContext.jsonRDD(stringRDD).registerTempTable("testjson") csc.sql("SELECT age from testjson").collect //res24: Array[org.apache.spark.sql.Row] = Array([35],[34],[26]) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |