加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 如何在给定多个条件的情况下对Spark数据帧执行“查找”

发布时间:2020-12-16 19:13:01 所属栏目:安全 来源:网络整理
导读:我是Spark的新手(我的版本是1.6.0),现在我正在尝试解决下面给出的问题: 假设有两个源文件: 第一个(简称A)是一个大的,包含名为A1,B1,C1和其他80列的列.里面有230K的记录. 第二个(简称B)是一个小型查找表,其中包含名为A2,B2,C2和D2的列.里面有250条记录. 现
我是Spark的新手(我的版本是1.6.0),现在我正在尝试解决下面给出的问题:

假设有两个源文件:

>第一个(简称A)是一个大的,包含名为A1,B1,C1和其他80列的列.里面有230K的记录.
>第二个(简称B)是一个小型查找表,其中包含名为A2,B2,C2和D2的列.里面有250条记录.

现在我们需要在A中插入一个新列,给出以下逻辑:

>首先在B中查找A1,B1和C1(对应列为A2,B2和C2),如果成功,则返回D2作为新添加列的值.如果没有找到……
>然后在B中查找A1,B1.如果成功,则返回D2.如果没有找到……
>设置默认值“NA”

我已经读过文件并将它们转换成数据框.对于第一种情况,我得到的结果是左外连接在一起.但是我在下一步找不到好的方法.

我目前的尝试是通过使用不太严格的条件连接A和B来构建新的数据框架.但是我不知道如何从另一个更新当前数据帧.或者还有其他更直观,更有效的方法来解决整个问题吗?

感谢所有的答案.

—————————–更新于20160309 —————— ————–

终于接受了@mlk的回答.仍然非常感谢@ zero323对于他/她对UDF和加入的好评,Tungsten代码生成确实是我们现在面临的另一个问题.但是,由于我们需要为每次查找执行大量查找和平均4个条件,因此前一种解决方案更适合…

最终的解决方案看起来像下面的片段:

```
import sqlContext.implicits._
import com.github.marklister.collections.io._

case class TableType(A: String,B: String,C: String,D: String)
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("..."))
val lkupD = udf {
  (aStr: String,bStr: String,cStr: String) =>
    tableBroadcast.value.find {
      case TableType(a,b,c,_) =>
        (a == aStr && b == bStr && c == cStr) ||
        (a == aStr && b == bStr)
    }.getOrElse(TableType("","","NA")).D
}
df = df.withColumn("NEW_COL",lkupD($"A",$"B",$"C"))
```

解决方法

由于B很小,我认为最好的方法是广播变量和用户定义的功能.

// However you get the data...
case class BType( A2: Int,B2: Int,C2 : Int,D2 : String)
val B = Seq(BType(1,1,"B111"),BType(1,2,"B112"),BType(2,"B200"))

val A = sc.parallelize(Seq((1,"DATA"),(1,(2,"NONE"),(3,"NONE"))).toDF("A1","B1","C1","OTHER")


// Broadcast B so all nodes have a copy of it.
val Bbradcast = sc.broadcast(B)

// A user defined function to find the value for D2. This I'm sure could be improved by whacking it into maps. But this is a small example. 
val findD = udf {( a: Int,b : Int,c: Int) => Bbradcast.value.find(x => x.A2 == a && x.B2 == b && x.C2 == c).getOrElse(Bbradcast.value.find(x => x.A2 == a && x.B2 == b).getOrElse(BType(0,"NA"))).D2 }

// Use the UDF in a select
A.select($"A1",$"B1",$"C1",$"OTHER",findD($"A1",$"C1").as("D")).show

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读