прекратить обработку кортежа в определенном болте

Например, у меня есть топология, состоящая из 1 носика и 4 болтов.

spout A -> bolt B -> bolt C -> bolt E
                  -> bolt D

Только если какой-то условный оператор в болте B истинен, он передает кортеж болтам C и болтам D.

И только если какое-то условное выражение в болте C истинно, то он передает кортеж в болт E.

Таким образом, одиночный кортеж может достигать только болта B или (болта C и D).

Я использую BaseBasicBolt, который, насколько мне известно, автоматически срабатывает после вызова collect.emit.

Например, метод выполнения в болте B показан ниже.

public class boltB extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        ...some logic goes here
        if (response.getCount() > 0) {
            collector.emit(new Values(tuple.getString(0)));
        }
    }
}

Поэтому, если не вызывается collect.emit, я думаю, что кортеж из носика не выполнен, потому что я вижу из пользовательского интерфейса storm, что почти все кортежи из носика не выполнены.

В этом случае, где я должен вызвать 'ack', чтобы носик не считал его неудачным кортежем?


person Adrian Seungjin Lee    schedule 17.06.2014    source источник


Ответы (2)


То, что вы делаете, правильно для логики, которую вы реализуете. Вам не нужно явно вызывать ack(). При использовании BaseBasicBolt каждый кортеж подтверждается после метода execute() с помощью BasicBoltExecutor. Для неудачных кортежей вы должны проверить исключения. Также попробуйте посмотреть в пользовательском интерфейсе Storm на наличие аномалий в кортеже, выпущенном/выполненном/неудавшемся для каждого носика и болта.

person Ben Tse    schedule 17.06.2014

Когда у вас есть BaseBasicBolt - бэкинг делается за вас, даже если вы ничего не излучаете.

Экземпляр BaseBasicBolt выполняется в BasicBoltExecutor, чей метод execute() показан ниже:

public void execute(Tuple input) {
     _collector.setContext(input);
     try {
         _bolt.execute(input, _collector);
         _collector.getOutputter().ack(input);
     } catch(FailedException e) {
         if(e instanceof ReportedFailedException) {
             _collector.reportError(e);
         }
         _collector.getOutputter().fail(input);
     }
}

Поэтому, чтобы остановить обработку кортежа, просто не испускайте его, после вызова execute он будет подтвержден. Поскольку больше нет болтов для запуска, вызов ack в носике будет вызван

Надеюсь, это ответ на ваши вопросы

person Mzf    schedule 23.06.2014