Kafka S3 Sink Connector — как пометить раздел как завершенный

Я использую разъем приемника Kafka для записи данных из Kafka в s3. Выходные данные разбиты на почасовые сегменты — year=yyyy/month=MM/day=dd/hour=hh. Эти данные используются последующим пакетным заданием. Итак, прежде чем запускать подчиненное задание, я должен быть уверен, что никакие дополнительные данные не поступят в данный раздел после того, как начнется обработка для этого раздела.

Каков наилучший способ спроектировать это? Как пометить раздел как завершенный? то есть никакие дополнительные данные не будут записаны в него после того, как он будет помечен как завершенный.

EDIT: я использую RecordField как timestamp.extractor. Мои сообщения kafka гарантированно будут отсортированы в разделах по полю раздела




Ответы (1)


Зависит от того, какой экстрактор временных меток вы используете в конфигурации приемника.

Вы должны были бы гарантировать, что ни одна запись не может иметь отметку времени раньше, чем время, когда вы ее потребляете.

Насколько мне известно, единственный возможный способ - использовать экстрактор временных меток WallClock. В противном случае вы используете временную метку Kafka Record или какую-либо временную метку в каждом сообщении. Оба из них могут быть перезаписаны на стороне производителя каким-либо событием в прошлом.

person OneCricketeer    schedule 20.10.2020
comment
Я использую RecordField как timestamp.extractor. Мои сообщения kafka гарантированно будут отсортированы по этому полю в разделах kafka, кстати. - person nish; 20.10.2020
comment
Как вы это гарантируете? Если завтра кто-то установит это поле в своих записях как отметку времени -1, тогда это будет проблемой, верно? Если у вас есть полный контроль над тем, какие временные метки создаются, то, конечно, нет никаких проблем с разделением, кроме как иметь дело с задержкой потребителя. - person OneCricketeer; 20.10.2020
comment
Сообщения kafka соответствуют данным CDC от postgres. Таким образом, временная метка источника строго увеличивается. Таким образом, данные поступают предварительно отсортированными. Затем я использую это поле для создания разделов. - person nish; 20.10.2020
comment
Имеет смысл. Тем не менее, нет семантики для завершения префикса S3 в коннекторе, поскольку он не имеет контекста происхождения данных. - person OneCricketeer; 20.10.2020
comment
Итак, единственный вариант — создать специальное задание, которое читает, может быть, по 1 сообщению из каждого раздела kafka и проверяет завершение на всех разделах? - person nish; 21.10.2020
comment
Я предполагаю, что это сработает, при условии, что кластер запущен и исправен... У нас также есть аналогичный процесс обработки данных S3, и я думаю, что аналитики данных просто предполагают, что данные будут там. Другие люди увлеклись проверкой временной метки смещения самого последнего записанного файла (это в имени файла) - person OneCricketeer; 21.10.2020
comment
Спасибо за реальный взгляд на ваши системы - person nish; 22.10.2020