加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

java – Storm Spout没有得到Ack

发布时间:2020-12-14 16:35:54 所属栏目:Java 来源:网络整理
导读:我已经开始使用风暴,所以我使用 this tutorial创建简单的拓扑 当我使用LocalCluster运行我的拓扑结构,似乎都很好, 我的问题是我没有在元组上得到ACK,这意味着我的spout ack不会被调用. 我的代码在下面 – 你知道为什么不叫Ack吗? 所以我的拓扑看起来像这样
我已经开始使用风暴,所以我使用 this tutorial创建简单的拓扑

当我使用LocalCluster运行我的拓扑结构,似乎都很好,
我的问题是我没有在元组上得到ACK,这意味着我的spout ack不会被调用.

我的代码在下面 – 你知道为什么不叫Ack吗?

所以我的拓扑看起来像这样

public StormTopology build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(HelloWorldSpout.class.getSimpleName(),helloWorldSpout,spoutParallelism);

        HelloWorldBolt bolt = new  HelloWorldBolt();

        builder.setBolt(HelloWorldBolt.class.getSimpleName(),bolt,boltParallelism)
              .shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}

我的水嘴看起来像这样

public class HelloWorldSpout  extends BaseRichSpout implements ISpout {
    private SpoutOutputCollector collector;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("int"));
    }

    public void open(Map conf,TopologyContext context,SpoutOutputCollector collector) {
        this.collector = collector;
    }

    private static Boolean flag = false;
    public void nextTuple() {
        Utils.sleep(5000);

            //emit only 1 tuple - for testing
        if (!flag){
            this.collector.emit(new Values(6));
            flag = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
    }

    public void fail(Object msgId){
        System.out.println("[HelloWorldSpout] fail on msgId" + msgId);
    }
}

我的螺栓看起来像这样

@SuppressWarnings("serial")
public class HelloWorldBolt extends BaseRichBolt{
    private OutputCollector collector;

    public void prepare(Map conf,OutputCollector collector) {
        this.collector = collector;
        logger.info("preparing HelloWorldBolt");
    }

    public void execute(Tuple tuple) {
        System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0));
        this.collector.ack(tuple);
    }

    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub

    }
}

解决方法

您的喷嘴中的emit()方法只有一个参数,因此元组不被锚定.这就是为什么你没有得到一个回调到ack()方法在喷口,即使你是在螺栓中的元组.

要使其正常工作,您需要修改您的喷口以发出第二个参数,即消息ID.正是这个id被传回给spout中的ack()方法:

public void nextTuple() {
    Utils.sleep(5000);

        //emit only 1 tuple - for testing
    if (!flag){
        Object msgId = "ID 6";  // this can be any object
        this.collector.emit(new Values(6),msgId);
        flag = true;
    }
}


@Override
public void ack(Object msgId) {
    //  msgId should be "ID 6"
    System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读