
scala – Partially/fully matching values in one RDD against values in another RDD

I have two RDDs. The first contains records of the form

RDD1 = (1,2017-2-13,"ABX-3354 gsfette"
        2,2017-3-18,"TYET-3423 asdsad"
        3,2017-2-09,"TYET-3423 rewriu"
        4,"ABX-3354 42324"
        5,2017-4-01,"TYET-3423 aerr")

and the second contains records of the form

RDD2 = ('mfr1',"ABX-3354")
       ('mfr2',"TYET-3423")

For each value in RDD2, I need to find all records in RDD1 whose third column matches (partially or fully) the second column of RDD2, and get a count per value.

For this example, the final result would be:

ABX-3354  2
TYET-3423 3

What is the best way to do this?

Solution

I am posting a couple of solutions using Spark SQL, focusing on exact matches of the search string within the given text.

1: Using crossJoin

import spark.implicits._

val df1 = Seq(
  (1,"2017-2-13","ABX-3354 gsfette"),
  (2,"2017-3-18","TYET-3423 asdsad"),
  (3,"2017-2-09","TYET-3423 rewriu"),
  (4,null,"ABX-335442324"),       //changed from "ABX-3354 42324"; the source record has no date, so null here
  (5,"2017-4-01","aerrTYET-3423") //changed from "TYET-3423 aerr"
).toDF("id","dt","txt")

val df2 = Seq(
  ("mfr1","ABX-3354"),("mfr2","TYET-3423")
).toDF("col1","key")

import org.apache.spark.sql.Row

//filter predicate: keep rows whose txt contains the key
def matcher(row: Row): Boolean = row.getAs[String]("txt")
  .contains(row.getAs[String]("key"))

val join = df1.crossJoin(df2)

import org.apache.spark.sql.functions.count

val result = join.filter(matcher _)
  .groupBy("key")
  .agg(count("txt").as("count"))
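
The same filter can also be expressed with the Column API instead of a typed Row function, which drops the extra Row import and keeps the pipeline in untyped DataFrame operations. A minimal equivalent sketch, reusing df1, df2, and the imports above (result2 is just an illustrative name):

val result2 = df1.crossJoin(df2)
  .filter($"txt".contains($"key")) //Column.contains builds the same substring predicate
  .groupBy("key")
  .agg(count("txt").as("count"))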

2: Using a broadcast variable

import spark.implicits._

val df1 = Seq(
  (1,"ABX-3354 42324"),(5,"aerrTYET-3423"),(6,"aerrYET-3423")
).toDF("id","pattern")

//small key list to broadcast (a plain Seq, not a DataFrame)
val df2 = Seq(
  ("mfr1","TYET-3423")
).map(_._2) //keep only the second element of each pair

//Lookup to use in UDF
val lookup = spark.sparkContext.broadcast(df2)

//UDF that returns the first matching key, or null when nothing matches
import org.apache.spark.sql.functions._
val matcher = udf((txt: String) => {
  val matches: Seq[String] = lookup.value.filter(txt.contains(_))
  matches.headOption.orNull
})
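
As a side note, Scala UDFs may also return Option[String]; Spark maps None to a SQL NULL, so the same lookup collapses into a one-liner (matcherOpt is an illustrative name):

//equivalent UDF: find returns Option[String], which Spark treats as a nullable column
val matcherOpt = udf((txt: String) => lookup.value.find(k => txt.contains(k)))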

val result = df1.withColumn("match",matcher($"pattern"))
  .filter($"match".isNotNull) // not interested in non matching records
  .groupBy("match")
  .agg(count("pattern").as("count"))

Run against the full dataset, both solutions produce the same counts (shown here for the crossJoin version):

result.show()

+---------+-----+
|      key|count|
+---------+-----+
|TYET-3423|    3|
| ABX-3354|    2|
+---------+-----+
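
Since the question is phrased in terms of RDDs, the same broadcast-lookup idea also works without the DataFrame API. A minimal self-contained sketch under stated assumptions (the object name, local master, and the parsed tuple form of RDD1 are illustrative, not part of the original answer):

import org.apache.spark.sql.SparkSession

object RddMatchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("rdd-match").getOrCreate()
    val sc = spark.sparkContext

    //RDD1 parsed into (id,date,text) tuples; record 4 keeps an empty date
    val rdd1 = sc.parallelize(Seq(
      (1,"2017-2-13","ABX-3354 gsfette"),
      (2,"2017-3-18","TYET-3423 asdsad"),
      (3,"2017-2-09","TYET-3423 rewriu"),
      (4,"","ABX-3354 42324"),
      (5,"2017-4-01","TYET-3423 aerr")))

    //broadcast only the keys from RDD2
    val keys = sc.broadcast(Seq("ABX-3354","TYET-3423"))

    val counts = rdd1
      .flatMap { case (_,_,txt) => keys.value.find(k => txt.contains(k)) } //first matching key, if any
      .map((_,1))
      .reduceByKey(_ + _)

    counts.collect().foreach(println) //(ABX-3354,2),(TYET-3423,3)

    spark.stop()
  }
}

Compared with crossJoin/cartesian, this scans each record once against the broadcast key list, so only the small (key, 1) pairs are shuffled for the final count.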
