Штормовой Носик не получает подтверждение

Я начал использовать storm, поэтому создаю простую топологию с помощью этого руководства

Когда я запускаю свою топологию с LocalCluster и все выглядит нормально, моя проблема заключается в том, что я не получаю ACK на кортеж, что означает, что мой носик ack никогда не вызывается.

мой код ниже - вы знаете, почему ack не вызывается?

поэтому моя топология выглядит так

public StormTopology build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(HelloWorldSpout.class.getSimpleName(), 
             helloWorldSpout, spoutParallelism);

        HelloWorldBolt bolt = new  HelloWorldBolt();

        builder.setBolt(HelloWorldBolt.class.getSimpleName(), 
                   bolt, boltParallelism)
              .shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}

Мой носик выглядит так

public class HelloWorldSpout  extends BaseRichSpout implements ISpout {
    private SpoutOutputCollector collector;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("int"));
    }

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    private static Boolean flag = false;
    public void nextTuple() {
        Utils.sleep(5000);

            //emit only 1 tuple - for testing
        if (!flag){
            this.collector.emit(new Values(6));
            flag = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
    }

    public void fail(Object msgId){
        System.out.println("[HelloWorldSpout] fail on msgId" + msgId);
    }
}

а мой болт выглядит вот так

@SuppressWarnings("serial")
public class HelloWorldBolt extends BaseRichBolt{
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, 
                    OutputCollector collector) {
        this.collector = collector;
        logger.info("preparing HelloWorldBolt");
    }

    public void execute(Tuple tuple) {
        System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0));
        this.collector.ack(tuple);
    }

    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub

    }
}

person Mzf    schedule 21.01.2014    source источник
comment
+1 Мне очень нравится ваш шаблон HelloWorldSpout.class.getSimpleName() в группировке в случайном порядке. Я не понимаю, почему так много API-интерфейсов Java зависят от магических строк и магических чисел (в отличие от перечислений), но ваш шаблон - хороший способ не обжечься.   -  person Steven Magana-Zook    schedule 03.07.2014


Ответы (1)


Ваш метод emit() в носике имеет только один аргумент, поэтому кортеж не привязан. Вот почему вы не получаете обратный вызов метода ack() в носике, даже если вы подтверждаете кортеж в болте.

Чтобы заставить это работать, вам нужно изменить носик, чтобы он выдавал второй аргумент, который является идентификатором сообщения. Именно этот идентификатор передается обратно методу ack() в носике:

public void nextTuple() {
    Utils.sleep(5000);

        //emit only 1 tuple - for testing
    if (!flag){
        Object msgId = "ID 6";  // this can be any object
        this.collector.emit(new Values(6), msgId);
        flag = true;
    }
}


@Override
public void ack(Object msgId) {
    //  msgId should be "ID 6"
    System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}
person Chris Gerken    schedule 21.01.2014