Reading pickle file contents with PySpark and storing them in Hive
Published: 2020-12-14 04:39:55
In day-to-day work you inevitably deal with big data, and sometimes you need to read a local file and then store its contents in Hive. This article walks through the process: read the pickle (.plk) file with the pickle module; convert the loaded contents to an RDD; convert the RDD to a DataFrame and store it in the Hive warehouse.
1. Saving and reading pickle files with pickle

```python
import pickle

data = ""                 # the object to pickle (placeholder)
path = "xxx.plk"

# save to a pickle file
pickle.dump(data, open(path, 'wb'))

# read the pickle file back
data2 = pickle.load(open(path, 'rb'))
```

Reading a pickle file saved by Python 2 with Python 3 raises:

UnicodeDecodeError: 'ascii' codec can't decode byte 0xa0 in position 11: ordinal not in range(128)

Solution:

```python
data2 = pickle.load(open(path, 'rb'), encoding='latin1')
```

Reading a pickle file saved by Python 3 with Python 2 raises:

unsupported pickle protocol: 3

Solution: re-save the data with protocol 2 under Python 3 first:

```python
import pickle

path = "xxx.plk"
path2 = "xxx2.plk"
data = pickle.load(open(path, 'rb'))

# save it again as a Python 2 compatible pickle
pickle.dump(data, open(path2, 'wb'), protocol=2)

# read the new pickle file
data2 = pickle.load(open(path2, 'rb'))
```

2. Reading the pickle contents and converting them to an RDD

```python
from pyspark.sql import SparkSession, Row
import pickle

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# pickle_path is the path to the local pickle file
with open(pickle_path, 'rb') as fp:
    data = pickle.load(fp)
```

What to do next depends on the type of data. Assuming data is a one-dimensional list such as [1, 2, 3, 4, 5], it can be converted to an RDD:

```python
# parallelize via the SparkContext attached to the session
pickleRdd = spark.sparkContext.parallelize(data)
```
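If the unpickled object is more structured than a flat list, it can be mapped to Row objects before building the DataFrame. Below is a minimal sketch, assuming a hypothetical file records.plk that holds a list of dicts with name and age keys:

```python
import pickle
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.appName("pickle-to-rows").getOrCreate()

# hypothetical pickle file holding [{'name': 'Alice', 'age': 1}, ...]
with open("records.plk", "rb") as fp:
    records = pickle.load(fp)

# turn each dict into a Row, then parallelize into an RDD of Rows
rowRdd = spark.sparkContext.parallelize([Row(**r) for r in records])
df = spark.createDataFrame(rowRdd)
df.show()
```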
3. Converting the RDD to a DataFrame and storing it in Hive

Define the column name and convert to a DataFrame:

```python
# define the column name
column = Row("col")

# convert to a DataFrame (createDataFrame is needed; map alone only yields an RDD of Rows)
pickleDf = spark.createDataFrame(pickleRdd.map(lambda x: column(x)))
```
Store it in Hive: this writes the table hive_table in the database hive_database (creating the table if it does not exist), in overwrite mode; partitionBy is used to specify the partition column:
```python
pickleDf.write.saveAsTable("hive_database.hive_table", mode='overwrite')
```
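Note that persisting to a Hive metastore normally requires the SparkSession to be built with enableHiveSupport(), which the builder example above omits. Below is a minimal sketch of a partitioned overwrite, assuming the DataFrame contains a hypothetical partition column dt:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("write-to-hive") \
    .enableHiveSupport() \
    .getOrCreate()

# assumes pickleDf has a 'dt' column to partition by (hypothetical)
pickleDf.write \
    .mode("overwrite") \
    .partitionBy("dt") \
    .saveAsTable("hive_database.hive_table")
```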
Alternatively:

(1) Register the DataFrame as a temporary table/view and insert with spark.sql:

```python
# register the DataFrame as a temporary view
df.createOrReplaceTempView("df_tmp_view")

# insert into Hive with spark.sql
spark.sql("""
    insert overwrite table XXXXX               -- table name
    partition(partition_name=partition_value)  -- multiple partitions are separated by commas
    select XXXXX                               -- column names in the same order as the Hive table, excluding partition columns
    from df_tmp_view
""")
```

(2) Use saveAsTable:

```python
# mode("overwrite") rewrites the table: if it already exists its data is replaced, otherwise the table is created
# mode("append") appends data to the existing table
df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')
```

Below are several ways to create a DataFrame from an RDD.

(1) From a list of key-value pairs (dicts):

```python
d = [{'name': 'Alice', 'age': 1}]
output = spark.createDataFrame(d).collect()
print(output)
# [Row(age=1, name='Alice')]
```

(2) From an RDD:

```python
a = [('Alice', 1)]
rdd = sc.parallelize(a)

output = spark.createDataFrame(rdd).collect()
print(output)
# [Row(_1='Alice', _2=1)]

output = spark.createDataFrame(rdd, ["name", "age"]).collect()
print(output)
# [Row(name='Alice', age=1)]
```

(3) From an RDD and Row:

```python
from pyspark.sql import Row

a = [('Alice', 1)]
rdd = sc.parallelize(a)

Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
output = spark.createDataFrame(person).collect()
print(output)
# [Row(name='Alice', age=1)]
```

(4) From an RDD and an explicit schema:

```python
from pyspark.sql.types import *

a = [('Alice', 1)]
rdd = sc.parallelize(a)

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
output = spark.createDataFrame(rdd, schema).collect()
print(output)
# [Row(name='Alice', age=1)]
```

(5) Via a pandas DataFrame:

```python
df = spark.createDataFrame(rdd, ["name", "age"])
print(df)
# DataFrame[name: string, age: bigint]

print(type(df.toPandas()))
# <class 'pandas.core.frame.DataFrame'>

# pass a pandas DataFrame to createDataFrame
output = spark.createDataFrame(df.toPandas()).collect()
print(output)
# [Row(name='Alice', age=1)]
```
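Putting the pieces together, here is a minimal end-to-end sketch of the flow described in this article: read a local pickle file, build a DataFrame, and overwrite a Hive table. The file name data.plk and the table hive_database.hive_table are placeholders:

```python
import pickle
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder \
    .appName("pickle-to-hive") \
    .enableHiveSupport() \
    .getOrCreate()

# 1. read the local pickle file (placeholder path)
with open("data.plk", "rb") as fp:
    data = pickle.load(fp)          # e.g. [1, 2, 3, 4, 5]

# 2. convert to an RDD of single-column Rows
column = Row("col")
rowRdd = spark.sparkContext.parallelize(data).map(lambda x: column(x))

# 3. build a DataFrame and overwrite the Hive table (placeholder name)
df = spark.createDataFrame(rowRdd)
df.write.mode("overwrite").saveAsTable("hive_database.hive_table")
```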