Scala 高级算子
==> mapPartitionsWithIndex ????--->?定义:?def mapPartitionsWithIndex[U](f:(Int,Iterator[T]) => Iterator[U],preserversPartitioning: Boolean = false) ? ? --->?作用:?对?RDD?每个分区进行操作,带有分区号 ? ? --->?示例:输出分区号和内容 //?创建一个RDD val?rdd1?=?sc.parallelize(List(1,2,3,4,5,6,7,8,9)) //?创建一个函数,作为?f?的值 def?func(index:Int,?iter:Iterator[Int]):Iterator[String]?=?{ ????iter.toList.map(x=>"[PartID:?"?+?index?+?",?value=?"?+?x?+?"]").iterator } //?调用 rdd1.mapPartitionsWithIndex(func).colect //?结果 res15:?Array[String]?=?Array([PartitionID:?0,value=1],?[PartitionID:?0,value=2],value=3],value=4],? ?????????????????????????????[PartitionID:?1,value=5],?[PartitionID:?1,value=6],value=7],value=8],value=9]) ==>?aggregate ? ? --->?定义:def aggregate[U: ClassTag](zeroValue: U)(seqOp:(U,T) => U,combOp: (U,U) => U): U ????????---- (zeroValue: U)????????????初始值 ? ? ????---- seqOp:(U,T) => U????局部操作 ? ? ????---- combOp:(U,U) => U????????全局操作 ? ? --->?作用:先对局部进行操作,再对全局进行操作 ? ? --->?示例: //?求两个分区最大值的和,初始值为0 val?rdd1?=?sc.parallelize(List(1,9)) rdd1.aggregate(0)(math.max(_,_),?_+_) //?结果为:res16:?Int?=?13 ==>?aggregateByKey
? ? --->?定义: ? ? --->?作用:对?key-value?格式?的数据进行?aggregate?操作 ? ? --->?示例: //?准备一个?key-value?格式的?RDD val?parRDD?=?sc.parallelize(List(("cat",?2),("cat",?5),("mouse",?4),?12),("dog",?2)),?2) //?计算每个分区中的动物最多的个数求和 parRDD.aggregateByKey(0)(math.max(_,?_),?_+_) //?结果为:??Array[(String,?Int)]?=?Array((dog,12),?(cat,17),?(mouse,6)) //?计算每种动物的总数量 parRDD.aggregateByKey(0)(_+_,?_+_).collect????????//?方法一 parRDD.reduceByKey(_+_).collect ==> coalesce?与 repartition? ?? ? ? --->?作用:将?RDD?中的分区进行重分区 ? ? --->?区别: coalesce?默认不会进行 shuffle(false) ????????????????????????repartition?会进行 shuffle(true),?会将数据真正通过网络进行重分区 ? ? --->?示例: //?定义一个?RDD? val?rdd?=?sc.parallelize(List(1,8),?2) //?显示分区中的分区号和分区号中的内容 def?func(index:Int,?value=?"?+?x?+?"]").iterator } //?查看?rdd?中的分区情况 rdd.mapPartitionsWithIndex(func).collect //?结果为:?Array[String]?=?Array( //?[PartID:?0,?value=?1],?[PartID:?0,?value=?2],?value=?3],?value=?4],? //?[PartID:?1,?value=?5],?[PartID:?1,?value=?6],?value=?7],?value=?8]) //?使用?repartition?将分区数改为3 val?rdd2?=?rdd1.repartition(3) val?rdd3?=?rdd1.coalesce(3,?true) //?查看rdd2?与rdd3?的分区情况 rdd2.mapPartitionsWithIndex(func).collect rdd3.mapPartitionsWithIndex(func).collect //?结果为:Array[String]?=?Array( //?[PartID:?0,? //?[PartID:?2,?[PartID:?2,?value=?5]) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |