加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Spark上的动态集代数

发布时间:2020-12-16 18:34:34 所属栏目:安全 来源:网络整理
导读:考虑以下问题.鉴于: 一组集合 动态接收的布尔表达式 返回结果集. Spark有没有任何有效的算法或库来解决这个一般问题? 这是一个玩具示例,用于概念性地说明问题: val X = Set("A1","A2","A3","A4")val Y = Set("A2","A4","A5")val collection = Set(X,Y)val
考虑以下问题.鉴于:

>一组集合
>动态接收的布尔表达式

返回结果集.

Spark有没有任何有效的算法或库来解决这个一般问题?

这是一个玩具示例,用于概念性地说明问题:

val X  = Set("A1","A2","A3","A4")
val Y  = Set("A2","A4","A5")

val collection = Set(X,Y)
val expression = "X and Y"

我正在寻找一种实现一般solve_expression的方法,以便在上面的例子中:

output = solve_expression(expression,collection)

结果是:

Set("A2","A5")

我正在处理具有数百万项的集合,以及作为字符串的布尔表达式.重要的是表达式中的每个原子(例如上面的“X”和“Y”)都是集合.表达式和集合是动态的(操作不能硬编码,因为我们将它们作为输入接收,我们事先不知道它们是什么).

我对问题的表现很灵活.实际集合可以是Set类型,例如保持字符串(例如“A1”,“A2”),编码为二进制向量,或任何其他使其适合Spark的东西.

Spark有没有任何库来解析和解决集合上的一般布尔表达式?

解决方法

好的.让我们假设您想在Spark中执行此操作.此外,由于这些是巨型集合,我们假设它们不在内存中,它们每个都在一个文件中 – 文件中的每一行表示集合中的条目.

我们将使用RDD表示集合 – Spark的标准存储数据方式.

使用此解析器(从here改编并修复)

import scala.util.parsing.combinator.JavaTokenParsers
import org.apache.spark.rdd.RDD

case class Query[T](setMap: Map[String,RDD[T]]) extends JavaTokenParsers {
  private lazy val expr: Parser[RDD[T]]
    = term ~ rep("union" ~ term) ^^ { case f1 ~ fs => (f1 /: fs)(_ union _._2) }
  private lazy val term: Parser[RDD[T]]
    = fact ~ rep("inter" ~ fact) ^^ { case f1 ~ fs => (f1 /: fs)(_ intersection _._2) }
  private lazy val fact: Parser[RDD[T]]
    = vari | ("(" ~ expr ~ ")" ^^ { case "(" ~ exp ~ ")" => exp })
  private lazy val vari: Parser[RDD[T]]
    = setMap.keysIterator.map(Parser(_)).reduceLeft(_ | _) ^^ setMap

  def apply(expression: String) = this.parseAll(expr,expression).get.distinct
}

将上述内容粘贴到shell中后,观察以下spark-shell交互(为简洁起见,我省略了一些回复):

> val x = sc.textFile("X.txt").cache  contains "1n2n3n4n5"
> val y = sc.textFile("Y.txt").cache  contains "3n4n5n6n7"
> val z = sc.textFile("Z.txt").cache  contains "3n9n10"
> val sets = Map("x" -> x,"y" -> y,"z" -> z)
> val query = Query[Int](sets)

现在,我可以用不同的表达式调用查询.请注意,我在这里使用collect来触发评估(所以我们看到集合内部的内容),但是如果集合非常大,通常只是将RDD保持原样(并将其保存到磁盘).

> query("a union b").collect
res: Array[Int] = Array("1","2","3","4","5","6","7")
> query("a inter b").collect
res: Array[Int] = Array("3","5")
> query("a inter b union ((a inter b) union a)").collect
res: Array[Int] = Array("1","5")
> query("c union a inter b").collect
res: Array[Int] = Array("3","9","10")
> query("(c union a) inter b").collect
res: Array[Int] = Array("3","5")

虽然我没有费心去实现它,但是设置差异应该是一行添加(非常类似于union和inter).我认为设置补充是一个坏主意……它们并不总是有意义(空集的补充是什么,你如何表示它?).

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读