scala – Spark上的动态集代数
考虑以下问题.鉴于:
>一组集合 返回结果集. Spark有没有任何有效的算法或库来解决这个一般问题? 这是一个玩具示例,用于概念性地说明问题: val X = Set("A1","A2","A3","A4") val Y = Set("A2","A4","A5") val collection = Set(X,Y) val expression = "X and Y" 我正在寻找一种实现一般solve_expression的方法,以便在上面的例子中: output = solve_expression(expression,collection) 结果是: Set("A2","A5") 我正在处理具有数百万项的集合,以及作为字符串的布尔表达式.重要的是表达式中的每个原子(例如上面的“X”和“Y”)都是集合.表达式和集合是动态的(操作不能硬编码,因为我们将它们作为输入接收,我们事先不知道它们是什么). 我对问题的表现很灵活.实际集合可以是Set类型,例如保持字符串(例如“A1”,“A2”),编码为二进制向量,或任何其他使其适合Spark的东西. Spark有没有任何库来解析和解决集合上的一般布尔表达式? 解决方法
好的.让我们假设您想在Spark中执行此操作.此外,由于这些是巨型集合,我们假设它们不在内存中,它们每个都在一个文件中 – 文件中的每一行表示集合中的条目.
我们将使用RDD表示集合 – Spark的标准存储数据方式. 使用此解析器(从here改编并修复) import scala.util.parsing.combinator.JavaTokenParsers import org.apache.spark.rdd.RDD case class Query[T](setMap: Map[String,RDD[T]]) extends JavaTokenParsers { private lazy val expr: Parser[RDD[T]] = term ~ rep("union" ~ term) ^^ { case f1 ~ fs => (f1 /: fs)(_ union _._2) } private lazy val term: Parser[RDD[T]] = fact ~ rep("inter" ~ fact) ^^ { case f1 ~ fs => (f1 /: fs)(_ intersection _._2) } private lazy val fact: Parser[RDD[T]] = vari | ("(" ~ expr ~ ")" ^^ { case "(" ~ exp ~ ")" => exp }) private lazy val vari: Parser[RDD[T]] = setMap.keysIterator.map(Parser(_)).reduceLeft(_ | _) ^^ setMap def apply(expression: String) = this.parseAll(expr,expression).get.distinct } 将上述内容粘贴到shell中后,观察以下spark-shell交互(为简洁起见,我省略了一些回复): > val x = sc.textFile("X.txt").cache contains "1n2n3n4n5" > val y = sc.textFile("Y.txt").cache contains "3n4n5n6n7" > val z = sc.textFile("Z.txt").cache contains "3n9n10" > val sets = Map("x" -> x,"y" -> y,"z" -> z) > val query = Query[Int](sets) 现在,我可以用不同的表达式调用查询.请注意,我在这里使用collect来触发评估(所以我们看到集合内部的内容),但是如果集合非常大,通常只是将RDD保持原样(并将其保存到磁盘). > query("a union b").collect res: Array[Int] = Array("1","2","3","4","5","6","7") > query("a inter b").collect res: Array[Int] = Array("3","5") > query("a inter b union ((a inter b) union a)").collect res: Array[Int] = Array("1","5") > query("c union a inter b").collect res: Array[Int] = Array("3","9","10") > query("(c union a) inter b").collect res: Array[Int] = Array("3","5") 虽然我没有费心去实现它,但是设置差异应该是一行添加(非常类似于union和inter).我认为设置补充是一个坏主意……它们并不总是有意义(空集的补充是什么,你如何表示它?). (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |