?
pyspark 包介绍
内容
PySpark是针对Spark的Python API。根据网上提供的资料,现在汇总一下这些类的基本用法,并举例说明如何具体使用。也是总结一下经常用到的这些公有类的使用方式。方便初学者查询及使用。
Public 类们:
类
- (loadDefaults=True,_jvm=None,_jconf=None)
-
配置一个Spark应用,一般用来设置各种Spark的键值对作为参数。
大多数时候,使用来创建SparkConf对象,也用于载入来自spark.* Java系统的属性值。此时,在对象上设置的任何参数都有高于系统属性的优先级。
对于单元测试,也能调用来略过额外的配置,无论系统属性是什么都可以获得相同的配置。
这个类中的设值方法都是支持链式结构的,例如,你可以这样编写配置conf.setMaster(“local”).setAppName(“My app”)。
(key)
-
配置中是否包含一个指定键。
(key,defaultValue=None)
-
获取配置的某些键值,或者返回默认值。
()
-
得到所有的键值对的list。
(key,value)
-
设置配置属性。
(pairs)
-
通过传递一个键值对的list,为多个参数赋值。
(value)
-
设置应用名称
(key=None,value=None,pairs=None)
-
设置环境变量复制给执行器。
(key,value)
-
如果没有,则设置一个配置属性。
(value)
-
设置主连接地址。
(value)
-
设置工作节点上的Spark安装路径。
()
-
返回一个可打印的配置版本。
(master=None,appName=None,sparkHome=None,pyFiles=None,environment=None,batchSize=0,serializer=PickleSerializer(),conf=None,gateway=None,jsc=None,profiler_cls=)
-
Spark功能的主入口,SparkContext 代表到Spark 集群的连接,并且在集群上能创建RDD和broadcast。
(value,accum_param=None)
-
用指定的初始化值创建一个 累加器。使用
(path,recursive=False)
-
使用在每个节点上的Spark job添加文件下载。这里path 参数可以使本地文件也可以使在HDFS中的文件,也可以是HTTP、HTTPS或者URI。
在Spark的job中访问文件,使用L{SparkFiles.get(fileName)}可以找到下载位置。
如果递归选项被设置为“TRUE”则路径能被指定。当前路径仅仅支持Hadoop文件系统。
>>> pyspark >>> path = os.path.join(tempdir, >>> with open(path, ... _ = testFile.write( >>> >>> ... with open(SparkFiles.get( ... fileVal = ... [x * fileVal x >>> sc.parallelize([1,2,3,4 [100,200,300,400]
(path)
-
为所有将在SparkContext上执行的任务添加一个a.py或者.zip的附件。这里path 参数可以使本地文件也可以使在HDFS中的文件,也可以是HTTP、HTTPS或者FTP URI。
-
Spark应用的唯一ID,它的格式取决于调度器实现。
本地模式下像这样的ID‘local-1433865536131’
- 模式下像这样的ID‘application_1433865536131_34483’
>>>
(path,minPartitions=None)
-
注意
- 从HDFS上读取二进制文件的路径,本地文件系统(在所有节点上都可用),或者其他hadoop支持的文件系统URI党组偶一个二进制数组。每个文件作为单独的记录,并且返回一个键值对,这个键就是每个文件的了路径,值就是每个文件的内容。
-
(path,recordLength)
path – 输入文件路径
-
recordLength – 分割记录的长度(位数)
-
注意
从平面二进制文件中载入数据,假设每个记录都是一套指定数字格式的数字(ByteBuffer),并且每个记录位数的数是恒定的。
(value)
-
广播一个制度变量到集群,返回一个L{Broadcast} 对象在分布式函数中读取。这个变量将只发一次给每个集群。
()
-
取消所有已排程的或者正在运行的job。
(groupId)
-
取消指定组的已激活job,查看
-
当不被用户指定时,默认Hadoop RDDs 为最小分区。
-
当不被用户指定时,默认并行级别执行。(例如reduce task)
(path)
-
转存配置信息到目录路径下。
()
-
创建没有分区或者元素的RDD。
()
(key)
-
(conf=None)
-
参数:conf – SparkConf (optional)
-
获取或者实例化一个SparkContext并且注册为单例模式对象。
(path,inputFormatClass,keyClass,valueClass,keyConverter=None,valueConverter=None,batchSize=0)、
-
用任意来自HDFS的键和值类读取一个老的Hadoop输入格式,本地系统(所有节点可用),或者任何支持Hadoop的文件系统的URI。这个机制是与sc.sequenceFile是一样的。
Hadoop 配置可以作为Python的字典传递。这将被转化成Java中的配置。
参数:
path – Hadoop文件路径
-
inputFormatClass – 输入的Hadoop文件的规范格式(例如 “org.apache.hadoop.mapred.TextInputFormat”)
-
keyClass – 可写键类的合格类名 (例如“org.apache.hadoop.io.Text”)
-
valueClass –可写值类的合格类名 (e.g. “org.apache.hadoop.io.LongWritable”)
-
keyConverter – (默认为none)
-
valueConverter – (默认为none)
-
conf – Hadoop配置,作为一个字典传值 (默认为none)
-
batchSize – Python对象的数量代表一个单一的JAVA对象 (默认 0,表示自动匹配batchSize)
(inputFormatClass,batchSize=0)
-
读取Hadoop输入格式用任意键值类。与上面的类相似。
参数:
inputFormatClass – 输入的Hadoop文件的规范格式(例如 “org.apache.hadoop.mapred.TextInputFormat”)
-
keyClass – 可写键类的合格类名 (例如“org.apache.hadoop.io.Text”)
-
valueClass –可写值类的合格类名 (e.g. “org.apache.hadoop.io.LongWritable”)
-
keyConverter – (默认为none)
-
valueConverter – (默认为none)
-
conf – Hadoop配置,作为一个字典传值 (默认为none)
-
batchSize – Python对象的数量代表一个单一的JAVA对象 (默认 0,表示自动匹配batchSize)
(path,batchSize=0)
-
与上面的功能类似.
(inputFormatClass,batchSize=0)
-
任意Hadoop的配置作为参数传递。
(c,numSlices=None)
-
分配一个本Python集合构成一个RDD。如果输入代表了一个性能范围,建议使用xrange。
>>> sc.parallelize([0,4,6],52],[3],[4],[6>>> sc.parallelize(xrange(0,6,2),5
(name,minPartitions=None)
-
载入使用方法保存的RDD。
>>> tmpFile = NamedTemporaryFile(delete=>>>>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name,5>>> sorted(sc.pickleFile(tmpFile.name,31,5,7,8,9]
(start,end=None,step=1,numSlices=None)
-
创建一个int类型元素组成的RDD,从开始值到结束(不包含结束),里面都是按照步长增长的元素。这就要用到Python内置的函数range()。如果只有一个参数调用,这个参数就表示结束值,开始值默认为0.
参数:
start –起始值
-
end – 结束值(不包含)
-
step – 步长(默认: 1)
-
numSlices –RDD分区数量(切片数)
返回值:RDD
>>> sc.range(5>>> sc.range(2,42,3>>> sc.range(1,21,5]
(rdd,partitionFunc,partitions=None,allowLocal=False)
-
执行指定的partitionFunc 在指定的分区,返回一个元素数组。如果不指定分区,则将运行在所有分区上。
>>> myRDD = sc.parallelize(range(6),3>>> sc.runJob(myRDD, part: [x * x x >>> myRDD = sc.parallelize(range(6),<span style="color: #0000ff">lambda part: [x * x <span style="color: #0000ff">for x <span style="color: #0000ff">in part],[0,2<span style="color: #000000">],True)
[0,25]
<br class="gp"><div class="highlight">?
-
(path,keyClass=None,valueClass=None,minSplits=None,batchSize=0)
-
读取Hadoop 的SequenceFile,机制如下:
1.一个Java RDD通过SequenceFile或者其他输入格式创建,需要键值的可写类参数。
2.序列化
3.如果失败,则对每个键值调用‘toString’。
4.在Python上,用来反序列化。
参数:
path –序列化文件路径
keyClass – 可用键类(例如 “org.apache.hadoop.io.Text”)
valueClass – 可用值类 (例如 “org.apache.hadoop.io.LongWritable”)
keyConverter –
valueConverter –
minSplits – 数据集最低分割数(默认 min(2,sc.defaultParallelism))
batchSize – 代表一个JAVA对象Python对象的数量 (默认0,自动)
(dirName)
设定作为检查点的RDD的目录,如果运行在集群上,则目录一定时HDFS路径。
(groupId,description,interruptOnCancel=False)
-
分配一个组ID给所有被这个线程开启的job。
通常,一个执行单位由多个Spark 的action或者job组成。应用程序可以将所有把所有job组成一个组,给一个组的描述。一旦设置好,Spark的web UI 将关联job和组。
应用使用来取消组。
>>> >>> time >>> result =
>>> lock =>>> 100 Exception(>>> ,== >>> 5>>> supress =>>> supress = threading.Thread(target=start_job,args=(10>>> supress = threading.Thread(target=>>> supress =>>>
如果对于job组,interruptOnCancel被设定为True,那么那么取消job将在执行线程中调用Thread.interrupt()。这对于确保任务实时停止是有作用的。但是默认情况下,HDFS可以通过标记节点为dead状态来停止线程。
(key,value)
-
设定本地影响提交工作的属性,例如Spark 公平调度池。
(logLevel)
-
控制日志级别。重写任何用户自定义的日志设定。有效的日志级别包括:ALL,DEBUG,ERROR,FATAL,INFO,OFF,TRACE,WARN。
(key,value)
-
设定Java系统属性,例如spark.executor.memory,这一定要在实例化SparkContext之前被激活。
()
-
打印配置信息到标准输出。
()
-
为运行SparkContext 的用户获得SPARK_USER
-
当SparkContext被发起,则返回新的时间纪元。
()
-
Return object
返回对象
()
-
关闭SparkContext。
(name,minPartitions=None,use_unicode=True)
-
从HDFS中读取一个text文件,本地文件系统(所有节点可用),或者任何支持Hadoop的文件系统的URI,然后返回一个字符串类型的RDD。
如果用户use_unicode为False,则strings类型将为str(用utf-8编码),这是一种比unicode更快、更小的编码(Spark1.2以后加入)。
>>> path = os.path.join(tempdir,>>> with open(path,) = testFile.write(>>> textFile =>>>]
-
返回由SparkContext的SparkUI实例化开启的URL。
(rdds)
-
建立RDD列表的联合。
支持不同序列化格式的RDD的unions()方法,需要使用默认的串行器将它们强制序列化(串行化):
>>> path = os.path.join(tempdir,>>> with open(path,) = testFile.write(>>> textFile =>>>>>> parallelized = sc.parallelize([>>>,]
-
应用运行的Spark的版本。
(path,use_unicode=True)
-
读取HDFS的文本文件的路径,这是一个本地文件系统(所有节点可用),或者任何支持Hadoop的文件系统的URI。每个文件被当做一个独立记录来读取,然后返回一个键值对,键为每个文件的路径,值为每个文件的内容。
如果用户use_unicode为False,则strings类型将为str(用utf-8编码),这是一种比unicode更快、更小的编码(Spark1.2以后加入)。
举例说明,如果有如下文件:
hdfs://a-hdfs-path/part-00000
hdfs://a-hdfs-path/part-00001
...
hdfs://a-hdfs-path/part-nnnnn
如果执行 rdd = sparkContext.wholeTextFiles(“hdfs://a-hdfs-path”),那么 包含:
(a-hdfs-path/part-00000,its content)
(a-hdfs-path/part-00001,its content)
...
(a-hdfs-path/part-nnnnn,its content)
>>> dirPath = os.path.join(tempdir,>>>>>> with open(os.path.join(dirPath,),) = file1.write(>>> with open(os.path.join(dirPath,),) = file2.write(>>> textFiles =>>>,u),(u,u)]
本篇接少了两个类和
(编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|