Samza/Kafka не удалось обновить метаданные

В настоящее время я работаю над написанием Samza Script, который будет просто брать данные из темы Kafka и выводить данные в другую тему Kafka. Я написал очень простой StreamTask, однако при выполнении я столкнулся с ошибкой.

Ошибка ниже:

Exception in thread "main" org.apache.samza.SamzaException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 193 ms.
    at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.send(CoordinatorStreamSystemProducer.java:112)
    at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.writeConfig(CoordinatorStreamSystemProducer.java:129)
    at org.apache.samza.job.JobRunner.run(JobRunner.scala:79)
    at org.apache.samza.job.JobRunner$.main(JobRunner.scala:48)
    at org.apache.samza.job.JobRunner.main(JobRunner.scala)
 Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 193 ms

Я не совсем уверен, как настроить или заставить скрипт писать необходимые метаданные Kafka. Ниже приведен мой код для StreamTask и файла свойств. В файле свойств я добавил раздел «Метаданные», чтобы посмотреть, поможет ли это в последующем процессе, но безрезультатно. Это правильное направление или я что-то совсем упустил?

import org.apache.samza.task.StreamTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;

/*
*   Take all messages received and send them to
*   a Kafka topic called "words"
*/

public class TestStreamTask implements StreamTask{

    private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka" , "words");  // create new system stream for kafka topic "words"

    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator){

        String message = (String) envelope.getMessage();    // pull message from stream

        for(String word : message.split(" "))
            collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));    // output messsage to new system stream for kafka topic "words"
    }   
}

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=test-words

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=samza.examples.wikipedia.task.TestStreamTask
task.inputs=kafka.test
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.checkpoint.replication.factor=1

# Metrics
metrics.reporters=snapshot,jmx
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory

# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.bootstrap.servers=localhost:9092

# Metadata
systems.kafka.metadata.bootstrap.servers=localhost:9092

person Zerbraxi    schedule 04.06.2015    source источник
comment
У вас есть кафка (и зоопарк), работающая на локальном хосте? В зависимости от того, как вы их запустили, вы можете использовать скрипт kafka-console-consumer.sh, который поставляется с kafka, и скрипт zkCli.sh, который поставляется с zookeeper, для устранения неполадок.   -  person palimpsestor    schedule 06.06.2015
comment
@palimpsestor У меня есть кафка и смотритель зоопарка. Я могу использовать kafka-console-consumer.sh и kafka-console-producer.sh для создания и потребления данных. Только при запуске Samza StreamTask я получаю указанную выше ошибку. Hello-Samza от Apache, которую я смог полностью запустить.   -  person Zerbraxi    schedule 08.06.2015
comment
Я не смог воспроизвести точное сообщение об ошибке, которое вы опубликовали, но я думаю, что это может быть отвлекающим маневром. В своем OutgoingMessageEnvelope вы пытаетесь опубликовать сообщение с целочисленным значением (1), не сообщая samza, что следует использовать для сериализации этого целого числа. Я смог заставить ваш пример работать, изменив эту строку на collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, "string", "string", word, word, "1"));   -  person palimpsestor    schedule 08.06.2015
comment
@palimpsestor Спасибо за помощь. Я изменил эту строку и, к сожалению, все еще сталкиваюсь с той же проблемой. Вы сказали, что смогли заставить его работать, так что проблема определенно во мне. В настоящее время я запускаю код через самзу run-job.sh, что, как я полагаю, является правильным способом сделать это. Мой вопрос в том, как или что конкретно вы используете для запуска приложения? Я просто использую Samza, представленную в тесте Hello-Samza от Apache. Может ли это быть моей проблемой? Еще раз спасибо за ваше терпение и помощь!   -  person Zerbraxi    schedule 08.06.2015
comment
Какая версия кафки у вас стоит? У вас есть auto.create.topics.enable значение true? Просто интересно, связана ли ваша проблема с чем-то вроде этого: issues.apache.org/jira/ просматривать/КАФКА-1124. Можете ли вы создавать и потреблять сообщения на тему ваших «слов» независимо от самзы?   -  person palimpsestor    schedule 08.06.2015
comment
@palimpsestor Я использую Kafka версии 0.8.2.1. Я также смог заставить кого-то другого успешно выполнить мою работу, так что это либо ошибка пользователя, что вполне возможно, либо что-то неправильное в моей конфигурации. Я не могу создавать и потреблять сообщения из этой темы только через Kafka, но могу делать это для других тем. Я изменил тему на существующую, через которую я могу отправлять сообщения только с Кафкой. Однако, когда я пытаюсь выполнить сценарий Samza с этой темой, возникает та же проблема. Я полностью закрыл Кафку и начал с нуля, но все равно не повезло.   -  person Zerbraxi    schedule 09.06.2015
comment
Если это не слишком сложно, не могли бы вы объяснить процесс, через который вы проходите, когда создаете файл java и файл свойств и выполняете его. Я просто хочу убедиться, что я не выполняю скрипт неправильно   -  person Zerbraxi    schedule 09.06.2015
comment
Трудно описать мой процесс, так как я немного настроил его. Только что я провел чистую проверку проекта hello-samza, вставил TestStreamTask.java (с моей модификацией), вставил файл свойств (как есть), добавил файл свойств в assembly/src.xml, чтобы включить его в пакет, и закомментировал rat-plugin от pom. Затем удалось запустить zk, kafka и yarn с помощью bin/grid start all и развернуть задачу с помощью run-job.sh. Затем отправил несколько сообщений в тему «тест», используя kafka-console-producer.sh, и увидел кучу «1» в теме «слова».   -  person palimpsestor    schedule 12.06.2015
comment
ты решил эту проблему? я сталкиваюсь почти с тем же   -  person Rodrigo Montano    schedule 30.10.2015


Ответы (1)


Этот вопрос касается Kafka 0.8, который, если я не ошибаюсь, не должен поддерживаться.

Этот факт в сочетании с тем фактом, что люди сталкиваются с этой проблемой только иногда, но не постоянно (и, кажется, никто не борется с этим в последние годы), вселяет в меня большую уверенность в том, что обновление до более поздней версии Kafka решит проблему. проблема.

person Dennis Jaheruddin    schedule 15.07.2019