scala – Split a Spark dataframe by the values of some column(s), then rotate columns within each split independently of the others
I am trying to split a dataframe by the values of one (or more) columns, and then rotate certain columns within each resulting dataframe independently of the rest. That is, given the input dataframe:
val inputDF = Seq(
  ("tom",   "20", "a", "street a", "germany"),
  ("jimmy", "30", "b", "street b", "germany"),
  ("lola",  "40", "c", "street c", "argentina"),
  ("maria", "50", "d", "street d", "argentina"),
  ("joe",   "60", "e", "street e", "argentina")
).toDF("name", "age", "company", "address", "country")

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|  tom| 20|      a|street a|  germany|
//|jimmy| 30|      b|street b|  germany|
//| lola| 40|      c|street c|argentina|
//|maria| 50|      d|street d|argentina|
//|  joe| 60|      e|street e|argentina|
//+-----+---+-------+--------+---------+

I need to split the records by the distinct values of the "country" column. For this input dataframe, the split should produce:

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|  tom| 20|      a|street a|  germany|
//|jimmy| 30|      b|street b|  germany|
//+-----+---+-------+--------+---------+

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//| lola| 40|      c|street c|argentina|
//|maria| 50|      d|street d|argentina|
//|  joe| 60|      e|street e|argentina|
//+-----+---+-------+--------+---------+

Within each of those dataframes I then have to rotate the "name" and "age" columns, so that every person ends up with a different company and address while the remaining columns stay intact. The desired output dataframe looks like this:

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|jimmy| 30|      a|street a|  germany|
//|  tom| 20|      b|street b|  germany|
//|  joe| 60|      c|street c|argentina|
//| lola| 40|      d|street d|argentina|
//|maria| 50|      e|street e|argentina|
//+-----+---+-------+--------+---------+

The final row order does not matter.

My first attempt (run on the spark-shell)

I tried assigning a unique id to each row, then shuffling the desired columns (name and age) and joining the reordered dataframe back to the rest of the columns through the auxiliary id value. The main problems here are the use of collect(), which can be dangerous with large dataframes, and repartition(1), which pretty much goes against distributed computing and Spark (it is used to avoid an exception when zipping RDDs that have a different number of partitions).

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.LongType

// column(s) names to split the input dataframe
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)

// list of column names to be rotated (together)
val colsToRotate = Seq("name", "age")

// add an auxiliary column for joining the dataframes in the final step
val auxCol = "aux"
val rotateCols = colsToRotate.map(col) :+ col(auxCol)

val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())
val splitValuesSchema = dfWithID.select(splitCols: _*).schema

// create one dataframe for each value of the splitting column
val splitValuesDFs = dfWithID.select(splitCols: _*).distinct().collect()
  .map(row => spark.sparkContext.makeRDD(List(row)))
  .map(rdd => spark.createDataFrame(rdd, splitValuesSchema))

val rotateIDCols = Array(auxCol) ++ colsToRotate

// join the split values with their records (DFs with id + colsToRotate)
val splittedDFs = splitValuesDFs
  .map(df => df.join(dfWithID, colToSplit).selectExpr(rotateIDCols: _*))

// randomly reorder the auxiliary id column (DFs with random ids)
val randIdDFs = splittedDFs
  .map(df => df.select(auxCol).orderBy(rand()).toDF())

// get rdds with the random ids
val randIdRdds = randIdDFs
  .map(df => df.select(auxCol).rdd.map(row => row(0)))

// finally, zip and append the rdds with the random ids to the dataframes created by
// splitting the main df, to obtain the rotated dataframe with all the data
val tuples = (splittedDFs, randIdRdds).zipped
val newRdds = tuples
  .map((df: DataFrame, rdd) => df.rdd.repartition(1).zip(rdd.repartition(1))
    .map(row => Row.fromSeq(row._1.toSeq ++ Seq(row._2))))

val tuples2 = (splittedDFs, newRdds).zipped
val rotatedDF = tuples2.map((df: DataFrame, rdd) => spark
  .createDataFrame(rdd, df.schema.add("rotated_id", LongType)).drop(auxCol))
  .reduce(_ union _).withColumnRenamed("rotated_id", "column2join")

// get the rest of the columns
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
  .withColumnRenamed(auxCol, "column2join")

// join both dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
  .select(inputDF.columns.map(col): _*) // to keep the initial columns order

Showing the output dataframe produces a result similar to the expected output above (the exact pairing mainly depends on the rand() ordering).

I would like to avoid collect and repartition as much as possible, in order to get a more functional solution. Any comments or ideas are welcome!

Solution
I have been working on a better, cleaner and more functional solution by removing the poorly performing calls (repartition and some of the collects) wherever possible. I added an auxiliary method to index dataframe rows, so that otherwise unrelated parts (columns or dataframes that cannot be joined through any common column) can be joined. This is my current development; it also removes the multiple conversions between RDDs and dataframes and looks more readable and understandable.
I hope this can help anyone with the same concern.

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// auxiliary method to index the rows of a dataframe
def addRowIndex(df: DataFrame) = spark.createDataFrame(
  df.rdd.zipWithIndex.map { case (row, index) => Row.fromSeq(row.toSeq :+ index) },
  StructType(df.schema.fields :+ StructField("index", LongType, false))
)

// column(s) names to split the input dataframe
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)

// list of column names to be rotated (together)
val colsToRotate = Seq("name", "age")

// add an auxiliary column for joining the dataframes in the final step
val auxCol = "aux"

val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())
val rotateIDCols = (Array(auxCol) ++ colsToRotate).map(col)

// get an array of rows with the different values of the splitter column(s)
// --assuming there will not be too many different values in the splitter column--
val filterValues = dfWithID.select(splitCols: _*).distinct().collect()

// retrieve the different dfs according to the splitter values
val splitDfs = filterValues.map(filterRow => filterRow.getValuesMap(colToSplit)
  .foldLeft(dfWithID) { (df, filterField) =>
    df.filter(col(filterField._1) === filterField._2)
      .select(rotateIDCols: _*)
  })

// get and randomly reorder the aux id column of each dataframe
val randIdDfs = splitDfs.map(_.select(auxCol).orderBy(rand()).toDF())

// remove the aux column from each dataframe
val splitWithoutIdDfs = splitDfs.map(_.drop(auxCol))
val dfsTuples = splitWithoutIdDfs.zip(randIdDfs)

// index the rows of the dfs with the columns to rotate and of the dfs with the random ids
val indexedDfsTuples = dfsTuples.map { case (colsDf, idsDf) =>
  (addRowIndex(colsDf), addRowIndex(idsDf))
}

// join the reordered-ids dfs and the cols-to-rotate dfs by the index
val reorderedDfs = indexedDfsTuples.map { case (df1, df2) =>
  df1.join(df2, Seq("index"))
    .drop("index").withColumnRenamed(auxCol, "column2join")
}

// union the dataframes to create the rotated df
val rotatedDF = reorderedDfs.tail.foldLeft(reorderedDfs.head) { (acc, df) => acc.union(df) }

// get the rest of the columns, i.e. the part of the main df which does not change
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
  .withColumnRenamed(auxCol, "column2join")

// join the rotated and non-rotated dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
  .select(inputDF.columns.map(col): _*) // to keep the initial columns order
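As an aside (this is not part of the original question or answer), the same per-group re-pairing can also be sketched purely with dataframe window functions, which avoids zipWithIndex, collect and the per-group dataframes altogether. Below is a minimal, illustrative sketch under a few assumptions: a single split column, inputDF and the spark session defined as above, the auxiliary column names "r" and "rn" being free to use, and the existing "company" column serving only as an arbitrary fixed ordering key.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rand, row_number}

val colToSplit = Seq("country")
val colsToRotate = Seq("name", "age")
val otherCols = inputDF.columns.diff(colsToRotate).toSeq // company, address, country
val groupCols = colToSplit.map(col)

// number a randomly ordered copy of the columns to rotate, per country
val rotatedPart = inputDF
  .select((colsToRotate ++ colToSplit).map(col): _*)
  .withColumn("r", rand()) // materialize the random sort key
  .withColumn("rn", row_number().over(Window.partitionBy(groupCols: _*).orderBy(col("r"))))
  .drop("r")

// number the remaining columns in an arbitrary but fixed order, per country
val fixedPart = inputDF
  .select(otherCols.map(col): _*)
  .withColumn("rn", row_number().over(Window.partitionBy(groupCols: _*).orderBy(col("company"))))

// re-pair rows by (country, rn): each (name, age) lands on some row's (company, address)
val outputDF = fixedPart
  .join(rotatedPart, colToSplit :+ "rn")
  .drop("rn")
  .select(inputDF.columns.map(col): _*)

Here row_number assigns matching per-country indices to the randomly ordered copy of the rotated columns and to the remaining columns, and the join on (country, rn) re-pairs them. Like the rand()-based approaches above, this does not guarantee that a row never keeps its original company and address.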