Невозможно делать запросы к контейнеру Kafka из другого контейнера с помощью kafka-python

Окружение:

services:  
  zookeeper:
      image: wurstmeister/zookeeper
      ports:
        - 2181
  kafka:
    image: wurstmeister/kafka
    ports:
      - 9092:9092
      #- 8004:8004
    links:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_CREATE_TOPICS: "foo:10:1"
      # JMX_PORT: 8004
  clickhouse-01:
      image: yandex/clickhouse-server
      hostname: clickhouse-01
      container_name: clickhouse-01
      ports:
          - 9001:9000
      volumes:
          - ./config/config.xml:/etc/clickhouse-server/config.xml
          - ./config/metrika.xml:/etc/clickhouse-server/metrika.xml
          - ./config/macros/macros-01.xml:/etc/clickhouse-server/config.d/macros.xml

      ulimits:
          nofile:
              soft: 262144
              hard: 262144
      depends_on:
          - "zookeeper"

  clickhouse-02:
      image: yandex/clickhouse-server
      hostname: clickhouse-02
      container_name: clickhouse-02
      ports:
          - 9002:9000
      volumes:
          - ./config/config.xml:/etc/clickhouse-server/config.xml
          - ./config/metrika.xml:/etc/clickhouse-server/metrika.xml
          - ./config/macros/macros-02.xml:/etc/clickhouse-server/config.d/macros.xml

      ulimits:
          nofile:
              soft: 262144
              hard: 262144
      depends_on:
          - "zookeeper"

  clickhouse-03:
      image: yandex/clickhouse-server
      hostname: clickhouse-03
      container_name: clickhouse-03
      ports:
          - 9003:9000
      volumes:
          - ./config/config.xml:/etc/clickhouse-server/config.xml
          - ./config/metrika.xml:/etc/clickhouse-server/metrika.xml
          - ./config/macros/macros-03.xml:/etc/clickhouse-server/config.d/macros.xml

      ulimits:
          nofile:
              soft: 262144
              hard: 262144
      depends_on:
          - "zookeeper"

Запрос Kafka через контейнер Zookeeper:

bash-4.4# /opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
foo
raw_trap

Результаты Netstat из контейнера zookeeper:

root@0a5f9a441da3:/opt/zookeeper-3.4.13# netstat
Active Internet connections (w/o servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State      
tcp        0      0 0a5f9a441da3:2181       kafka_1:58622 ESTABLISHED
tcp        0      0 0a5f9a441da3:2181       clickhouse-02.cli:60728 ESTABLISHED
tcp        0      0 0a5f9a441da3:2181       clickhouse-01.cli:56448 ESTABLISHED
tcp        0      0 0a5f9a441da3:2181       clickhouse-03.cli:39656 ESTABLISHED

Telnet от контейнера с kafka-python к брокеру:

root@f10fe1b58fa9:~# telnet kafka 9092
Trying 172.18.0.8...
Connected to kafka.
Escape character is '^]'.

Ошибка Кафки из телнета:

kafka_1           | [2019-06-23 13:38:05,350] WARN [SocketServer brokerId=1019] Unexpected error from /172.18.0.5; closing connection (org.apache.kafka.common.network.Selector)
kafka_1           | org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1903520116 larger than 104857600)
kafka_1           |     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
kafka_1           |     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
kafka_1           |     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
kafka_1           |     at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
kafka_1           |     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
kafka_1           |     at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
kafka_1           |     at kafka.network.Processor.poll(SocketServer.scala:830)
kafka_1           |     at kafka.network.Processor.run(SocketServer.scala:730)
kafka_1           |     at java.lang.Thread.run(Thread.java:748)

Ошибка при попытке отправить данные в тему кафки с помощью python:

>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers=['kafka:9092'])
>>> producer
<kafka.producer.kafka.KafkaProducer object at 0x7ff84417b320>
>>> producer.send('foo', b'raw_bytes')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 564, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/usr/local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 691, in _wait_on_metadata
    "Failed to update metadata after %.1f secs." % (max_wait,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

Я несколько раз лазил по сети, пытаясь найти решение. Я начал с того, что убедился, что KAFKA_ADVERTISED_HOST_NAME из контейнера правильный, и поэкспериментировал с его изменением, но ничего не добился. Когда я изменяю конечную точку записи bootstrap_servers=['kafka:9092'], я получаю сообщение об ошибке:

>>> consumer = KafkaConsumer('foo', 
...                          group_id='test-group',
...                          bootstrap_servers=['localhost:9092'])
Traceback (most recent call last):
  File "<stdin>", line 3, in <module>
  File "/usr/local/lib/python3.7/site-packages/kafka/consumer/group.py", line 353, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python3.7/site-packages/kafka/client_async.py", line 239, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.7/site-packages/kafka/client_async.py", line 865, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

Таким образом, кажется, что я могу установить связь, но могу в корне неправильно понять что-то о запросе (запросах), который я пытаюсь сделать с производителем.

Вот документы и пример, который я использую для библиотеки Python, которую я сейчас тестирую https://kafka-python.readthedocs.io/en/master/usage.html#kafkaconsumer

РЕДАКТИРОВАТЬ: я успешно вернул сообщения из нашей производственной среды kafka, которая работает на голом железе с использованием потребителя.


person Nicholas Martinez    schedule 23.06.2019    source источник


Ответы (2)


Цитируя отличный пост Робина Моффета в блоге о слушателях Kafka и докере:

Если вы используете докер, вам нужно установить KAFKA_ADVERTISED_LISTENERS на внешний адрес (хост или IP), чтобы клиенты могли правильно к нему подключаться. В противном случае они попытаются подключиться к внутреннему адресу хоста, и если он недоступен, возникнут проблемы.

https://rmoff.net/2018/08/02/kafka-listeners-explained/

Клиентские подключения Kafka на самом деле представляют собой двухэтапный процесс, который включает сначала подключение к загрузочному серверу для запроса метаданных обо всем кластере, а затем подключение к одному или нескольким узлам кластера с использованием объявленных имен прослушивателей и портов.

person Hans Jespersen    schedule 23.06.2019
comment
Спасибо за информацию. Я изучил это и, вероятно, не понял, что я получаю доступ к контейнеру kafka из сети докеров, а не с моего локального хоста. - person Nicholas Martinez; 23.06.2019

Кажется, я решил проблему, используя другую версию контейнера:

  kafka:
    image: wurstmeister/kafka:2.11-0.11.0.3

Мой потребительский объект теперь может получить список тем от брокера. Что-то, что я не мог сделать раньше.

person Nicholas Martinez    schedule 23.06.2019
comment
последняя версия kafka на сегодняшний день (июнь 2019 г.) — 2.2.1, а эта версия — 0.11.0.3 (часть 2.11 — это версия scala), поэтому рассмотрите возможность использования более актуального образа, если он доступен. - person Hans Jespersen; 23.06.2019
comment
@HansJespersen, когда я загружаю последнюю версию, я получаю ту же ошибку. Я собираюсь вытащить каждый и посмотреть, что произойдет. - person Nicholas Martinez; 23.06.2019
comment
или, возможно, попробуйте образы докеров confluentinc, хотя некоторые параметры могут немного отличаться - person Hans Jespersen; 23.06.2019