Мы наблюдаем некоторые проблемы с производительностью с Kafka + Storm + Trident + OpaqueTridentKafkaSpout
Ниже указаны детали нашей установки:
Топология шторма:
Broker broker = Broker.fromString("localhost:9092")
GlobalPartitionInformation info = new GlobalPartitionInformation()
if(args[4]){
int partitionCount = args[4].toInteger()
for(int i =0;i<partitionCount;i++){
info.addPartition(i, broker)
}
}
StaticHosts hosts = new StaticHosts(info)
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts,"test")
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig)
TridentTopology topology = new TridentTopology()
Stream st = topology.newStream("spout1", kafkaSpout).parallelismHint(args[2].toInteger())
.each(kafkaSpout.getOutputFields(), new NEO4JTridentFunction(), new Fields("status"))
.parallelismHint(args[1].toInteger())
Map conf = new HashMap()
conf.put(Config.TOPOLOGY_WORKERS, args[3].toInteger())
conf.put(Config.TOPOLOGY_DEBUG, false)
if (args[0] == "local") {
LocalCluster cluster = new LocalCluster()
cluster.submitTopology("mytopology", conf, topology.build())
} else {
StormSubmitter.submitTopology("mytopology", conf, topology.build())
NEO4JTridentFunction.getGraphDatabaseService().shutdown()
}
Storm.yaml, который мы используем для Storm, выглядит следующим образом:
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
- "localhost"
# - "server2"
#
storm.zookeeper.port : 2999
storm.local.dir: "/opt/mphrx/neo4j/stormdatadir"
nimbus.childopts: "-Xms2048m"
ui.childopts: "-Xms1024m"
logviewer.childopts: "-Xmx512m"
supervisor.childopts: "-Xms1024m"
worker.childopts: "-Xms2600m -Xss256k -XX:MaxPermSize=128m -XX:PermSize=96m
-XX:NewSize=1000m -XX:MaxNewSize=1000m -XX:MaxTenuringThreshold=1 -XX:SurvivorRatio=6
-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled
-XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
-server -XX:+AggressiveOpts -XX:+UseCompressedOops -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true
-Xloggc:logs/gc-worker-%ID%.log -verbose:gc
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1m
-XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:+PrintGCTimeStamps -XX:+PrintClassHistogram
-XX:+PrintTenuringDistribution -XX:-PrintGCApplicationStoppedTime -XX:-PrintGCApplicationConcurrentTime
-XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal"
java.library.path: "/usr/lib/jvm/jdk1.7.0_25"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
topology.trident.batch.emit.interval.millis: 100
topology.message.timeout.secs: 300
#topology.max.spout.pending: 10000
- Размер каждого сообщения, созданного в Kafka: 11 КБ
- Время выполнения каждого болта (NEO4JTridentFunction) для обработки данных: 500 мс
- Кол-во штурмовиков: 1
- Подсказка по параллелизму для Spout (OpaqueTridentKafkaSpout): 1
Подсказка по параллельности для болта / функции (NEO4JTridentFunction): 50
Мы видим, что пропускная способность от Spout составляет около 12 мс / сек.
- Скорость отправки сообщений в Kafka: 150 мс / сек.
И Storm, и Kafka представляют собой развертывание с одним узлом. Мы читали о гораздо более высокой пропускной способности от Storm, но не можем добиться того же. Подскажите, пожалуйста, как настроить конфигурацию Storm + Kafka + OpaqueTridentKafkaSpout для достижения более высокой пропускной способности. Любая помощь в этом отношении нам очень поможет.
Спасибо,