Я новичок в Kafka и пытаюсь создать на нем сервис для обслуживания платформы обмена сообщениями. Вот моя установка:
Kafka 0.9.0.1
Zookeeper 3.4.8
kafka-python 1.3 .3
Мое приложение создает KafkaProducer
, из которого я отправляю поток сообщений в одну тему с 6 разделами. Я также создаю 7 KafkaConsumer
(под одним group_id
, 6 из которых назначаются 6 разделам, а один остается в состоянии ожидания (что ожидается). Пока производитель выполняет потоковую передачу, я увеличиваю количество разделов до 7 с ожидание того, что поток не будет распределен по 7 разделам и разбудит бездействующего потребителя. Однако, похоже, что производитель не берет только что добавленный раздел, пока я не повторно инициализирую его, перезагружая приложение. Я масштабирую раздел подсчитать, запустив это:
kafka-topics --alter --zookeeper localhost:2181 --topic test --partitions 7
Есть ли способ у производителя уловить изменение количества разделов без его повторной инициализации?
Вот связанные фрагменты кода:
Продюсер
class Producer(threading.Thread):
daemon = True
def __init__(self, name, manager):
super(Producer, self).__init__()
self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
def run(self):
while not self.killed:
if not self.q.empty():
self._busy()
self.producer.send('test', value=self.q.get())
else:
self._free()
Потребители
class Consumer(threading.Thread):
daemon = True
def __init__(self, name, manager):
super(Consumer, self).__init__()
self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
group_id='test_group',
client_id="Consumer " + self.name)
self.consumer.subscribe(['test'])
def run(self):
while not self.killed:
messages = self.consumer.poll()
for topic, records in messages.iteritems():
print self.consumer.config['client_id'] + ": " + str(records)
producer
. Также, когда ваши разделы увеличиваются вproducer
, требуется соответствующее изменение и вconsumer
. - person Krishna Oza   schedule 28.06.2018