加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

Scala – Spark中的分层抽样

发布时间:2020-12-16 09:20:19 所属栏目:安全 来源:网络整理
导读:我有包含用户和购买数据的数据集.这里是一个例子,其中第一个元素是userId,第二个是productId,第三个表示布尔值. (2147481832,23355149,1)(2147481832,973010692,2134870842,541023347,1682206630,1138211459,852202566,201375938,486538879,919187908,1)...
我有包含用户和购买数据的数据集.这里是一个例子,其中第一个元素是userId,第二个是productId,第三个表示布尔值.

(2147481832,23355149,1)
(2147481832,973010692,2134870842,541023347,1682206630,1138211459,852202566,201375938,486538879,919187908,1)
...

我想确保我只占用每个用户数据的80%,并构建RDD,同时占用20%的其余部分,并构建另一个RDD.让火车和测试.我想远离使用groupBy开始,因为它可以创建内存问题,因为数据集很大.最好的方法是什么?

我可以做以下的工作,但这不会给每个用户80%的.

val percentData = data.map(x => ((math.random * 100).toInt,x._1. x._2,x._3)
val train = percentData.filter(x => x._1 < 80).values.repartition(10).cache()

解决方法

霍顿的答案有一种可能性,而另一种可能性是:

您可以使用PairRDDFunctions类中的sampleByKeyExact转换.

sampleByKeyExact(boolean withReplacement,scala.collection.Map fractions,long seed)
Return a subset of this RDD sampled by key (via stratified sampling) containing exactly math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).

这就是我会怎么做的:

考虑到以下列表:

val list = List((2147481832,1),(2147481832,(214748183,91187908,1))

我将创建一个RDD对,将所有用户映射为密钥:

val data = sc.parallelize(list.toSeq).map(x => (x._1,(x._2,x._3)))

然后我将为每个键设置分数,如下所示,因为您已经注意到sampleByKeyExact为每个键获取分数的映射:

val fractions = data.map(_._1).distinct.map(x => (x,0.8)).collectAsMap

我在这里做的是实际上映射关键字以找到不同的,然后将每个关键字关联到一个等于0.8的分数,然后我把整个作为一个地图.

现在要抽样,我所要做的就是:

import org.apache.spark.rdd.PairRDDFunctions
val sampleData = data.sampleByKeyExact(false,fractions,2L)

要么

val sampleData = data.sampleByKeyExact(withReplacement = false,fractions = fractions,seed = 2L)

您可以检查钥匙或数据或数据样本的计数:

scala > data.count
// [...]
// res10: Long = 12

scala > sampleData.count
// [...]
// res11: Long = 10

编辑:我决定添加一部分来对DataFrames执行分层抽样.

所以我们将从上面的例子中考虑相同的数据(列表).

val df = list.toDF("keyColumn","value1","value2")
df.show
// +----------+----------+------+
// | keyColumn|    value1|value2|
// +----------+----------+------+
// |2147481832|  23355149|     1|
// |2147481832| 973010692|     1|
// |2147481832|2134870842|     1|
// |2147481832| 541023347|     1|
// |2147481832|1682206630|     1|
// |2147481832|1138211459|     1|
// |2147481832| 852202566|     1|
// |2147481832| 201375938|     1|
// |2147481832| 486538879|     1|
// |2147481832| 919187908|     1|
// | 214748183| 919187908|     1|
// | 214748183|  91187908|     1|
// +----------+----------+------+

我们将需要底层的RDD,通过将我们的密钥定义为第一列:我们创建这个RDD元素的元组:

val data: RDD[(Int,Row)] = df.rdd.keyBy(_.getInt(0))
val fractions: Map[Int,Double] = data.map(_._1)
                                      .distinct
                                      .map(x => (x,0.8))
                                      .collectAsMap

val sampleData: RDD[Row] = data.sampleByKeyExact(withReplacement = false,2L)
                               .values

val sampleDataDF: DataFrame = spark.createDataFrame(sampleData,df.schema) // you can use sqlContext.createDataFrame(...) instead for spark 1.6)

现在您可以检查钥匙或数据样本的数量:

scala > df.count
// [...]
// res9: Long = 12

scala > sampleDataDF.count
// [...]
// res10: Long = 10

编辑2:由于Spark 1.5.0可以使用DataFrameStatFunctions.sampleBy方法:

df.stat.sampleBy("keyColumn",seed)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读