加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

java – 级联 – 合并2个聚合

发布时间:2020-12-15 08:42:47 所属栏目:Java 来源:网络整理
导读:我有以下问题whicj我试图用级联解决:我有结构的记录的csv文件:o,a,f,i,c 我需要通过o,f来汇总记录,并将每组的i和c相加. 例如: 100,200,300,5,1 100,6,2 101,201,301,20,5 101,21,6 应该产量: 100,11,3 101,41,11 我无法理解如何合并我拥有的每个实例(我
我有以下问题whicj我试图用级联解决:我有结构的记录的csv文件:o,a,f,i,c

我需要通过o,f来汇总记录,并将每组的i和c相加.

例如:

100,200,300,5,1

100,6,2

101,201,301,20,5

101,21,6

应该产量:

100,11,3

101,41,11

我无法理解如何合并我拥有的每个实例(我可以在同一时间聚合两个字段吗?).

你有什么主意吗?

Yosi

public class CascMain {

public static void main(String[] args){

    Scheme sourceScheme = new TextLine(new Fields("line"));
    Tap source = new Lfs(sourceScheme,"/tmp/casc/group.csv");

    Scheme sinkScheme = new TextDelimited(new Fields("o","a","f","ti","tc"),",");
    Tap sink = new Lfs(sinkScheme,"/tmp/casc/output/",SinkMode.REPLACE);

    Pipe assembly = new Pipe("agg-pipe");

    Function function = new RegexSplitter(new Fields("o","i","c"),");
    assembly = new Each(assembly,new Fields("line"),function);

    Pipe groupAssembly = new GroupBy("group",assembly,new Fields("o","f"));

    Sum impSum = new Sum(new Fields("ti"));
    Pipe i = new Every(groupAssembly,new Fields("i"),impSum);

    Sum clickSum = new Sum(new Fields("tc"));
    Pipe c = new Every(groupAssembly,new Fields("c"),clickSum);

    // WHAT SHOULD I DO HERE

    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass(properties,CascMain.class);

    FlowConnector flowConnector = new FlowConnector(properties);
    Flow flow = flowConnector.connect("agg",source,sink,assembly);
    flow.complete();

}

}

解决方法

使用AggregateBy同时聚合多个字段:

SumBy impSum = new SumBy(new Fields("i"),new Fields("ti"),long.class);
SumBy clickSum = new SumBy(new Fields("c"),new Fields("tc"),long.class);
assembly = new AggregateBy("totals",Pipe.pipes(assembly),"f"),2,impSum,clickSum);

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读