加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

pyspark读取pickle文件内容并存储到hive

发布时间:2020-12-14 04:39:55 所属栏目:百科 来源:网络整理
导读:在平常工作中,难免要和大数据打交道,而有时需要读取本地文件然后存储到Hive中,本文接下来将具体讲解。 过程: 使用pickle模块读取.plk文件; 将读取到的内容转为RDD; 将RDD转为DataFrame之后存储到Hive仓库中; 1、使用pickle保存和读取pickle文件 impor

在平常工作中,难免要和大数据打交道,而有时需要读取本地文件然后存储到Hive中,本文接下来将具体讲解。

过程:

  • 使用pickle模块读取.plk文件;

  • 将读取到的内容转为RDD;

  • 将RDD转为DataFrame之后存储到Hive仓库中;

1、使用pickle保存和读取pickle文件

import pickle
data = ""
path = "xxx.plj"
#保存为pickle
pickle.dump(data,open(path,'wb'))
读取pickle
data2 = pickle.load(open(path,1)">rb'))

使用python3读取python2保存的pickle文件时,会报错:

UnicodeDecodeError: 'ascii' codec can't decode byte 0xa0 in position 11: ordinal not in range(128)

解决方法:

data2 = pickle.load(open(path,1)">',encoding=latin1'))

使用python2读取python3保存的pickle文件时,会报错:

unsupported pickle protocol:3

解决方法:

 pickle
path = xxx.plk"
path2 = xxx2.plk
data = pickle.load(open(path,1)">保存为python2的pickle
pickle.dump(data,open(path2,1)">'),protocol=2)
读取pickle
data2 = pickle.load(open(path2,1)">'))

2、读取pickle的内容并转为RDD

from pyspark.sql  SparkSession
 Row
 pickle


spark = SparkSession 
    .builder 
    .appName(Python Spark SQL basic example) 
    .config(spark.some.config.option",1)">some-value) 
    .getOrCreate()
with open(picle_path,) as fp:
    data = pickle.load(fp)
    这里可根据data的类型进行相应的操作

假设data是一个一维数组:[1,2,3,4,5],读取数据并转为rdd
pickleRdd = spark.parallelize(data)

3、将rdd转为dataframe并存入到Hive中

定义列名
column = Row(col转为dataframe
pickleDf =pickleRdd.map(lambda x:column(x))
存储到Hive中,会新建数据库:hive_database,新建表:hive_table,以覆盖的形式添加,partitionBy用于指定分区字段
pickleDf..write.saveAsTable(hive_database.hvie_tableoverwrite
data = [
    (1,1)">3145),(1,1)">41465256263281349137)
]
df = spark.createDataFrame(data,[idtest_idcamera_id])
 
 method one,default是默认数据库的名字,write_test 是要写到default中数据表的名字
df.registerTempTable(test_hive)
sqlContext.sql(create table default.write_test select * from test_hive")

或者:

 df 转为临时表/临时视图
df.createOrReplaceTempView(df_tmp_view spark.sql 插入hive
spark.sql(insert overwrite table 
                    XXXXX   表名
                   partition(分区名称=分区值)    多个分区按照逗号分开
                   select 
                   XXXXX   字段名称,跟hive字段顺序对应,不包含分区字段
                   from df_tmp_view""")

(2)以saveAsTable的形式

 "overwrite"是重写表的模式,如果表存在,就覆盖掉原始数据,如果不存在就重新生成一张表
#  mode("append")是在原有表的基础上进行添加数据
df.write.format(hive").mode(").saveAsTable(default.write_test')

以下是通过rdd创建dataframe的几种方法:

(1)通过键值对

d = [{name': Aliceage': 1}]
output = spark.createDataFrame(d).collect()
print(output)

 [Row(age=1,name='Alice')]

(2)通过rdd

a = [()]
rdd = sc.parallelize(a)
output = spark.createDataFrame(rdd).collect()
(output)
output = spark.createDataFrame(rdd,1)">]).collect()
 [Row(_1='Alice',_2=1)] [Row(name='Alice',age=1)]

(3)通过rdd和Row

 Row


a = [( sc.parallelize(a)
Person = Row()
person = rdd.map(lambda r: Person(*r))
output = spark.createDataFrame(person).collect()

from pyspark.sql.types import *

a = [( sc.parallelize(a)
schema = StructType(
    [
        StructField(,StringType(),True),StructField( spark.createDataFrame(rdd,schema).collect()

df = spark.createDataFrame(rdd,1)">])
print(df)   DataFrame[name: string,age: bigint]

print(type(df.toPandas()))   <class 'pandas.core.frame.DataFrame'>

 传入pandas DataFrame
output = spark.createDataFrame(df.toPandas()).collect()
(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读