04 Trident聚合
发布时间:2020-12-13 22:29:17 所属栏目:百科 来源:网络整理
导读:聚合是流计算的常见场景。Trident提供了两种聚合方式:1:Aggregate 2:partitionAggregate;另有三种聚合器可用来实现聚合任务(如sum):1:CombinerAggregator 2:ReducerAggregator 3:Aggregator。这3个聚合器一般配合groupBy()来做分区聚合。比如:做如下聚合
聚合是流计算的常见场景。Trident提供了两种聚合方式:
1:Aggregate 2:partitionAggregate
另有三种聚合器可用来实现聚合任务(如sum):
1:CombinerAggregator 2:ReducerAggregator 3:Aggregator。这3个聚合器一般配合groupBy()来做分区聚合。比如做如下聚合,求每个手机号的费用总和:SELECT tel,SUM(money) FROM u GROUP BY tel
-----------------配套视频----------------------------------------
http://pan.baidu.com/s/1kT5kecn
------------------配套代码----------------------------------------------------------------------------------------------------------------------
package storm.test.trident; import java.util.Map; import com.sun.corba.se.spi.orb.StringPair; import storm.trident.Stream; import storm.trident.TridentTopology; import storm.trident.operation.Aggregator; import storm.trident.operation.BaseFunction; import storm.trident.operation.Filter; import storm.trident.operation.TridentCollector; import storm.trident.operation.TridentOperationContext; import storm.trident.spout.IBatchSpout; import storm.trident.testing.FixedBatchSpout; import storm.trident.tuple.TridentTuple; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class _04Aggregate { private static IBatchSpout getBatch() { // SELECT tel,SUM(money) FROM u GROUP BY tel FixedBatchSpout batchSpout = new FixedBatchSpout(new Fields("tels"),3, new Values("189111 4"),new Values("135111 9"),new Values( "189111 8"),new Values("158111 3"),new Values( "158111 3")); return batchSpout; } public static void main(String[] args) { IBatchSpout batchSpout = getBatch(); TridentTopology tt = new TridentTopology(); Stream stream = tt.newStream("_01Stream",batchSpout); // SELECT tel,SUM(money) FROM u GROUP BY tel //final String dburl = "jdbc:mysql://localhost:3306/tel?user=root&password=root"; stream.each(new Fields("tels"),new KeyValueFunction(), new Fields("tel","money")).groupBy(new Fields("tel"))// .partitionAggregate(new Fields("money","tel"),new SumAgg(), new Fields("sum")); // 转化成第一个编程模型 StormTopology stormTopology = tt.build(); Config config = new Config(); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("_02Filter",config,stormTopology); } } class SumState { int state = 0; } class SumAgg implements Aggregator<SumState> { // "189111 4") 1 // ("135111 9") 2 // "189111 8"),3 String tel = ""; @Override public void aggregate(SumState val,TridentTuple tuple, TridentCollector collector) { tel = 
tuple.getStringByField("tel"); String money = tuple.getStringByField("money"); Integer m = Integer.parseInt(money); val.state += m; } @Override public void complete(SumState val,TridentCollector collector) { System.err.println(tel + ",聚合结果:" + val.state); collector.emit(new Values(val.state)); } @Override public SumState init(Object batchId,TridentCollector collector) { // TODO Auto-generated method stub return new SumState(); } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public void prepare(Map conf,TridentOperationContext context) { // TODO Auto-generated method stub } }
-----------------配套视频----------------------------------------
http://pan.baidu.com/s/1kT5kecn#path=%252Fstorm%25E5%2588%2586%25E4%25BA%25AB%25E8%25A7%2586%25E9%25A2%2591
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时联系站长删除相关内容!