Производитель Kafka не собирает новые разделы

Я новичок в Kafka и пытаюсь создать на нем сервис для обслуживания платформы обмена сообщениями. Вот моя установка:

Kafka 0.9.0.1
Zookeeper 3.4.8
kafka-python 1.3 .3

Мое приложение создает KafkaProducer, из которого я отправляю поток сообщений в одну тему с 6 разделами. Я также создаю 7 KafkaConsumer (под одним group_id, 6 из которых назначаются 6 разделам, а один остается в состоянии ожидания (что ожидается). Пока производитель выполняет потоковую передачу, я увеличиваю количество разделов до 7 с ожидание того, что поток не будет распределен по 7 разделам и разбудит бездействующего потребителя. Однако, похоже, что производитель не берет только что добавленный раздел, пока я не повторно инициализирую его, перезагружая приложение. Я масштабирую раздел подсчитать, запустив это:

kafka-topics --alter --zookeeper localhost:2181 --topic test --partitions 7

Есть ли способ у производителя уловить изменение количества разделов без его повторной инициализации?

Вот связанные фрагменты кода:

Продюсер

class Producer(threading.Thread):
daemon = True

def __init__(self, name, manager):
    super(Producer, self).__init__()
    self.producer = KafkaProducer(bootstrap_servers='localhost:9092')

def run(self):
    while not self.killed:
        if not self.q.empty():
            self._busy()
            self.producer.send('test', value=self.q.get())
        else:
            self._free()

Потребители

class Consumer(threading.Thread):
    daemon = True

    def __init__(self, name, manager):
        super(Consumer, self).__init__()
        self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 group_id='test_group',
                                 client_id="Consumer " + self.name)
        self.consumer.subscribe(['test'])

    def run(self):
        while not self.killed:
            messages = self.consumer.poll()

            for topic, records in messages.iteritems():
                print self.consumer.config['client_id'] + ": " + str(records)

person Saksham Gupta    schedule 18.05.2017    source источник
comment
не могли бы вы поделиться, как вы делали назначение разделов для producer. Также, когда ваши разделы увеличиваются в producer, требуется соответствующее изменение и в consumer.   -  person Krishna Oza    schedule 28.06.2018


Ответы (1)


Я столкнулся с, возможно, похожей проблемой и смог найти решение. Я написал это здесь: Как узнает ли производитель librdkafka о новых разделах тем в Kafka?

Если ваш тест был слишком коротким, вероятно, поэтому производитель не узнал о новых разделах. Параметр topic.metadata.refresh.interval.ms по умолчанию равен 300000 (в мс), поэтому брокер будет обновлять метаданные в производителях каждые 5 минут. Если после добавления разделов ваша проверка заняла более 5 минут, то причина не в этом.

person rodolk    schedule 06.02.2019