加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何检查点DataFrames?

发布时间:2020-12-16 09:20:22 所属栏目:安全 来源:网络整理
导读:我正在寻找一种检查点DataFrames的方法.检查点目前是RDD上的一个操作,但是我找不到如何使用DataFrames.持久化和缓存(它们是彼此的同义词)可用于DataFrame,但它们不会“破坏谱系”,因此不适合可以循环数百次(或数千次)迭代的方法. 作为一个例子,假设我有一个
我正在寻找一种检查点DataFrames的方法.检查点目前是RDD上的一个操作,但是我找不到如何使用DataFrames.持久化和缓存(它们是彼此的同义词)可用于DataFrame,但它们不会“破坏谱系”,因此不适合可以循环数百次(或数千次)迭代的方法.

作为一个例子,假设我有一个功能列表,它的签名是DataFrame =>数据帧.即使我的功能有数百或数千条目,我想要有一种方法来计算以下内容:

def foo(dataset: DataFrame,g: DataFrame => Unit) =
    myfunctions.foldLeft(dataset) {
        case (df,f) =>
            val nextDF = f(df)
            g(nextDF)
            nextDF
   }

解决方法

TL; DR:对于Spark版本高达1.6,要实际获得“检查点DF”,我建议的解决方案是基于另一个答案,但是有一个额外的行:

df.rdd.checkpoint
df.rdd.count
val df2 = sqlContext.createDataFrame(df.rdd,df.schema)
// df2 is checkpointed

说明

进一步研究后更新.

如所指出的那样,直接检查DataFrame是不可能的(Spark 1.6.1),尽管在Spark的Jira上有一个issue.

所以,一个可能的解决方法是另一个答案建议:

df.rdd.checkpoint // Assuming the checkpoint dir has already been set
df.count // An action to compute the checkpoint

但是,通过这种方法,只有df.rdd对象将被检查点.这可以通过调用toDebugString到df.rdd来验证:

scala> df.rdd.toDebugString
 (32) MapPartitionsRDD[1] at rdd at <console>:38 []
  |   ReliableCheckpointRDD[2] at count at <console>:38 []

然后在快速转换为df后调用toDebugString(请注意,我从JDBC源创建了我的DataFrame),返回以下内容:

scala> df.withColumn("new_column",lit(0)).rdd.toDebugString
res4: String =
(32) MapPartitionsRDD[5] at rdd at <console>:38 []
 |   MapPartitionsRDD[4] at rdd at <console>:38 []
 |   JDBCRDD[3] at rdd at <console>:38 []

df.explain还显示一个提示:

scala> df.explain
== Physical Plan ==
Scan JDBCRelation (...)

所以,要真正实现一个“检查点”的DataFrame,我只能想到从检查点的RDD创建一个新的:

val newDF = sqlContext.createDataFrame(df.rdd,df.schema)
// or
val newDF = df.rdd.map { 
  case Row(val1: Int,...,valN: Int) => (val1,valN)
}.toDF("col1","colN")

然后我们可以验证新的DataFrame是“checkpointed”:

1)newDF.explain:

scala> newDF.explain
== Physical Plan ==
Scan PhysicalRDD[col1#5,col2#6,col3#7]

2)newDF.rdd.toDebugString:

scala> newDF.rdd.toDebugString
res7: String =
(32) MapPartitionsRDD[10] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []

3)随着转型:

scala> newDF.withColumn("new_column",lit(0)).rdd.toDebugString
res9: String =
(32) MapPartitionsRDD[12] at rdd at <console>:40 []
 |   MapPartitionsRDD[11] at rdd at <console>:40 []
 |   MapPartitionsRDD[8] at createDataFrame at <console>:37 []
 |   MapPartitionsRDD[1] at rdd at <console>:38 []
 |   ReliableCheckpointRDD[2] at count at <console>:38 []

此外,我尝试了一些更复杂的变换,我实际上可以检查newDF对象是否被检查点.

因此,我发现可靠地检查DataFrame的唯一方法是通过检查其关联的RDD并从中创建一个新的DataFrame对象.

我希望它有帮助.干杯.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读