Я пытаюсь настроить ровно один раз семантику в Kafka (Apache Beam). Вот изменения, которые я собираюсь представить:
Производитель:
enable.idenpotence
= верноtransactional.id
= uniqueTransactionalId
Потребитель:
установить
enable.auto.commit
= false// добавили в конструктор потребителей:
.commitOffsetsInFinalize()
.withReadCommitted()
В конструктор KafkaIO#write
добавлено следующее:
.withEOS(numShards, sinkGroupId)
Кто-нибудь знает, что еще нужно изменить, чтобы точно достичь семантики в Apache Beam KafkaIO?
Приведенная выше конфигурация выглядит нормально или я что-то неправильно понял?
Нужно ли мне указывать свойство transactional.id
, если я не использую API транзакций (потому что у меня нет явного производителя в Apache Beam)?