Не могу отправить в Кафку из NiFi

Я работаю в Docker для Windows, и вот моя установка NiFi:

введите здесь описание изображения

Подробная информация о процессоре PublishKafka:  введите описание изображения здесь

Подробная информация о процессоре ConsumeKafka:  введите описание изображения здесь

Вот мой файл для создания докеров (примечание: 192.168.1.50 - это мой статический IP-адрес внутреннего хоста):

version: '3'
services:
  Jenkins:
    container_name: Jenkins
    restart: on-failure
    depends_on:
    - NiFi
    image: jenkins:latest
    ports:
      - "32779:50000"
      - "32780:8080"
  NiFi:
    container_name: NiFi
    image: xemuliam/nifi:latest
    restart: on-failure
    depends_on:
    - kafka
    ports:
      - "32784:8089"
      - "32783:8080"
      - "32782:8081"
      - "32781:8443"
    labels:
      com.foo: myLabel
  zookeeper:
    container_name: Zookeeper
    image: wurstmeister/zookeeper
    restart: on-failure
    #network_mode: host
    ports:
      - "2181:2181"
  kafka:
    #container_name: Kafka
    image: wurstmeister/kafka
    depends_on:
    - zookeeper
    #restart: on-failure
    #network_mode: host
    ports:
      - "9092"
    environment:
      #KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.50:9092
      #KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      
      KAFKA_CREATE_TOPICS: "MainIngestionTopic:1:1"
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.50:2181
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://192.168.1.50:9094
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      
    volumes:
      - ./var/run/docker.sock:/var/run/docker.sock

Когда я просматриваю журнал контейнера Kafka, я вижу, что моя тема была успешно создана из docker-compose.

Сообщения успешно доставляются в процессор PublishKafka в NiFi, но затем не публикуются. Процессор ConsumeKafka, который подписан на ту же тему, никогда не получает сообщение.

Журнал контейнера NiFi показывает следующее:

2018-05-28 19:46:18,792 ERROR [Timer-Driven Process Thread-1] o.a.n.p.kafka.pubsub.PublishKafka PublishKafka[id=b2503f49-acc9-38f5-86f9-5029e2768b68] Failed to send all message for StandardFlowFileRecord[uuid=b3f6f818-34d3-42a9-9d6e-636cf17eb138,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1527533792820-1, container=default, section=1], offset=5, length=5],offset=0,name=8151630985100,size=5] to Kafka; routing to failure due to org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.


org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.


2018-05-28 19:46:18,792 INFO [Timer-Driven Process Thread-1] o.a.kafka.clients.producer.KafkaProducer Closing the Kafka producer with timeoutMillis = 5000 ms.

Я попытался опубликовать тему из самого контейнера Kafka, но это тоже не удалось:

введите здесь описание изображения

Я просмотрел документацию и прочитал много тем, пытаясь решить эту проблему, но это все еще проблема. Любая помощь будет принята с благодарностью!


person afenkner    schedule 28.05.2018    source источник
comment
Привет. Я не уверен, почему потребитель не может прочитать, но вам нужно подключить ConsumeKafka к нижестоящему процессору, чтобы действительно получить сообщение. если у вас есть success отношения ConsumeKafka, настроенные на автоматическое завершение, вы не получите сообщения.   -  person Sivaprasanna Sethuraman    schedule 29.05.2018
comment
Привет. Вы пытались использовать свой IP 192.168.1.50 вместо localhost?   -  person Val Bonn    schedule 29.05.2018


Ответы (1)


Вы не можете использовать localhost в свойстве «Kafka Brokers» в NiFi, если только брокер фактически не работал на том же хосте, на котором работал NiFi. Поскольку у вас есть каждая служба внутри контейнера докеров, контейнер для kafka должен иметь конкретное имя хоста или IP-адрес, который можно использовать.

person Bryan Bende    schedule 29.05.2018
comment
В этом есть смысл. Я обновил настройку Kafka Brokers в PublishKafka, чтобы смотреть на машину kafka вместо localhost. Это помогло мне преодолеть ошибку обновления метаданных, но теперь я получаю org.apache.kafka.common.errors.TimeoutException: Batch Expired - person afenkner; 30.05.2018