加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 每次转换/操作后,Spark Dataframe随机UUID都会发生变化

发布时间:2020-12-16 09:24:33 所属栏目:安全 来源:网络整理
导读:我有一个Spark数据帧,其中包含一个包含生成的UUID的列. 但是,每次我对数据帧执行操作或转换时,它都会更改每个阶段的UUID. 如何仅生成一次UUID并使UUID保持静态. 一些示例代码重新生成我的问题如下: def process(spark: SparkSession): Unit = { import spar
我有一个Spark数据帧,其中包含一个包含生成的UUID的列.
但是,每次我对数据帧执行操作或转换时,它都会更改每个阶段的UUID.

如何仅生成一次UUID并使UUID保持静态.

一些示例代码重新生成我的问题如下:

def process(spark: SparkSession): Unit = {

  import spark.implicits._

  val sc = spark.sparkContext
  val sqlContext = spark.sqlContext
  sc.setLogLevel("OFF")

  // create dataframe
  val df = spark.createDataset(Array(("a","1"),("b","2"),("c","3"))).toDF("col1","col2")
  df.createOrReplaceTempView("df")
  df.show(false)

  // register an UDF that creates a random UUID
  val generateUUID = udf(() => UUID.randomUUID().toString)

  // generate UUID for new column
  val dfWithUuid = df.withColumn("new_uuid",generateUUID())
  dfWithUuid.show(false)
  dfWithUuid.show(false)    // uuid is different

  // new transformations also change the uuid
  val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3",df.col("col2")+1)
  dfWithUuidWithNewCol.show(false)
}

输出是:

+----+----+
|col1|col2|
+----+----+
|a   |1   |
|b   |2   |
|c   |3   |
+----+----+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |a414e73b-24b8-4f64-8d21-f0bc56d3d290|
|b   |2   |f37935e5-0bfc-4863-b6dc-897662307e0a|
|c   |3   |e3aaf655-5a48-45fb-8ab5-22f78cdeaf26|
+----+----+------------------------------------+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |1c6597bf-f257-4e5f-be81-34a0efa0f6be|
|b   |2   |6efe4453-29a8-4b7f-9fa1-7982d2670bd6|
|c   |3   |2f7ddc1c-3e8c-4118-8e2c-8a6f526bee7e|
+----+----+------------------------------------+

+----+----+------------------------------------+----+
|col1|col2|new_uuid                            |col3|
+----+----+------------------------------------+----+
|a   |1   |00b85af8-711e-4b59-82e1-8d8e59d4c512|2.0 |
|b   |2   |94c3f2c6-9234-4fb3-b1c4-273a37171131|3.0 |
|c   |3   |1059fff2-b8f9-4cec-907d-ea181d5003a2|4.0 |
+----+----+------------------------------------+----+

请注意,每个步骤的UUID都不同.

解决方法

这是一种预期的行为.用户定义的函数 have to be deterministic:

The user-defined functions must be deterministic. Due to optimization,
duplicate invocations may be eliminated or the function may even be
invoked more times than it is present in the query.

如果要包含非确定性函数并保留输出,则应将中间数据写入持久存储并读回.检查点或缓存可能在某些简单的情况下有效,但一般来说它不可靠.

如果上游进程是确定性的(对于初学者有shuffle)你可以尝试使用rand function with seed,转换为字节数组并传递给UUID.nameUUIDFromBytes.

另见:About how to add a new column to an existing DataFrame with random values in Scala

注意:SPARK-20586引入了确定性标志,它可以禁用某些优化,但不清楚数据持久化和丢失执行程序时的行为方式.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读