Apache Beam Python SDK ReadFromKafka не получает данные

Я пробую простой пример чтения данных из темы Kafka в Apache Beam. Вот соответствующий фрагмент:

  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:29092'},
            topics=['test'])
        | 'Print' >> beam.Map(print))

Используя приведенный выше фрагмент конвейера Beam, я не вижу входящих сообщений. Kafka работает локально в контейнере докеров, и я могу использовать kafkacat с хоста (вне контейнера) для публикации и подписки на сообщения. Так что, думаю, на этом фронте нет никаких проблем.

Похоже, что Beam может подключаться к Kafka и получать уведомления о новых сообщениях, поскольку я вижу изменения смещения в журналах Beam по мере публикации данных из kafkacat:

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

Вот как я публикую данные с помощью kafkacat:

$ kafkacat -P -b localhost:29092 -t test -K:
1:foo
1:bar

и я могу подтвердить, что он получен, снова используя kafkacat:

$ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
Key: 1 Value: foo
Key: 1 Value: bar

Но, несмотря на это, я не вижу, чтобы Beam напечатал фактическое сообщение, как я ожидал. Любые указатели на то, что здесь отсутствует, приветствуются. Я подозреваю, что это может быть проблема декодирования на стороне конвейера Beam, но это может быть неверно.

Изменить (17 марта 2021 г.):

После рассмотрения этой проблемы с разработчиками коннектора Beam Python Kafka основная причина, по которой Python ReadFromKafka не ведет себя так, как ожидалось, заключается в том, что переносимый бегун Flink не может выполнять неограниченные Splittable DoFns (SDF), поскольку он поддерживает только самоконтрольные точки. Портативная потоковая передача Flink не выдает регулярные запросы контрольных точек к SDK. Поэтому все записи Kafka буферизуются на первом ReadFromKafka этапе. Jira отслеживает эту проблему: https://issues.apache.org/jira/browse/BEAM-11991. Кроме того, существует другая Jira, которая отслеживает запрос функции для поддержки этого: https://issues.apache.org/jira/browse/BEAM-11998. Надеюсь это поможет!


person sumeetkm    schedule 11.02.2021    source источник
comment
LocalRunner также страдает от этой проблемы.   -  person highfly22    schedule 23.04.2021


Ответы (1)


У меня точно такая же проблема, и я не могу понять, почему операция ReadKafka не работает должным образом.

Это как если бы мне нужно было извлечь значения в другую коллекцию PCollection ...

Заблуждение заключается в том, что все примеры с использованием Python и Kafka (например: KafkaTaxi) выглядят очень просто, но с моей стороны это не работает.

Мне интересно, появилась ли ошибка после последней версии (2.28 .0). Проверим на предыдущих версиях apache-beam

person Matt D    schedule 25.02.2021
comment
У меня такая же проблема, какую версию Apache Beam вы используете? - person Müller; 05.03.2021