Java Spark Streaming JSON解析
发布时间:2020-12-15 04:36:38  所属栏目:Java  来源:网络整理 
            导读:我已经开始从Spark引擎学习火花流,并且很新的数据分析和火花.我只是想创建一个小IOT应用程序,我想在其中预测未来的数据. 我有Tiva硬件,它发送实时传感器JSON数据如下, [{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},
                
                
                
            | 
 我已经开始从Spark引擎学习火花流,并且很新的数据分析和火花.我只是想创建一个小IOT应用程序,我想在其中预测未来的数据. 
  
  我有Tiva硬件,它发送实时传感器JSON数据如下, [{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}]在此t中是发布数据的unix时间戳. 我想要做的是,使用这些数据并创建火花流的对象,然后通过spark的Mlib(机器学习)或等效库传递所有数据以预测未来的数据. 我想要了解所有技术选择是否可行 >我决定使用? 这是我用来消费来自KAFKA的消息的代码. SparkConf conf = new SparkConf().setAppName("DattusSpark").setMaster("local[2]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaStreamingContext ssc = new JavaStreamingContext(sc,new Duration(2000));
    // TODO: processing pipeline
    Map<String,String> kafkaParams = new HashMap<String,String>();
    kafkaParams.put("metadata.broker.list","kafkaserver_address:9092");
    Set<String> topics = Collections.singleton("RAH");
    JavaPairInputDStream<String,String> directKafkaStream = 
            KafkaUtils.createDirectStream(ssc,String.class,StringDecoder.class,kafkaParams,topics);
    JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>,String>() {
        public String call(Tuple2<String,String> message) throws Exception {
            System.out.println(message._2());
            return message._2();
        };
    });
    System.out.println(" json is  0------ 0"+ json);
    json.foreachRDD(rdd -> {
        rdd.foreach(
                record -> System.out.println(record));
    });
    ssc.start();
    ssc.awaitTermination();PS:我想在Java中做到这一点,以保持线性和良好的性能. 解决方法
 由于您使用的是SparkSession的SPark 2.0,因此您可以阅读JSON 
  
  
  json.foreachRDD( rdd -> {
      DataFrame df= spark.read.json(rdd)
      //process json with this DF.
}或者您可以将rdd转换为Row的RDD,然后您可以使用createDataFrame方法. json.foreachRDD( rdd -> {
          DataFrame df= spark.createDataFrame(rdd);
          //process json with this DF.
    }DF可以嵌套JSON处理,您可以按照this文章进行操作. 此外,一旦你将你的json转换为DF,你可以在任何火花模块中使用它(如spark sql,ML) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! | 
