Учитывая,
У меня есть задание Flink, которое читает из ActiveMQ
источника и записывает в базу данных mysql - с ключом по идентификатору. Я включил контрольные точки для этой работы каждую секунду. Я указываю контрольные точки на экземпляр Minio
, я проверял, что контрольные точки работают с jobid
. Я развертываю эту работу с помощью Openshift (внизу Kubernetes) - я могу масштабировать эту работу вверх / вниз по мере и при необходимости.
Проблема
Когда задание развернуто (прокручивается) или задание было остановлено из-за ошибки / ошибки, и были ли какие-либо неиспользованные сообщения в ActiveMQ или неподтвержденные сообщения во Flink (но записанные в базу данных), когда задание восстанавливается (или новое задание развертывается) процесс задания уже обработал сообщения, в результате чего в базу данных вставляются повторяющиеся записи.
Вопрос
- Разве контрольные точки не должны помочь восстановлению работы с того места, где она была остановлена?
- Должен ли я пройти контрольно-пропускной пункт перед развертыванием нового задания?
- Что произойдет, если задание завершится с ошибкой или отказом кластера?
- Поскольку
jobid
меняется при каждом развертывании, как происходит восстановление? - Изменить. Поскольку я не могу ожидать идемпотентности от базы данных, чтобы избежать дублирования, сохраняемого в базе данных (
Exactly-Once
), могу ли я написать запрос для конкретной базы данных (upsert
) для обновления, если данная запись присутствует, и вставить, если нет?