
Java Spark: convert a list to an RDD, then to a Dataset, and write it into a table

Published: 2020-12-15 08:02:30 | Category: Java | Source: compiled from the web
Overview: this post walks through a small Spark job that reads vendor ids from a Hive table, calls an external JSF interface for each id, collects the results as a list of JSON strings, turns that list into an RDD and then a Dataset, and finally writes it into a Hive table.
package com.example.demo;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

// fastjson imports, required by JSON.parseObject below (missing in the original listing)
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class DemoApplication {

	public static void main(String[] args) {
		
		
		/*----------------------- Online invocation -----------------------*/
		// Read the shop id data
		SparkSession spark = SparkSession.builder().appName("demo_spark").enableHiveSupport().getOrCreate();
		Dataset<Row> vender_set = spark.sql("select pop_vender_id from app.app_sjzt_payout_apply_with_order where dt = '2019-08-05' and pop_vender_id is not null");
		System.out.println("Data read OK");
		
		
		JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
//		JavaSparkContext sc = new JavaSparkContext();
		SQLContext sqlContext = new SQLContext(sc);
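		// Note: SQLContext has been deprecated since Spark 2.0; spark.read() could be used directly below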

		// Deduplicate the data and fill nulls with 0 before converting to List<Row>
		vender_set = vender_set.distinct();
		vender_set = vender_set.na().fill(0L);
		JavaRDD<Row> vender= vender_set.toJavaRDD();
		List<Row> vender_list = vender.collect();
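		// collect() pulls every row to the driver, so this assumes the deduplicated id set fits in driver memory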
		

		// Iterate over the vendor ids, call the JSF interface, and save the returned data in a list
		List<String> list_temp = new ArrayList<String>();
		for (Row row : vender_list) {
			String id = row.getString(0);
			// "service" is the JSF client; its declaration and initialization are not shown in the original post
			String result = service.venderDownAmountList(id);

			System.out.println("Interface call returned OK");
			
			// Parse the JSON string layer by layer with JSONObject and JSONArray, filtering the returned data
			JSONObject jsonOBJ = JSON.parseObject(result);
			JSONArray data = jsonOBJ.getJSONArray("data");
			if (data != null) {
				JSONObject data_all = data.getJSONObject(0);
				double amount = data_all.getDouble("jfDownAmount");
				// Save the vendor id and the amount as a JSON string
				list_temp.add("{\"vender_id\":\"" + id + "\",\"amount\":" + amount + "}");
			} else {
				continue;
			}

			System.out.println("Parse OK");
			
		}
		// Convert the list to an RDD
		JavaRDD<String> venderRDD = sc.parallelize(list_temp);

		// Read the JSON RDD into a Dataset and register it as a temporary view
		Dataset<Row> vender_table = sqlContext.read().json(venderRDD);
		vender_table.createOrReplaceTempView("vender");
		System.out.println("Table registered OK");

		// Write into the target table
		spark.sql("insert overwrite table dev.dev_jypt_vender_dropaway_amount select vender.vender_id, vender.amount from vender");
		System.out.println("Write to table OK");

		sc.stop();		
		System.out.println( "Hello World!" );
		
	}
}
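For comparison, here is a minimal sketch of the same list → Dataset → table pipeline on Spark 2.x+, using SparkSession.createDataset with Encoders.STRING() instead of the JavaSparkContext/SQLContext route above. The sample rows and the target table dev.some_target_table are placeholders, not from the original job:

package com.example.demo;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ListToTableSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("list_to_table").enableHiveSupport().getOrCreate();

		// A list of JSON strings, standing in for the data built in the loop above (placeholder values)
		List<String> rows = Arrays.asList(
				"{\"vender_id\":\"1001\",\"amount\":12.5}",
				"{\"vender_id\":\"1002\",\"amount\":3.0}");

		// Since Spark 2.x a Dataset<String> can be created directly from a Java list
		Dataset<String> jsonDs = spark.createDataset(rows, Encoders.STRING());

		// Let Spark infer the schema from the JSON strings, then register a temp view
		Dataset<Row> table = spark.read().json(jsonDs);
		table.createOrReplaceTempView("vender");

		// dev.some_target_table is a placeholder; substitute your own table
		spark.sql("insert overwrite table dev.some_target_table select vender_id, amount from vender");

		spark.stop();
	}
}

This avoids the deprecated SQLContext entirely and skips the intermediate JavaRDD, since DataFrameReader.json accepts a Dataset<String> directly from Spark 2.2 onward.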
