Каков правильный способ вручную зафиксировать смещение в теме kafka

У меня есть потребительский сценарий, который обрабатывает каждое сообщение и вручную вносит коррективы в тему.

CONSUMER = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=[KAFKA_SERVER],
    auto_offset_reset="earliest",
    max_poll_records=100,
    enable_auto_commit=False,
    group_id=CONSUMER_GROUP,
    # Use the RoundRobinPartition method
    partition_assignment_strategy=[RoundRobinPartitionAssignor],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

while True:
    count += 1
    LOGGER.info("--------------Poll {0}---------".format(count))
    for msg in CONSUMER:
        # Process msg.value
        # Commit offset to topic
        tp = TopicPartition(msg.topic, msg.partition)
        offsets = {tp: OffsetAndMetadata(msg.offset, None)}
        CONSUMER.commit(offsets=offsets)

Время, затрачиваемое на обработку каждого сообщения, составляет ‹1 сек.

Я получаю эту ошибку Ошибка:

kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent calls to poll()
            was longer than the configured max_poll_interval_ms, which
            typically implies that the poll loop is spending too much
            time message processing. You can address this either by
            increasing the rebalance timeout with max_poll_interval_ms,
            or by reducing the maximum size of batches returned in poll()
            with max_poll_records.


Process finished with exit code 1

Ожидание:

а) Как исправить эту ошибку?

б) Как я могу убедиться, что моя ручная фиксация работает правильно?

в) Правильный способ совершения смещения.

Я прошел через это, но Разница между session.timeout.ms и max.poll.interval.ms для Kafka 0.10.0.0 и более поздних версий, чтобы понять мою проблему, мы очень ценим любую помощь по настройке времени опроса, сеанса или пульса.

Apache кафка: 2.11-2.1.0 кафка-питон: 1.4.4


person Shakeel    schedule 07.02.2019    source источник


Ответы (1)


session.timeout.ms потребителя должно быть меньше group.max.session.timeout.ms, присутствующего на брокере Kafka.

person Selvaram G    schedule 07.02.2019
comment
Я не смог найти group.max.session.timeout.ms конфиг. Я нашел это в моем server.properties group.initial.rebalance.delay.ms=0 в настройках координатора группы. Вы имели в виду эту настройку на стороне брокера? - person Shakeel; 09.02.2019