scala – Spark数据帧中的序列
我在Spark中有数据框.看起来像这样:
+-------+----------+-------+ | value| group| ts| +-------+----------+-------+ | A| X| 1| | B| X| 2| | B| X| 3| | D| X| 4| | E| X| 5| | A| Y| 1| | C| Y| 2| +-------+----------+-------+ Endgoal:我想找到有多少序列A-B-E(一个序列只是后续行的列表).增加的约束条件是序列的后续部分最多可以相隔n行.让我们考虑这个例子,n是2. 考虑组X. 我曾考虑使用collect_list(),创建一个字符串(如DNA)并使用带有正则表达式的子字符串搜索.但我想知道是否有更优雅的分布式方式,也许使用窗口函数? 编辑: 请注意,提供的数据框只是一个示例.真实的数据帧(以及组)可以是任意长的. 解决方法
编辑回答@Tim的评论修复模式“AABE”
是的,使用窗口函数有帮助,但我创建了一个id来订购: val df = List( (1,"A","X",1),(2,"B",2),(3,3),(4,"D",4),(5,"E",5),(6,"Y",(7,"C",2) ).toDF("id","value","group","ts") import org.apache.spark.sql.expressions.Window val w = Window.partitionBy('group).orderBy('id) 然后lag将收集所需的内容,但是需要一个函数来生成Column表达式(注意拆分以消除“AABE”的重复计数.警告:这会拒绝“ABAEXX”类型的模式): def createSeq(m:Int) = split( concat( (1 to 2*m) .map(i => coalesce(lag('value,-i).over(w),lit(""))) :_*),"A")(0) val m=2 val tmp = df .withColumn("seq",createSeq(m)) +---+-----+-----+---+----+ | id|value|group| ts| seq| +---+-----+-----+---+----+ | 6| A| Y| 1| C| | 7| C| Y| 2| | | 1| A| X| 1|BBDE| | 2| B| X| 2| BDE| | 3| B| X| 3| DE| | 4| D| X| 4| E| | 5| E| X| 5| | +---+-----+-----+---+----+ 由于Column API中可用的集合函数很少,因此使用UDF可以更容易地避免使用正则表达式 def patternInSeq(m: Int) = udf((str: String) => { var notFound = str .split("B") .filter(_.contains("E")) .filter(_.indexOf("E") <= m) .isEmpty !notFound }) val res = tmp .filter(('value === "A") && (locate("B",'seq) > 0)) .filter(locate("B",'seq) <= m && (locate("E",'seq) > 1)) .filter(patternInSeq(m)('seq)) .groupBy('group) .count res.show +-----+-----+ |group|count| +-----+-----+ | X| 1| +-----+-----+ 泛化(超出范围) 如果你想推广更长的字母序列,那么问题必须推广.这可能是微不足道的,但在这种情况下,应拒绝类型(“ABAE”)的模式(见注释).因此,最简单的推广方法是在下面的实现中使用成对规则(我添加了一个组“Z”来说明这个算法的行为) val df = List( (1,( 8,"Z",( 9,(10,(11,(12,5) ).toDF("id","ts") 首先,我们定义一对的逻辑 import org.apache.spark.sql.DataFrame def createSeq(m:Int) = array((0 to 2*m).map(i => coalesce(lag('value,lit(""))):_*) def filterPairUdf(m: Int,t: (String,String)) = udf((ar: Array[String]) => { val (a,b) = t val foundAt = ar .dropWhile(_ != a) .takeWhile(_ != a) .indexOf(b) foundAt != -1 && foundAt <= m }) 然后我们定义一个应用这个逻辑的函数迭代地应用于数据帧 def filterSeq(seq: List[String],m: Int)(df: DataFrame): DataFrame = { var a = seq(0) seq.tail.foldLeft(df){(df: DataFrame,b: String) => { val res = df.filter(filterPairUdf(m,(a,b))('seq)) a = b res }} } 由于我们首先对从第一个字符开始的序列进行过滤,因此获得了简化和优化 val m = 2 val tmp = df .filter('value === "A") // reduce problem .withColumn("seq",createSeq(m)) scala> tmp.show() +---+-----+-----+---+---------------+ | id|value|group| ts| seq| +---+-----+-----+---+---------------+ | 6| A| Y| 1| [A,C,]| | 8| A| Z| 1|[A,B,D,E]| | 1| A| X| 1|[A,E]| +---+-----+-----+---+---------------+ val res = tmp.transform(filterSeq(List("A","E"),m)) scala> res.show() +---+-----+-----+---+---------------+ | id|value|group| ts| seq| +---+-----+-----+---+---------------+ | 1| A| X| 1|[A,E]| +---+-----+-----+---+---------------+ (变换是DataFrame => DataFrame转换的简单糖涂层) res .groupBy('group) .count .show +-----+-----+ |group|count| +-----+-----+ | X| 1| +-----+-----+ 正如我所说,在扫描序列时有不同的方法来概括“重置规则”,但是这个例子有望帮助实现更复杂的序列. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- jboss中调用webservice时报java.lang.RuntimeException: Ca
- angularjs – 在TypeScript中指令,编译(前/后)的等价物是什
- angularjs – 使用ng-repeat和ng-include创建递归表
- scala – 了解Play的请求生命周期!应用
- scala – 调度功能
- linux redis 安装配置, 以及redis php扩展
- scala – 让SBT在Mac OS X上运行的问题
- 斯卡拉 – varargs拼图?
- angularjs之ui-bootstrap的Datepicker Popup不使用JS实现双
- 使用netcat的方向shell