Apache Flink - обработка дублированных сообщений во время развертывания заданий с ActiveMQ в качестве источника

Учитывая,

У меня есть задание Flink, которое читает из ActiveMQ источника и записывает в базу данных mysql - с ключом по идентификатору. Я включил контрольные точки для этой работы каждую секунду. Я указываю контрольные точки на экземпляр Minio, я проверял, что контрольные точки работают с jobid. Я развертываю эту работу с помощью Openshift (внизу Kubernetes) - я могу масштабировать эту работу вверх / вниз по мере и при необходимости.

Проблема

Когда задание развернуто (прокручивается) или задание было остановлено из-за ошибки / ошибки, и были ли какие-либо неиспользованные сообщения в ActiveMQ или неподтвержденные сообщения во Flink (но записанные в базу данных), когда задание восстанавливается (или новое задание развертывается) процесс задания уже обработал сообщения, в результате чего в базу данных вставляются повторяющиеся записи.

Вопрос

  • Разве контрольные точки не должны помочь восстановлению работы с того места, где она была остановлена?
  • Должен ли я пройти контрольно-пропускной пункт перед развертыванием нового задания?
  • Что произойдет, если задание завершится с ошибкой или отказом кластера?
  • Поскольку jobid меняется при каждом развертывании, как происходит восстановление?
  • Изменить. Поскольку я не могу ожидать идемпотентности от базы данных, чтобы избежать дублирования, сохраняемого в базе данных (Exactly-Once), могу ли я написать запрос для конкретной базы данных (upsert) для обновления, если данная запись присутствует, и вставить, если нет?

person Vijay Veeraraghavan    schedule 05.03.2020    source источник


Ответы (1)


В настоящее время JDBC поддерживает хотя бы один раз, что означает, что при восстановлении вы получаете повторяющиеся сообщения. В настоящее время существует проект добавления поддержки для ровно один раз, который, вероятно, будет выпущен с 1.11.

Разве контрольные точки не должны помочь восстановлению работы с того места, где она была остановлена?

Да, но время между последними успешными контрольными точками и восстановлением может привести к появлению наблюдаемых дубликатов. Я дал более подробный ответ на несколько похожую тему.

Должен ли я пройти контрольно-пропускной пункт перед развертыванием нового задания?

Абсолютно. На самом деле вы должны использовать отмену с точкой сохранения. Это единственный надежный способ изменить топологию. Кроме того, отмена с точками сохранения позволяет избежать дублирования данных, поскольку она корректно завершает задание.

Что произойдет, если задание завершится с ошибкой или отказом кластера?

Он должен автоматически перезапуститься (в зависимости от настроек перезапуска). Для восстановления будет использоваться последняя контрольная точка. Это наверняка приведет к дублированию.

Поскольку идентификатор задания постоянно меняется при каждом развертывании, как происходит восстановление?

Обычно вы явно указываете на тот же каталог контрольной точки (на S3?).

Поскольку я не могу ожидать идемпотентности от базы данных, является ли upsert единственным способом добиться обработки точно в один раз?

В настоящее время я не вижу выхода. Это должно измениться с 1.11.

person Arvid Heise    schedule 05.03.2020
comment
Я использую ту же корзину S3 для контрольной точки, и это не меняется при развертывании. Я вижу серию каталогов контрольных точек, созданных и удаленных в новом каталоге «jobid». Спасибо, Хайзе. - person Vijay Veeraraghavan; 06.03.2020
comment
Кроме того, когда планируется выпуск 1.11? - person Vijay Veeraraghavan; 06.03.2020
comment
Не могли бы вы расширить свой исходный вопрос, указав более подробную информацию о том, почему не работает upsert. 1.11 намечено на май. - person Arvid Heise; 06.03.2020
comment
Я обновил исходный вопрос, понятно? - person Vijay Veeraraghavan; 06.03.2020