scala – 如何在Spark中转置RDD
发布时间:2020-12-16 09:15:05 所属栏目:安全 来源:网络整理
导读:我有一个这样的RDD: 1 2 34 5 67 8 9 它是一个矩阵.现在我想像这样转置RDD: 1 4 72 5 83 6 9 我该怎么做? 解决方法 说你有一个N×M矩阵. 如果N和M都很小,可以在内存中保存N×M个项目,那么使用RDD并不太有意义.但是转置它很容易: val rdd = sc.paralleliz
我有一个这样的RDD:
1 2 3 4 5 6 7 8 9 它是一个矩阵.现在我想像这样转置RDD: 1 4 7 2 5 8 3 6 9 我该怎么做? 解决方法
说你有一个N×M矩阵.
如果N和M都很小,可以在内存中保存N×M个项目,那么使用RDD并不太有意义.但是转置它很容易: val rdd = sc.parallelize(Seq(Seq(1,2,3),Seq(4,5,6),Seq(7,8,9))) val transposed = sc.parallelize(rdd.collect.toSeq.transpose) 如果N或M大到不能在内存中保留N或M个条目,那么您不能拥有此大小的RDD行.在这种情况下,原始或转置的矩阵都不可能表示. N和M可能是中等大小:您可以在内存中保留N或M个条目,但不能保留N×M个条目.在这种情况下,您必须炸毁矩阵并重新放在一起: val rdd = sc.parallelize(Seq(Seq(1,9))) // Split the matrix into one number per line. val byColumnAndRow = rdd.zipWithIndex.flatMap { case (row,rowIndex) => row.zipWithIndex.map { case (number,columnIndex) => columnIndex -> (rowIndex,number) } } // Build up the transposed matrix. Group and sort by column index first. val byColumn = byColumnAndRow.groupByKey.sortByKey().values // Then sort by row index. val transposed = byColumn.map { indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2) } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |