Я пытаюсь понять, как сначала получить все данные из запроса, а затем постепенно изменять только с помощью коннектора kafka. Причина в том, что я хочу загрузить все данные в эластичный поиск, а затем синхронизировать es с моими потоками kafka. В настоящее время я делаю это, сначала используя коннектор с mode = bulk, а затем меняю его на временную метку. Это прекрасно работает.
Однако, если мы когда-либо захотим перезагрузить все данные в Streams и ES, это означает, что нам нужно написать несколько скриптов, которые каким-то образом очищают или удаляют потоки kafka и данные индексов es, изменить подключенный ini, чтобы установить режим как массовый, перезапустить все, дать пришло время загрузить все эти данные, затем снова изменить сценарии в режим отметки времени, затем перезапустить все еще раз (причина необходимости в таком сценарии заключается в том, что иногда массовые обновления происходят для исправления исторических данных с помощью процесса etl, который мы еще не контролируем , и этот процесс не обновляет метки времени)
Кто-нибудь делает что-то подобное и нашел более элегантное решение?