
scala – Convert a single column into multiple columns in a Spark DataFrame

I have a large DataFrame (1.2 GB, more or less) with this structure:

+---------+--------------+------------------------------------------------------------------------------------------------------+
| country |  date_data   |                                                 text                                                 |
+---------+--------------+------------------------------------------------------------------------------------------------------+
| "EEUU"  | "2016-10-03" | "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 45ee"        |
| "EEUU"  | "2016-10-03" | "T_D: QQAA\nT_NAME: name_2\nT_IN: ind_2\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 46ee"        |
| .       | .            | .                                                                                                    |
| .       | .            | .                                                                                                    |
| "EEUU"  | "2016-10-03" | "T_D: QQWE\nT_NAME: name_300000\nT_IN: ind_65\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 47aa"  |
+---------+--------------+------------------------------------------------------------------------------------------------------+

There are 300,000 rows, and the "text" field is a string of about 5,000 characters.

I want to split the "text" field into these new fields:

+---------+------------+------+-------------+--------+--------+---------+--------+------+
| country | date_data  | t_d  |   t_name    |  t_in  |  t_c   |  t_add  | ...... | t_r  |
+---------+------------+------+-------------+--------+--------+---------+--------+------+
| EEUU    | 2016-10-03 | QQWE | name_1      | ind_1  | c1ws12 | Sec_1_P | ...... | 45ee |
| EEUU    | 2016-10-03 | QQAA | name_2      | ind_2  | c1ws12 | Sec_1_P | ...... | 46ee |
| .       | .          | .    | .           | .      | .      | .       | .      |      |
| .       | .          | .    | .           | .      | .      | .       | .      |      |
| .       | .          | .    | .           | .      | .      | .       | .      |      |
| EEUU    | 2016-10-03 | QQWE | name_300000 | ind_65 | c1ws12 | Sec_1_P | ...... | 47aa |
+---------+------------+------+-------------+--------+--------+---------+--------+------+

At the moment I solve this with regular expressions. First, I write the regular expressions and create a function to extract each single field from the text (90 regular expressions in total):

val D_text = "((?<=T_D: ).*?(?=\n))".r
val NAME_text = "((?<=T_NAME: ).*?(?=\n))".r
val IN_text = "((?<=T_IN: ).*?(?=\n))".r
val C_text = "((?<=T_C: ).*?(?=\n))".r
val ADD_text = "((?<=T_ADD: ).*?(?=\n))".r
        .
        .
        .
        .
val R_text = "((?<=T_R: ).*?(?=\n))".r   

//UDF function:
 def getFirst(pattern2: scala.util.matching.Regex) = udf(
   (text: String) => pattern2.findFirstIn(text) match {
     case Some(text_new) => text_new
     case None           => "NULL"
   }
 )
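
A quick way to sanity-check one of these patterns against a single value in the spark-shell (the sample string below is made up for illustration):

// Hypothetical sample value, with "KEY: value" pairs separated by real newlines:
val sample = "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\nT_R: 45ee\n"

D_text.findFirstIn(sample)     // Some(QQWE)
NAME_text.findFirstIn(sample)  // Some(name_1)
R_text.findFirstIn(sample)     // Some(45ee) - matches only because of the trailing newline in this sample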

Then I create a new DataFrame (tbl_separate_fields) as the result of applying the function with each regular expression to extract every new field from the text:

val tbl_separate_fields = hiveDF.select(
  hiveDF("country"),
  hiveDF("date_data"),
  getFirst(D_text)(hiveDF("text")).alias("t_d"),
  getFirst(NAME_text)(hiveDF("text")).alias("t_name"),
  getFirst(IN_text)(hiveDF("text")).alias("t_in"),
  getFirst(C_text)(hiveDF("text")).alias("t_c"),
  getFirst(ADD_text)(hiveDF("text")).alias("t_add"),
  .
  .
  .
  getFirst(R_text)(hiveDF("text")).alias("t_r")
)

Finally, I insert this DataFrame into a Hive table:

tbl_separate_fields.registerTempTable("tbl_separate_fields") 
hiveContext.sql("INSERT INTO TABLE TABLE_INSERT PARTITION (date_data)  SELECT * FROM tbl_separate_fields")

This solution takes about one hour for the whole DataFrame, so I would like to optimize it and reduce the execution time. Is there any solution?

We are using Hadoop 2.7.1 and Apache Spark 1.5.1. The Spark configuration is:

val conf = new SparkConf().set("spark.storage.memoryFraction","0.1")
val sc = new SparkContext(conf)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

Thanks in advance.

EDITED DATA:

+---------+--------------+------------------------------------------------------------------------------------------------------+
| country |  date_data   |                                                 text                                                 |
+---------+--------------+------------------------------------------------------------------------------------------------------+
| "EEUU"  | "2016-10-03" | "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 45ee"        |
| "EEUU"  | "2016-10-03" | "T_NAME: name_2\nT_D: QQAA\nT_IN: ind_2\nT_C: c1ws12 ...........\nT_R: 46ee"                          |
| .       | .            | .                                                                                                    |
| .       | .            | .                                                                                                    |
| "EEUU"  | "2016-10-03" | "T_NAME: name_300000\nT_ADD: Sec_1_P\nT_IN: ind_65\nT_C: c1ws12\n ...........\nT_R: 47aa"             |
+---------+--------------+------------------------------------------------------------------------------------------------------+

Solution

Using regular expressions in this case is slow and also fragile.

If you know that all records have the same structure, i.e. all "text" values have the same number and order of "parts", the following code would work (for any number of columns), mainly taking advantage of the split function in org.apache.spark.sql.functions:

import org.apache.spark.sql.functions._
import scala.collection.mutable

// first - split "text" column values into Arrays
val textAsArray: DataFrame = inputDF
  .withColumn("as_array", split(col("text"), "\n"))
  .drop("text")
  .cache()

// get a sample (first row) to get column names, can be skipped if you want to hard-code them:
val sampleText = textAsArray.first().getAs[mutable.WrappedArray[String]]("as_array").toArray
val columnNames: Array[(String, Int)] = sampleText.map(_.split(": ")(0)).zipWithIndex

// add a Column per columnName with the right value and drop the no-longer-needed as_array column
val withValueColumns: DataFrame = columnNames.foldLeft(textAsArray) {
  case (df, (colName, index)) => df.withColumn(colName, split(col("as_array").getItem(index), ": ").getItem(1))
}.drop("as_array")

withValueColumns.show()
// for the sample data I created, with just 4 "parts" in the "text" column, this prints:
// +-------+----------+----+------+-----+------+
// |country| date_data| T_D|T_NAME| T_IN|   T_C|
// +-------+----------+----+------+-----+------+
// |   EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12|
// |   EEUU|2016-10-03|QQAA|name_2|ind_2|c1ws12|
// +-------+----------+----+------+-----+------+
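
If the target Hive table uses the lower-case column names shown in the question (t_d, t_name, ...), the extracted columns can be renamed before the insert; a minimal sketch reusing columnNames from above:

// Rename T_D -> t_d, T_NAME -> t_name, etc. to match the target table schema.
val renamed: DataFrame = columnNames.foldLeft(withValueColumns) {
  case (df, (colName, _)) => df.withColumnRenamed(colName, colName.toLowerCase)
}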

Alternatively, if the assumption above does not hold, you can use a UDF that converts the text column into a Map, and then perform a similar foldLeft over a hard-coded list of the desired columns:

import sqlContext.implicits._

// sample data: not the same order, and not all records have all columns:
val inputDF: DataFrame = sc.parallelize(Seq(
  ("EEUU", "2016-10-03", "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12"),
  ("EEUU", "2016-10-03", "T_D: QQAA\nT_IN: ind_2\nT_NAME: name_2")
)).toDF("country", "date_data", "text")

// hard-coded list of expected column names:
val columnNames: Seq[String] = Seq("T_D", "T_NAME", "T_IN", "T_C")

// UDF to convert text into a key-value map
val asMap = udf[Map[String, String], String] { s =>
  s.split("\n").map(_.split(": ")).map { case Array(k, v) => k -> v }.toMap
}

val textAsMap = inputDF.withColumn("textAsMap", asMap(col("text"))).drop("text")

// for each column name - look up the value in the map
val withValueColumns: DataFrame = columnNames.foldLeft(textAsMap) {
  case (df, colName) => df.withColumn(colName, col("textAsMap").getItem(colName))
}.drop("textAsMap")

withValueColumns.show()
// prints:
// +-------+----------+----+------+-----+------+
// |country| date_data| T_D|T_NAME| T_IN|   T_C|
// +-------+----------+----+------+-----+------+
// |   EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12|
// |   EEUU|2016-10-03|QQAA|name_2|ind_2|  null|
// +-------+----------+----+------+-----+------+
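
One caveat about the asMap UDF above: it assumes every line of "text" contains exactly one ": " separator, and any line that does not will throw a MatchError at runtime. A slightly more defensive variant (a sketch, not part of the original answer) splits on the first ": " only and skips malformed lines:

// Defensive version of asMap: split each line on the first ": " only and ignore lines without a value.
val asMapSafe = udf[Map[String, String], String] { s =>
  s.split("\n")
    .map(_.split(": ", 2))
    .collect { case Array(k, v) => k -> v }
    .toMap
}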
