scala – 将StringType列添加到现有Spark DataFrame,然后应用默
发布时间:2020-12-16 18:45:26 所属栏目:安全 来源:网络整理
导读:Scala 2.10在这里使用Spark 1.6.2.我有一个与 this one类似(但不一样)的问题,然而,接受的答案不是 SSCCE,而是假设有一定数量的关于Spark的“前期知识”;因此我无法复制或理解它.更重要的是,该问题也仅限于向现有数据框添加新列,而我需要为数据框中的所有现有
Scala 2.10在这里使用Spark 1.6.2.我有一个与
this one类似(但不一样)的问题,然而,接受的答案不是
SSCCE,而是假设有一定数量的关于Spark的“前期知识”;因此我无法复制或理解它.更重要的是,该问题也仅限于向现有数据框添加新列,而我需要为数据框中的所有现有行添加列和值.
所以我想在现有的Spark DataFrame中添加一个列,然后将该新列的初始(‘default’)值应用于所有行. val json : String = """{ "x": true,"y": "not true" }""" val rdd = sparkContext.parallelize(Seq(json)) val jsonDF = sqlContext.read.json(rdd) jsonDF.show() 当我运行时,我得到以下作为输出(通过.show()): +----+--------+ | x| y| +----+--------+ |true|not true| +----+--------+ 现在我想在jsonDF创建之后添加一个新字段,而不修改json字符串,这样得到的DF看起来像这样: +----+--------+----+ | x| y| z| +----+--------+----+ |true|not true| red| +----+--------+----+ 意思是,我想在DF类型中添加一个新的“z”列,类型为StringType,然后默认所有行包含z值“red”. 从另一个问题我将以下伪代码拼凑在一起: val json : String = """{ "x": true,"y": "not true" }""" val rdd = sparkContext.parallelize(Seq(json)) val jsonDF = sqlContext.read.json(rdd) //jsonDF.show() val newDF = jsonDF.withColumn("z",jsonDF("col") + 1) newDF.show() 但是当我运行它时,我在.withColumn(…)方法上遇到编译器错误: org.apache.spark.sql.AnalysisException: Cannot resolve column name "col" among (x,y); at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152) at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151) at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664) at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652) 我也没有看到任何允许我将“red”设置为默认值的API方法.关于我哪里出错的想法? 解决方法
你可以使用点亮功能.首先你必须导入它
import org.apache.spark.sql.functions.lit 并使用它如下所示 jsonDF.withColumn("z",lit("red")) 将自动推断列的类型. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |