scala – Spark DataFrame random UUIDs change after every transformation/action
I have a Spark DataFrame with a column that contains a generated UUID.
However, every time I run an action or transformation on the DataFrame, the UUIDs change at each stage. How can I generate the UUIDs only once and keep them static afterwards? Some sample code that reproduces my problem is below:

```scala
import java.util.UUID

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

def process(spark: SparkSession): Unit = {
  import spark.implicits._
  val sc = spark.sparkContext
  val sqlContext = spark.sqlContext
  sc.setLogLevel("OFF")

  // create dataframe
  val df = spark.createDataset(Array(("a","1"),("b","2"),("c","3"))).toDF("col1","col2")
  df.createOrReplaceTempView("df")
  df.show(false)

  // register a UDF that creates a random UUID
  val generateUUID = udf(() => UUID.randomUUID().toString)

  // generate UUID for new column
  val dfWithUuid = df.withColumn("new_uuid", generateUUID())
  dfWithUuid.show(false)
  dfWithUuid.show(false)    // uuid is different

  // new transformations also change the uuid
  val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2") + 1)
  dfWithUuidWithNewCol.show(false)
}
```

The output is:

```
+----+----+
|col1|col2|
+----+----+
|a   |1   |
|b   |2   |
|c   |3   |
+----+----+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |a414e73b-24b8-4f64-8d21-f0bc56d3d290|
|b   |2   |f37935e5-0bfc-4863-b6dc-897662307e0a|
|c   |3   |e3aaf655-5a48-45fb-8ab5-22f78cdeaf26|
+----+----+------------------------------------+

+----+----+------------------------------------+
|col1|col2|new_uuid                            |
+----+----+------------------------------------+
|a   |1   |1c6597bf-f257-4e5f-be81-34a0efa0f6be|
|b   |2   |6efe4453-29a8-4b7f-9fa1-7982d2670bd6|
|c   |3   |2f7ddc1c-3e8c-4118-8e2c-8a6f526bee7e|
+----+----+------------------------------------+

+----+----+------------------------------------+----+
|col1|col2|new_uuid                            |col3|
+----+----+------------------------------------+----+
|a   |1   |00b85af8-711e-4b59-82e1-8d8e59d4c512|2.0 |
|b   |2   |94c3f2c6-9234-4fb3-b1c4-273a37171131|3.0 |
|c   |3   |1059fff2-b8f9-4cec-907d-ea181d5003a2|4.0 |
+----+----+------------------------------------+----+
```

Note that the UUIDs are different at each step.

Solution
This is expected behavior. User-defined functions have to be deterministic; as the Spark documentation puts it, due to optimization, duplicate invocations may be eliminated, or the function may even be invoked more times than it is present in the query.
If you want to include a non-deterministic function and preserve the output, you should write the intermediate data to persistent storage and read it back. Checkpointing or caching may work in some simple cases, but in general it is not reliable.

If the upstream process is deterministic (for starters, there is a shuffle), you could try using the rand function with a seed, converting the result to a byte array, and passing it to UUID.nameUUIDFromBytes.

See also: About how to add a new column to an existing DataFrame with random values in Scala

Note: SPARK-20586 introduced a deterministic flag, which can disable certain optimizations, but it is not clear how it behaves when data is persisted or an executor is lost.

Minimal sketches of these three approaches follow.
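First, the write-and-read-back workaround. This is a minimal sketch; the Parquet path `/tmp/df_with_uuid` and the app name are placeholders, not anything from the question:

```scala
import java.util.UUID

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().appName("uuid-demo").getOrCreate()
import spark.implicits._

val df = Seq(("a", "1"), ("b", "2"), ("c", "3")).toDF("col1", "col2")
val generateUUID = udf(() => UUID.randomUUID().toString)

// Materialize the UUIDs once by writing them to persistent storage...
df.withColumn("new_uuid", generateUUID())
  .write.mode("overwrite").parquet("/tmp/df_with_uuid")

// ...then read them back. The UUIDs are now plain values on disk, so
// repeated actions and further transformations no longer regenerate them.
val dfWithUuid = spark.read.parquet("/tmp/df_with_uuid")
dfWithUuid.show(false)
dfWithUuid.show(false) // same UUIDs as the previous show
```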
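Second, the seeded-rand approach. The sketch below assumes the `df` defined above; the seed value 42 and the UDF name are arbitrary. UUID.nameUUIDFromBytes builds a name-based (version 3) UUID, so equal inputs always map to the same UUID; the stability therefore depends entirely on the seeded rand column being re-evaluated identically, which holds only if the upstream plan is deterministic:

```scala
import java.util.UUID

import org.apache.spark.sql.functions.{rand, udf}

// nameUUIDFromBytes is itself deterministic: the same double always maps
// to the same UUID. Randomness comes solely from the seeded rand column.
val uuidFromDouble = udf { (d: Double) =>
  UUID.nameUUIDFromBytes(d.toString.getBytes("UTF-8")).toString
}

val dfWithUuid = df.withColumn("new_uuid", uuidFromDouble(rand(42)))
dfWithUuid.show(false)
dfWithUuid.show(false) // stable across actions, given deterministic input
```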
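Finally, the SPARK-20586 flag mentioned in the note: since Spark 2.3 a UDF can be marked non-deterministic, which stops the optimizer from collapsing or duplicating its invocations. As the note says, this is not a guarantee once data has to be recomputed, so treat this as a sketch:

```scala
import java.util.UUID

import org.apache.spark.sql.functions.udf

// Marking the UDF non-deterministic disables the optimizations that would
// re-invoke it, but it does not cache results: a full re-execution of the
// plan (e.g. after a lost executor) can still produce fresh UUIDs.
val generateUUID = udf(() => UUID.randomUUID().toString).asNondeterministic()

val dfWithUuid = df.withColumn("new_uuid", generateUUID())
```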