Коннектор Kafka JDBC загружает все данные, затем инкрементно

Я пытаюсь понять, как сначала получить все данные из запроса, а затем постепенно изменять только с помощью коннектора kafka. Причина в том, что я хочу загрузить все данные в эластичный поиск, а затем синхронизировать es с моими потоками kafka. В настоящее время я делаю это, сначала используя коннектор с mode = bulk, а затем меняю его на временную метку. Это прекрасно работает.

Однако, если мы когда-либо захотим перезагрузить все данные в Streams и ES, это означает, что нам нужно написать несколько скриптов, которые каким-то образом очищают или удаляют потоки kafka и данные индексов es, изменить подключенный ini, чтобы установить режим как массовый, перезапустить все, дать пришло время загрузить все эти данные, затем снова изменить сценарии в режим отметки времени, затем перезапустить все еще раз (причина необходимости в таком сценарии заключается в том, что иногда массовые обновления происходят для исправления исторических данных с помощью процесса etl, который мы еще не контролируем , и этот процесс не обновляет метки времени)

Кто-нибудь делает что-то подобное и нашел более элегантное решение?


person mike01010    schedule 04.05.2017    source источник


Ответы (2)


как сначала получить все данные из запроса, а затем постепенно изменять только с помощью коннектора kafka.

Может быть, это может вам помочь. Например, у меня есть таблица:

╔════╦═════════════╦═══════════╗
║ Id ║    Name     ║  Surname  ║
╠════╬═════════════╬═══════════╣
║  1 ║ Martin      ║ Scorsese  ║
║  2 ║ Steven      ║ Spielberg ║
║  3 ║ Christopher ║ Nolan     ║
╚════╩═════════════╩═══════════╝

В этом случае я создам View:

CREATE OR REPLACE VIEW EDGE_DIRECTORS AS
SELECT 0 AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID =< 2
UNION ALL
SELECT ID AS EXID, ID, NAME, SURNAME
FROM DIRECTORS WHERE ID > 2;

В файле свойств коннектора kafka jdbc вы можете использовать:

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
mode=incrementing
incrementing.column.name=EXID
topic.prefix=
tasks.max=1
name=gv-jdbc-source-connector
connection.url=
table.types=VIEW
table.whitelist=EDGE_DIRECTORS

Итак, коннектор kafka jdbc примет меры:

  1. Сначала все данные, где EXID = 0;
  2. Он сохранит в файле connector.offsets значение смещения = 0;
  3. Новая строка будет вставлена ​​в таблицу DIRECTORS.
  4. Коннектор Kafka JDBC выполнит: Select EXID, ID, NAME, SURNAME FROM EDGE_DIRECTORS и заметит, что EXID был увеличен.
  5. Данные будут обновлены в Kafka Streams.
person edward_wong    schedule 09.05.2017
comment
не совсем то, о чем я спрашивал. в настоящее время я использую столбцы с отметками времени. Мне нужно изменить режим на массовый, чтобы перезагрузить все, затем вернуться к метке времени, чтобы кафка, а затем постепенно загружал измененные или новые данные (для этого он добавляет запрос с меткой времени до и от). Я надеялся избежать переключения режима каждый раз, когда я хочу начать с «чистого» листа. - person mike01010; 26.07.2017

возвращаясь к этому через долгое время. Способ смог решить эту проблему и никогда не использовать массовый режим

  1. стоп-соединители
  2. стереть файлы смещения для каждого коннектора jvm
  3. (необязательно), если вы хотите выполнить полную очистку и загрузку, вы, вероятно, также захотите удалить свои темы, используя api kafka / connect utils / rest (и не забывайте темы состояния)
  4. перезапуск подключается.
person mike01010    schedule 28.01.2018
comment
Я пытаюсь решить такую ​​же проблему (например, перезагрузить данные с самого начала). Ваше решение кажется более сложным, чем обновление конфигурации до массового и возврата к отметке времени. Какую проблему он решает, чего не решает другой метод? - person Ickster; 26.07.2018