Проблемы с производительностью: Kafka + Storm + Trident + OpaqueTridentKafkaSpout

Мы наблюдаем некоторые проблемы с производительностью с Kafka + Storm + Trident + OpaqueTridentKafkaSpout

Ниже указаны детали нашей установки:

Топология шторма:

Broker broker = Broker.fromString("localhost:9092")
    GlobalPartitionInformation info = new GlobalPartitionInformation()
    if(args[4]){
        int partitionCount = args[4].toInteger()
        for(int i =0;i<partitionCount;i++){
            info.addPartition(i, broker)
        }
    }
    StaticHosts hosts = new StaticHosts(info)
    TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts,"test")
    tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())


    OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig)
    TridentTopology topology = new TridentTopology()
    Stream st  = topology.newStream("spout1", kafkaSpout).parallelismHint(args[2].toInteger())
            .each(kafkaSpout.getOutputFields(), new NEO4JTridentFunction(), new Fields("status"))
            .parallelismHint(args[1].toInteger())
    Map conf = new HashMap()
    conf.put(Config.TOPOLOGY_WORKERS, args[3].toInteger())
    conf.put(Config.TOPOLOGY_DEBUG, false)

    if (args[0] == "local") {
        LocalCluster cluster = new LocalCluster()
        cluster.submitTopology("mytopology", conf, topology.build())
    } else {
        StormSubmitter.submitTopology("mytopology", conf, topology.build())
        NEO4JTridentFunction.getGraphDatabaseService().shutdown()
    }

Storm.yaml, который мы используем для Storm, выглядит следующим образом:

########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
     - "localhost"
#     - "server2"
# 
storm.zookeeper.port : 2999


storm.local.dir: "/opt/mphrx/neo4j/stormdatadir"

nimbus.childopts: "-Xms2048m"
ui.childopts: "-Xms1024m"
logviewer.childopts: "-Xmx512m"
supervisor.childopts: "-Xms1024m"
worker.childopts: "-Xms2600m -Xss256k -XX:MaxPermSize=128m -XX:PermSize=96m
    -XX:NewSize=1000m -XX:MaxNewSize=1000m -XX:MaxTenuringThreshold=1 -XX:SurvivorRatio=6
    -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled
    -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
    -server -XX:+AggressiveOpts -XX:+UseCompressedOops -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true
    -Xloggc:logs/gc-worker-%ID%.log -verbose:gc
    -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1m
    -XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:+PrintGCTimeStamps -XX:+PrintClassHistogram
    -XX:+PrintTenuringDistribution -XX:-PrintGCApplicationStoppedTime -XX:-PrintGCApplicationConcurrentTime
    -XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal"

java.library.path: "/usr/lib/jvm/jdk1.7.0_25"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

topology.trident.batch.emit.interval.millis: 100
topology.message.timeout.secs: 300
#topology.max.spout.pending: 10000
  • Размер каждого сообщения, созданного в Kafka: 11 КБ
  • Время выполнения каждого болта (NEO4JTridentFunction) для обработки данных: 500 мс
  • Кол-во штурмовиков: 1
  • Подсказка по параллелизму для Spout (OpaqueTridentKafkaSpout): 1
  • Подсказка по параллельности для болта / функции (NEO4JTridentFunction): 50

  • Мы видим, что пропускная способность от Spout составляет около 12 мс / сек.

  • Скорость отправки сообщений в Kafka: 150 мс / сек.

И Storm, и Kafka представляют собой развертывание с одним узлом. Мы читали о гораздо более высокой пропускной способности от Storm, но не можем добиться того же. Подскажите, пожалуйста, как настроить конфигурацию Storm + Kafka + OpaqueTridentKafkaSpout для достижения более высокой пропускной способности. Любая помощь в этом отношении нам очень поможет.

Спасибо,


person Siddharth    schedule 01.07.2014    source источник


Ответы (3)


Вы должны установить параллелизм носиков так же, как количество разделов для упомянутых тем. По умолчанию трезубец принимает один пакет для каждого выполнения, вам следует увеличить это количество, изменив свойство topology.max.spout.pending. Поскольку Trident принудительно управляет упорядоченными транзакциями, ваш метод выполнения (NEO4JTridentFunction) должен быть быстрым, чтобы достичь желаемого решения.

Кроме того, вы можете поиграть с "tridentConfig.fetchSizeBytes", изменив его, вы можете получать больше данных для каждого нового вызова emit в свой носик.

Также вы должны проверить свой журнал сборки мусора, он даст вам представление о реальной сути.

Вы можете включить журнал сборки мусора, добавив "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:{path}/gc-storm-worker-%ID%.log" в настройках worker.childopts в конфигурации вашего worker.

И последнее, но не менее важное: вы можете использовать G1GC, если соотношение вашего молодого поколения выше, чем обычно.

person serkan kucukbay    schedule 17.03.2016

Установите worker.childopts в соответствии с конфигурацией вашей системы. Используйте SpoutConfig.fetchSizeBytes, чтобы увеличить количество байтов, втягиваемых в топологию. Увеличьте свой намек на параллелизм.

person Rocky    schedule 08.01.2016

мои расчеты: если 8 ядер и 500МС на болт -> ~ 16 сообщений / сек. если вы оптимизируете болт, вы увидите улучшения.

также для болтов, привязанных к процессору, попробуйте Parallelism hint = 'количество общих ядер' и увеличьте topology.trident.batch.emit.interval.millis до количества времени, необходимого для обработки всего пакета, разделенного на 2. установите для параметра topology.max.spout.pending значение 1.

person LiozM    schedule 19.03.2016