Apache storm Trident — динамическое создание топологий

Есть ли способ динамически создавать топологии в Trident? Может ли кто-нибудь привести примеры?


person Amber Kulkarni    schedule 20.06.2016    source источник
comment
Вы можете сохранить конфигурацию топологии в каком-либо файле свойств (JSON), и при развертывании топологии вы сможете прочитать ее из этого файла. но как только вы развернули его, вы не можете изменить его динамически   -  person Mahesh Madushanka    schedule 24.06.2016


Ответы (1)


Прежде всего, вы также можете знать, что создание топологий не является частью Trident. Trident — это просто API для микропакетов.

И создание новых топологий является динамическим по определению. Это то, что делает класс TopologyBuilder.

Итак, чтобы ответить на ваш вопрос, да, можно создавать новые топологии из трезубца или из простых трубок и болтов Storm. Единственное, что вам нужно, это чтобы ваша логика создания топологии имела доступ к кластеру Storm (классы и другие ресурсы), что опять же по определению удовлетворяет, если вы запускаете свою логику в Storm.

Последнее, что вам нужно, это найти способ отправить только что созданную топологию, и именно для этого был создан класс StormSubmitter, который снова (!сюрприз :) ) по определениям удовлетворен тем, что находится на вашем пути к классам, когда вы запускаете свой логика внутри трезубца или обычного носика/болта.

Из любопытства, почему вы планируете это сделать? Каковы ваши требования?

Пример:

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;

public class DynamicTopologySpout implements IBatchSpout {

    private static final long serialVersionUID = -3269435263455830842L;

    @Override
    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context) {}

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        if (newTopologyNeeded()) {
            TopologyBuilder builder = new TopologyBuilder();
            builder
            .setSpout("spout", new BaseRichSpout() {
                private static final long serialVersionUID = 1L;
                @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {}
                @Override @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
                @Override public void nextTuple() {}
            }, 1)
            .setMaxSpoutPending(15)
            .setNumTasks(1);
            StormTopology topology = builder.createTopology();
            Config config = new Config();
            try {
                StormSubmitter.submitTopology("dynamic-topology", config, topology);
            } catch (Exception e) {
                e.printStackTrace();
                collector.reportError(e);
            }
        }
    }

    private boolean newTopologyNeeded() {
        // Check if topology needed ...
        return false;
    }

    @Override
    public void ack(long batchId) {}

    @Override
    public void close() {}

    @Override
    public Map<String, Object> getComponentConfiguration() { return null; }

    @Override
    public Fields getOutputFields() { return null; }

}
person Alma Alma    schedule 28.06.2016