Каков стандартный способ обработки исключения OOM в Apache Flink

Я учусь обрабатывать потоковые данные с помощью Flink.

Мне удалось создать пример, который должен получить и десериализовать потоковые данные из источника данных, преобразовать их и распечатать результат.

Сейчас думаю, как обработать исключение OOM во Flink.

Например, если существует некоторая проблема с противодавлением, что означает, что если скорость отправки данных из источника данных выше, чем обработка данных в операторах Flink, как я понимаю, оперативная память будет исчерпана через некоторое время. Так что, если это случится? Как справиться с такого рода исключениями? Можно ли игнорировать какой-либо ввод, чтобы процесс не вызвал никаких ошибок?

Другими словами, я ожидаю какой-то механизм, как показано ниже:

if (RAM is almost exhausted)
    ignore the coming data
else
    process the coming data

person Yves    schedule 08.06.2020    source источник


Ответы (1)


Механизма, который вы себе представляете, не существует. Вы могли бы построить его самостоятельно, но это кажется неправильным способом решения проблемы.

Противодавление не вызовет исключений OOM во Flink. Его сетевой стек использует пул сетевых буферов вне кучи фиксированного размера вместе с управлением потоком на основе кредита. Задача не может отправлять данные в нисходящем направлении, если для нее уже не выделен буфер в приемнике. Это означает, что источники данных быстро адаптируются к мощности самой медленной задачи (задач) в конвейере. Таким образом, вместо того, чтобы игнорировать входящие данные, источники естественным образом ограничивают себя и избегают чтения данных, которые они не могут отправить вниз по потоку.

Единственная вероятная причина ошибок OOM заключается в том, что со временем ваше приложение использует все больше и больше ключевых состояний и таймеров. Вы можете решить эту проблему несколькими способами:

  • использовать бэкэнд состояния RocksDB (который сохраняет состояние на локальном диске, а также кэш вне кучи)
  • по возможности используйте предварительную агрегацию
  • быть более агрессивным в отношении очистки состояния устаревших ключей
person David Anderson    schedule 09.06.2020