Scala – Spark中的分层抽样
我有包含用户和购买数据的数据集.这里是一个例子,其中第一个元素是userId,第二个是productId,第三个表示布尔值.
(2147481832,23355149,1) (2147481832,973010692,2134870842,541023347,1682206630,1138211459,852202566,201375938,486538879,919187908,1) ... 我想确保我只占用每个用户数据的80%,并构建RDD,同时占用20%的其余部分,并构建另一个RDD.让火车和测试.我想远离使用groupBy开始,因为它可以创建内存问题,因为数据集很大.最好的方法是什么? 我可以做以下的工作,但这不会给每个用户80%的. val percentData = data.map(x => ((math.random * 100).toInt,x._1. x._2,x._3) val train = percentData.filter(x => x._1 < 80).values.repartition(10).cache() 解决方法
霍顿的答案有一种可能性,而另一种可能性是:
您可以使用PairRDDFunctions类中的sampleByKeyExact转换.
这就是我会怎么做的: 考虑到以下列表: val list = List((2147481832,1),(2147481832,(214748183,91187908,1)) 我将创建一个RDD对,将所有用户映射为密钥: val data = sc.parallelize(list.toSeq).map(x => (x._1,(x._2,x._3))) 然后我将为每个键设置分数,如下所示,因为您已经注意到sampleByKeyExact为每个键获取分数的映射: val fractions = data.map(_._1).distinct.map(x => (x,0.8)).collectAsMap 我在这里做的是实际上映射关键字以找到不同的,然后将每个关键字关联到一个等于0.8的分数,然后我把整个作为一个地图. 现在要抽样,我所要做的就是: import org.apache.spark.rdd.PairRDDFunctions val sampleData = data.sampleByKeyExact(false,fractions,2L) 要么 val sampleData = data.sampleByKeyExact(withReplacement = false,fractions = fractions,seed = 2L) 您可以检查钥匙或数据或数据样本的计数: scala > data.count // [...] // res10: Long = 12 scala > sampleData.count // [...] // res11: Long = 10 编辑:我决定添加一部分来对DataFrames执行分层抽样. 所以我们将从上面的例子中考虑相同的数据(列表). val df = list.toDF("keyColumn","value1","value2") df.show // +----------+----------+------+ // | keyColumn| value1|value2| // +----------+----------+------+ // |2147481832| 23355149| 1| // |2147481832| 973010692| 1| // |2147481832|2134870842| 1| // |2147481832| 541023347| 1| // |2147481832|1682206630| 1| // |2147481832|1138211459| 1| // |2147481832| 852202566| 1| // |2147481832| 201375938| 1| // |2147481832| 486538879| 1| // |2147481832| 919187908| 1| // | 214748183| 919187908| 1| // | 214748183| 91187908| 1| // +----------+----------+------+ 我们将需要底层的RDD,通过将我们的密钥定义为第一列:我们创建这个RDD元素的元组: val data: RDD[(Int,Row)] = df.rdd.keyBy(_.getInt(0)) val fractions: Map[Int,Double] = data.map(_._1) .distinct .map(x => (x,0.8)) .collectAsMap val sampleData: RDD[Row] = data.sampleByKeyExact(withReplacement = false,2L) .values val sampleDataDF: DataFrame = spark.createDataFrame(sampleData,df.schema) // you can use sqlContext.createDataFrame(...) instead for spark 1.6) 现在您可以检查钥匙或数据样本的数量: scala > df.count // [...] // res9: Long = 12 scala > sampleDataDF.count // [...] // res10: Long = 10 编辑2:由于Spark 1.5.0可以使用DataFrameStatFunctions.sampleBy方法: df.stat.sampleBy("keyColumn",seed) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |