scala – 如何检查点DataFrames?
我正在寻找一种检查点DataFrames的方法.检查点目前是RDD上的一个操作,但是我找不到如何使用DataFrames.持久化和缓存(它们是彼此的同义词)可用于DataFrame,但它们不会“破坏谱系”,因此不适合可以循环数百次(或数千次)迭代的方法.
作为一个例子,假设我有一个功能列表,它的签名是DataFrame =>数据帧.即使我的功能有数百或数千条目,我想要有一种方法来计算以下内容: def foo(dataset: DataFrame,g: DataFrame => Unit) = myfunctions.foldLeft(dataset) { case (df,f) => val nextDF = f(df) g(nextDF) nextDF } 解决方法
TL; DR:对于Spark版本高达1.6,要实际获得“检查点DF”,我建议的解决方案是基于另一个答案,但是有一个额外的行:
df.rdd.checkpoint df.rdd.count val df2 = sqlContext.createDataFrame(df.rdd,df.schema) // df2 is checkpointed 说明 进一步研究后更新. 如所指出的那样,直接检查DataFrame是不可能的(Spark 1.6.1),尽管在Spark的Jira上有一个issue. 所以,一个可能的解决方法是另一个答案建议: df.rdd.checkpoint // Assuming the checkpoint dir has already been set df.count // An action to compute the checkpoint 但是,通过这种方法,只有df.rdd对象将被检查点.这可以通过调用toDebugString到df.rdd来验证: scala> df.rdd.toDebugString (32) MapPartitionsRDD[1] at rdd at <console>:38 [] | ReliableCheckpointRDD[2] at count at <console>:38 [] 然后在快速转换为df后调用toDebugString(请注意,我从JDBC源创建了我的DataFrame),返回以下内容: scala> df.withColumn("new_column",lit(0)).rdd.toDebugString res4: String = (32) MapPartitionsRDD[5] at rdd at <console>:38 [] | MapPartitionsRDD[4] at rdd at <console>:38 [] | JDBCRDD[3] at rdd at <console>:38 [] df.explain还显示一个提示: scala> df.explain == Physical Plan == Scan JDBCRelation (...) 所以,要真正实现一个“检查点”的DataFrame,我只能想到从检查点的RDD创建一个新的: val newDF = sqlContext.createDataFrame(df.rdd,df.schema) // or val newDF = df.rdd.map { case Row(val1: Int,...,valN: Int) => (val1,valN) }.toDF("col1","colN") 然后我们可以验证新的DataFrame是“checkpointed”: 1)newDF.explain: scala> newDF.explain == Physical Plan == Scan PhysicalRDD[col1#5,col2#6,col3#7] 2)newDF.rdd.toDebugString: scala> newDF.rdd.toDebugString res7: String = (32) MapPartitionsRDD[10] at rdd at <console>:40 [] | MapPartitionsRDD[8] at createDataFrame at <console>:37 [] | MapPartitionsRDD[1] at rdd at <console>:38 [] | ReliableCheckpointRDD[2] at count at <console>:38 [] 3)随着转型: scala> newDF.withColumn("new_column",lit(0)).rdd.toDebugString res9: String = (32) MapPartitionsRDD[12] at rdd at <console>:40 [] | MapPartitionsRDD[11] at rdd at <console>:40 [] | MapPartitionsRDD[8] at createDataFrame at <console>:37 [] | MapPartitionsRDD[1] at rdd at <console>:38 [] | ReliableCheckpointRDD[2] at count at <console>:38 [] 此外,我尝试了一些更复杂的变换,我实际上可以检查newDF对象是否被检查点. 因此,我发现可靠地检查DataFrame的唯一方法是通过检查其关联的RDD并从中创建一个新的DataFrame对象. 我希望它有帮助.干杯. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |