java – Storm Spout没有得到Ack
发布时间:2020-12-14 16:35:54 所属栏目:Java 来源:网络整理
导读:我已经开始使用风暴,所以我使用 this tutorial创建简单的拓扑 当我使用LocalCluster运行我的拓扑结构,似乎都很好, 我的问题是我没有在元组上得到ACK,这意味着我的spout ack不会被调用. 我的代码在下面 – 你知道为什么不叫Ack吗? 所以我的拓扑看起来像这样
我已经开始使用风暴,所以我使用
this tutorial创建简单的拓扑
当我使用LocalCluster运行我的拓扑结构,似乎都很好, 我的代码在下面 – 你知道为什么不叫Ack吗? 所以我的拓扑看起来像这样 public StormTopology build() { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(HelloWorldSpout.class.getSimpleName(),helloWorldSpout,spoutParallelism); HelloWorldBolt bolt = new HelloWorldBolt(); builder.setBolt(HelloWorldBolt.class.getSimpleName(),bolt,boltParallelism) .shuffleGrouping(HelloWorldSpout.class.getSimpleName()); } 我的水嘴看起来像这样 public class HelloWorldSpout extends BaseRichSpout implements ISpout { private SpoutOutputCollector collector; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("int")); } public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) { this.collector = collector; } private static Boolean flag = false; public void nextTuple() { Utils.sleep(5000); //emit only 1 tuple - for testing if (!flag){ this.collector.emit(new Values(6)); flag = true; } } @Override public void ack(Object msgId) { System.out.println("[HelloWorldSpout] ack on msgId" + msgId); } public void fail(Object msgId){ System.out.println("[HelloWorldSpout] fail on msgId" + msgId); } } 我的螺栓看起来像这样 @SuppressWarnings("serial") public class HelloWorldBolt extends BaseRichBolt{ private OutputCollector collector; public void prepare(Map conf,OutputCollector collector) { this.collector = collector; logger.info("preparing HelloWorldBolt"); } public void execute(Tuple tuple) { System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0)); this.collector.ack(tuple); } public void cleanup() { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } } 解决方法
您的喷嘴中的emit()方法只有一个参数,因此元组不被锚定.这就是为什么你没有得到一个回调到ack()方法在喷口,即使你是在螺栓中的元组.
要使其正常工作,您需要修改您的喷口以发出第二个参数,即消息ID.正是这个id被传回给spout中的ack()方法: public void nextTuple() { Utils.sleep(5000); //emit only 1 tuple - for testing if (!flag){ Object msgId = "ID 6"; // this can be any object this.collector.emit(new Values(6),msgId); flag = true; } } @Override public void ack(Object msgId) { // msgId should be "ID 6" System.out.println("[HelloWorldSpout] ack on msgId" + msgId); } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
相关内容
- 错误“java.lang.NumberFormatException:对于输入字符串:
- setOpaque(真/假); Java的
- 在java和javascript之间传递JSON的有效方法
- 阶段1 语言基础+高级_1-3-Java语言高级_04-集合_07 Collect
- Java实现文件上传的两种方法(uploadify和Spring)
- java – Hibernate on Oracle:将String属性映射到CLOB列
- java – 如何检查类是否存在
- java – 使用GZIP,JSON响应和JQuery
- 可以在XPages中的Java内部执行@Unique()吗?
- Java中集合关系图及常见操作详解