java – 在一个简单的聚合风暴拓扑中进行分组
发布时间:2020-12-14 05:24:39 所属栏目:Java 来源:网络整理
导读:我正在尝试编写一个执行以下操作的拓扑: 订阅一个twitter feed(基于关键字) 一个聚合螺栓,用于聚合收集中的一些tweets(例如N),并将它们发送给打印机螺栓 一个简单的螺栓,将集合一次打印到控制台. 在现实中,我想对收藏进行一些更多的处理. 我在本地测试,看起
我正在尝试编写一个执行以下操作的拓扑:
>订阅一个twitter feed(基于关键字) 在现实中,我想对收藏进行一些更多的处理. 我在本地测试,看起来像是在工作.但是,我不知道我是否正确设置了螺栓上的分组,并且在部署在实际的风暴集群上时可以正常工作.如果有人可以帮助您查看此拓扑并提出任何错误,更改或改进,我将不胜感激. 谢谢. 这是我的拓扑结构. builder.setSpout("spout",new TwitterFilterSpout("pittsburgh")); builder.setBolt("sampleaggregate",new SampleAggregatorBolt()) .shuffleGrouping("spout"); builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate"); 聚合螺栓 public class SampleAggregatorBolt implements IRichBolt { protected OutputCollector collector; protected Tuple currentTuple; protected Logger log; /** * Holds the messages in the bolt till you are ready to send them out */ protected List<Status> statusCache; @Override public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) { this.collector = collector; log = Logger.getLogger(getClass().getName()); statusCache = new ArrayList<Status>(); } @Override public void execute(Tuple tuple) { currentTuple = tuple; Status currentStatus = null; try { currentStatus = (Status) tuple.getValue(0); } catch (ClassCastException e) { } if (currentStatus != null) { //add it to the status cache statusCache.add(currentStatus); collector.ack(tuple); //check the size of the status cache and pass it to the next stage if you have enough messages to emit if (statusCache.size() > 10) { collector.emit(new Values(statusCache)); } } } @Override public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tweets")); } @Override public Map<String,Object> getComponentConfiguration() { return null; //To change body of implemented methods use File | Settings | File Templates. } protected void setupNonSerializableAttributes() { } } 打印机螺栓 public class PrinterBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple,BasicOutputCollector collector) { System.out.println(tuple.size() + " " + tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer ofd) { } } 解决方法
从我可以看到它看起来不错.魔鬼的细节虽然如此.我不知道你的聚合器螺栓是什么,但是如果对传递给它的值做出任何假设,那么你应该考虑一个适当的字段分组.当您使用默认的并行性提示1时,这可能不会有太大的差异,但是如果您决定使用多个聚合螺栓实例进行缩放隐式逻辑假设,则可能需要非随机分组.
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |