Я знаю о настройке kafka для чтения с самого раннего или последнего сообщения. Как мы можем включить дополнительную опцию, если мне нужно прочитать предыдущее смещение? Причина, по которой мне нужно сделать это, заключается в том, что более ранние сообщения, которые были прочитаны, должны быть обработаны снова из-за некоторой ошибки в логике обработки ранее.
Как настроить kafka так, чтобы у нас была возможность читать с самого раннего, самого последнего, а также с любого заданного смещения?
Ответы (2)
В клиенте java kafka есть несколько методов для потребителя kafka, которые можно использовать для указания следующей позиции потребления.
public void seek(раздел TopicPartition, длинное смещение)
Переопределяет смещения выборки, которые потребитель будет использовать при следующем опросе (тайм-аут). Если этот API вызывается для одного и того же раздела более одного раза, при следующем опросе() будет использовано самое последнее смещение. Обратите внимание, что вы можете потерять данные, если этот API будет произвольно использоваться в середине потребления для сброса смещений выборки.
Этого достаточно, а также есть seekToBeginning и seekToEnd.
Я пытаюсь ответить на похожий, но не совсем тот же вопрос, так что давайте посмотрим, может ли моя информация вам помочь.
Во-первых, Я работал над этим другим вопросом/ответом SO
Короче говоря, вы хотите зафиксировать свои смещения, и наиболее распространенным решением для этого является ZooKeeper. Поэтому, если ваш потребитель сталкивается с ошибкой или ему необходимо завершить работу, он может возобновить работу с того места, на котором остановился.
Сам я работаю с очень большим потоком большого объема, и мой потребитель (для теста) должен каждый раз начинать с самого хвоста. В документации указано, что я должен использовать KafkaConsumer seek, чтобы объявить мою отправную точку.
Я постараюсь обновить свои выводы здесь, как только они станут успешными и надежными. Наверняка это решаемая проблема.