加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

Python技术栈与Spark交叉数据分析双向整合技术实战!

发布时间:2020-12-17 00:25:14 所属栏目:Python 来源:网络整理
导读:版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。进群:960410445 即可获取数十套PDF!,如有任何学术交流,可随时联系。

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。进群:960410445 即可获取数十套PDF!,如有任何学术交流,可随时联系。

  • Python Spark DataFrame 基础
df = spark.read.parquet('/sql/users.parquet')
 df.show()

+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3,9,15,20]|
| Ben| red| []|
+------+--------------+----------------+

  • Python Spark DataFrame 聚合统计
CustomerID,Genre,Age,Annual Income (k$),Spending Score (1-100)
 0001,Male,19,39
 0002,21,81
 0003,Female,20,16,6
 0004,23,77
 0005,31,17,40
 0006,22,76

df = spark.read.csv('/sql/customers.csv',header=True)
df.printSchema()
df.show()

root
|-- CustomerID: string (nullable = true)
|-- Genre: string (nullable = true)
|-- Age: string (nullable = true)
|-- Annual Income (k$): string (nullable = true)
|-- Spending Score (1-100): string (nullable = true)

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
| 0001| Male| 19| 15| 39|
| 0002| Male| 21| 15| 81|
| 0003|Female| 20| 16| 6|
| 0004|Female| 23| 16| 77|
| 0005|Female| 31| 17| 40|
| 0006|Female| 22| 17| 76|
| 0007|Female| 35| 18| 6|
| 0008|Female| 23| 18| 94|
| 0009| Male| 64| 19| 3|
| 0010|Female| 30| 19| 72|
| 0011| Male| 67| 19| 14|
| 0012|Female| 35| 19| 99|
| 0013|Female| 58| 20| 15|
| 0014|Female| 24| 20| 77|
| 0015| Male| 37| 20| 13|
| 0016| Male| 22| 20| 79|
| 0017|Female| 35| 21| 35|
| 0018| Male| 20| 21| 66|
| 0019| Male| 52| 23| 29|
| 0020|Female| 35| 23| 98|
+----------+------+---+------------------+----------------------+

df.agg({"Age": "max","Annual Income (k$)":"mean","Spending Score (1-100)":"mean"}).show()

+---------------------------+-----------------------+--------+
|avg(Spending Score (1-100))|avg(Annual Income (k$))|max(Age)|
+---------------------------+-----------------------+--------+
| 50.2| 60.56| 70|
+---------------------------+-----------------------+--------+

  • alias(alias)为DataFrame定义一个别名,稍后再函数中就可以利用这个别名来做相关的运 算,例如说自关联Join:
df1 = df.alias('cus1')
 type(df1)
 df2 = df.alias('cus2')
 df3 = df1.join(df2,col('cus1.CustomerId')==col('cus2.CustomerId'),'inner')
 df3.count()

200

+----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
| 0001| Male| 19| 15| 39| 0001| Male| 19| 15| 39|
| 0002| Male| 21| 15| 81| 0002| Male| 21| 15| 81|
| 0003|Female| 20| 16| 6| 0003|Female| 20| 16| 6|
| 0004|Female| 23| 16| 77| 0004|Female| 23| 16| 77|
| 0005|Female| 31| 17| 40| 0005|Female| 31| 17| 40|
| 0006|Female| 22| 17| 76| 0006|Female| 22| 17| 76|
| 0007|Female| 35| 18| 6| 0007|Female| 35| 18| 6|
| 0008|Female| 23| 18| 94| 0008|Female| 23| 18| 94|
| 0009| Male| 64| 19| 3| 0009| Male| 64| 19| 3|
| 0010|Female| 30| 19| 72| 0010|Female| 30| 19| 72|
+----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
only showing top 10 rows

  • cache(),将DataFrame缓存到StorageLevel对应的缓存级别中,默认是 MEMORY_AND_DISK
df = spark.read.csv('/sql/customers.csv',header=True)
 a = df.cache()
 a.show()

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
| 0001| Male| 19| 15| 39|
| 0002| Male| 21| 15| 81|
| 0003|Female| 20| 16| 6|
| 0004|Female| 23| 16| 77|
| 0005|Female| 31| 17| 40|
| 0006|Female| 22| 17| 76|
| 0007|Female| 35| 18| 6|
| 0008|Female| 23| 18| 94|
| 0009| Male| 64| 19| 3|
| 0010|Female| 30| 19| 72|
| 0011| Male| 67| 19| 14|
| 0012|Female| 35| 19| 99|

  • checkpoint(eager=True) 对DataFrame设置断点,这个方法是Spark2.1引入的方法,这个方法的调用会斩断在这个 DataFrame上的逻辑执行计划,将前后的依赖关系持久化到checkpoint文件中去。
sc
 sc.setCheckpointDir('/datas/checkpoint')
 a.checkpoint()
 a.show()
  • coalesce(numPartitions) 重分区算法,传入的参数是DataFrame的分区数量。
注意通过read方法读取文件,创建的DataFrame默认的分区数为文件的个数,即一个文件对
 应一个分区,在分区数少于coalesce指定的分区数的时候,调用coalesce是不起作用的

df = spark.read.csv('/sql/customers.csv',header=True)
df.rdd.getNumPartitions()
1

spark.read.csv('/sql/customers.csv',header=True).coalesce(3).rdd.getNumPartitions()
1

df = spark.range(0,2,3)
df.rdd.getNumPartitions()
df.coalesce(2).rdd.getNumPartitions()
2

  • repartition(numPartitions,*cols)这个方法和coalesce(numPartitions) 方法一样,都是 对DataFrame进行重新的分区,但是repartition这个方法会使用hash算法,在整个集群中进 行shuffle,效率较低。repartition方法不仅可以指定分区数,还可以指定按照哪些列来做分 区。
df = spark.read.csv('/sql/customers.csv',header=True)
 df.rdd.getNumPartitions()
 1

df2 = df.repartition(3)
df2.rdd.getNumPartitions()
3

df2.columns
df3 = df2.repartition(6,'Genre')
df3.show(20)

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
| 0003|Female| 20| 16| 6|
| 0004|Female| 23| 16| 77|
| 0005|Female| 31| 17| 40|
| 0006|Female| 22| 17| 76|
| 0007|Female| 35| 18| 6|
| 0008|Female| 23| 18| 94|
| 0010|Female| 30| 19| 72|
| 0012|Female| 35| 19| 99|
| 0013|Female| 58| 20| 15|

df3.rdd.getNumPartitions()
6

  • colRegex(colName)用正则表达式的方式返回我们想要的列。
df = spark.createDataFrame([("a",1),("b",2),("c",3)],["Col1","a"])
 df.select(df.colRegex("`(Col1)?+.+`")).show()
 +---+
 | a|
 +---+
 | 1|
 | 2|
 | 3|
 +---+
  • collect(),返回DataFrame中的所有数据,注意数据量大了容易造成Driver节点内存溢 出!
df = spark.createDataFrame([("a","a"])
 df.collect()
 [Row(Col1='a',a=1),Row(Col1='b',a=2),Row(Col1='c',a=3)]
  • columns,以列表的形式返回DataFrame的所有列名
df = spark.read.csv('/sql/customers.csv',header=True)
 df.columns

df = spark.read.csv('/sql/customers.csv',header=True)
df.columns
['CustomerID','Genre','Age','Annual Income (k$)','Spending Score (1-100)']

  • SparkSQL DataFrame 转换为 PandasDataFrame
df = spark.read.csv('/sql/customers.csv',header=True)
 pdf = df.toPandas()
 pdf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200 entries,0 to 199
Data columns (total 5 columns):
CustomerID 200 non-null object
Genre 200 non-null object
Age 200 non-null object
Annual Income (k$) 200 non-null object
Spending Score (1-100) 200 non-null object
dtypes: object(5)
memory usage: 7.9+ KB

pdf['Age'] = pdf['Age'].astype('int')
pdf["Annual Income (k$)"]=pdf["Annual Income (k$)"].astype('int')
pdf["Spending Score (1-100)"]=pdf["Spending Score (1-100)"].astype('int')
pdf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200 entries,0 to 199
Data columns (total 5 columns):
CustomerID 200 non-null object
Genre 200 non-null object
Age 200 non-null int64
Annual Income (k$) 200 non-null int64
Spending Score (1-100) 200 non-null int64
dtypes: int64(3),object(2)
memory usage: 7.9+ KB

  • PandasDataFrame 转换为 SparkSQL DataFrame
df1 = spark.createDataFrame(pdf)
 df1.corr("Age","Annual Income (k$)")
 df1.corr("Spending Score (1-100)","Annual Income (k$)")

0.009902848094037492

  • count()返回DataFrame中Row的数量
df = spark.read.csv('/sql/customers.csv',header=True)
 df.count()

200

  • createGlobalTempView(name)使用DataFrame创建一个全局的临时表,其生命周期 和启动的app的周期一致,即启动的spark应用存在则这个临时的表就一直能访问。直到 sparkcontext的stop方法的调用退出应用为止。创建的临时表保存在global_temp这个库 中

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读