java – 级联 – 合并2个聚合
发布时间:2020-12-15 08:42:47 所属栏目:Java 来源:网络整理
导读:我有以下问题whicj我试图用级联解决:我有结构的记录的csv文件:o,a,f,i,c 我需要通过o,f来汇总记录,并将每组的i和c相加. 例如: 100,200,300,5,1 100,6,2 101,201,301,20,5 101,21,6 应该产量: 100,11,3 101,41,11 我无法理解如何合并我拥有的每个实例(我
我有以下问题whicj我试图用级联解决:我有结构的记录的csv文件:o,a,f,i,c
我需要通过o,f来汇总记录,并将每组的i和c相加. 例如: 100,200,300,5,1 100,6,2 101,201,301,20,5 101,21,6 应该产量: 100,11,3 101,41,11 我无法理解如何合并我拥有的每个实例(我可以在同一时间聚合两个字段吗?). 你有什么主意吗? Yosi public class CascMain { public static void main(String[] args){ Scheme sourceScheme = new TextLine(new Fields("line")); Tap source = new Lfs(sourceScheme,"/tmp/casc/group.csv"); Scheme sinkScheme = new TextDelimited(new Fields("o","a","f","ti","tc"),","); Tap sink = new Lfs(sinkScheme,"/tmp/casc/output/",SinkMode.REPLACE); Pipe assembly = new Pipe("agg-pipe"); Function function = new RegexSplitter(new Fields("o","i","c"),"); assembly = new Each(assembly,new Fields("line"),function); Pipe groupAssembly = new GroupBy("group",assembly,new Fields("o","f")); Sum impSum = new Sum(new Fields("ti")); Pipe i = new Every(groupAssembly,new Fields("i"),impSum); Sum clickSum = new Sum(new Fields("tc")); Pipe c = new Every(groupAssembly,new Fields("c"),clickSum); // WHAT SHOULD I DO HERE Properties properties = new Properties(); FlowConnector.setApplicationJarClass(properties,CascMain.class); FlowConnector flowConnector = new FlowConnector(properties); Flow flow = flowConnector.connect("agg",source,sink,assembly); flow.complete(); } } 解决方法
使用AggregateBy同时聚合多个字段:
SumBy impSum = new SumBy(new Fields("i"),new Fields("ti"),long.class); SumBy clickSum = new SumBy(new Fields("c"),new Fields("tc"),long.class); assembly = new AggregateBy("totals",Pipe.pipes(assembly),"f"),2,impSum,clickSum); (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |