加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何将List [Double]转换为列?

发布时间:2020-12-16 18:18:56 所属栏目:安全 来源:网络整理
导读:我有List [Double],如何将其转换为org.apache.spark.sql.Column.我试图使用.withColumn()将其作为列插入现有的DataFrame. 解决方法 它不能直接完成. Column不是数据结构,而是特定SQL表达式的表示.它不受特定数据的约束.您必须先转换数据.解决此问题的一种方
我有List [Double],如何将其转换为org.apache.spark.sql.Column.我试图使用.withColumn()将其作为列插入现有的DataFrame.

解决方法

它不能直接完成. Column不是数据结构,而是特定SQL表达式的表示.它不受特定数据的约束.您必须先转换数据.解决此问题的一种方法是按索引进行并行化和连接:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,DoubleType}

val df = Seq(("a",2),("b",1),("c",0)).toDF("x","y")
val aList = List(1.0,-1.0,0.0)

val rows = df.rdd.zipWithIndex.map(_.swap)
  .join(sc.parallelize(aList).zipWithIndex.map(_.swap))
  .values
  .map { case (row: Row,x: Double) => Row.fromSeq(row.toSeq :+ x) }

sqlContext.createDataFrame(rows,df.schema.add("z",DoubleType,false))

另一种类似的方法是索引和使用UDF来处理其余的:

import scala.util.Try

val indexedDf = sqlContext.createDataFrame(
  df.rdd.zipWithIndex.map {
    case (row: Row,i: Long) => Row.fromSeq(row.toSeq :+ i)
  },df.schema.add("idx_","long")
)

def addValue(vs: Vector[Double]) = udf((i: Long) => Try(vs(i.toInt)).toOption)

indexedDf.withColumn("z",addValue(aList.toVector)($"idx_"))

不幸的是,两种解决方案都会遇到问题.首先通过驱动程序传递本地数据会在程序中引入严重的瓶颈.通常,数据应直接从执行程序访问.如果要迭代地执行此操作,另一个问题是增加RDD谱系.

虽然第二个问题可以通过检查点解决,但第一个问题使得这个想法一般无用.我强烈建议您先构建完整的结构,然后在Spark上读取它,或者以可以利用Spark架构的方式重建管道.例如,如果数据来自外部源,则使用map / mapPartitions直接读取每个数据块.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读