Flume: назначить ключ для раковины раздела kafka

Я имею дело с проблемой, но я не могу найти ответ на ее решение, ни в документации по Flume. Я хочу взять абсолютный путь к файлу хвоста и сохранить его. После того, как я хочу передать его в приемник kafka в качестве ключа, чтобы все события имели один и тот же путь в одном разделе. Я читал много статей, в которых говорится, что это возможно, но я не могу найти конфигурацию, которую нужно назначить, чтобы она работала. Может ли кто-нибудь дать мне ссылки или пример по настройке агента?
У меня есть следующая конфигурация агента:

source
agent3a.sources.source3a.type = TAILDIR
agent3a.sources.source3a.filegroups= f1
agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
agent3a.sources.source3a.channels= channel3a
agent3a.sources.source3a.batchSize=1
agent3a.sources.source3a.fileHeader= True

Memory Channel
agent3a.channels.channel3a.type = memory
agent3a.channels.channel3a.transactionCapacity = 100
agent3a.channels.channel3a.capacity = 100

Sink
agent3a.sinks.sink3a.type= org.apache.flume.sink.kafka.KafkaSink 
agent3a.sinks.sink3a.kafka.bootstrap.servers =########hostname1#####:port, ########hostname2#####:port
agent3a.sinks.sink3a.kafka.topic= topic_test
agent3a.sinks.sink3a.kafka.producer.acks=1
agent3a.sinks.sink3a.channel= channel3a
agent3a.sinks.sink3a.FlumeBatchSize=1
agent3a.sinks.sink3a.useFlumeEventFormat = true
agent3a.sinks.sink3a.kafka.producer.batch.size=10

тк вообще :)


person Claudia_S    schedule 20.11.2019    source источник


Ответы (1)


Наконец я нашел, как настроить агент, чтобы назначить ключ для раздела в теме кафки, используя абсолютный путь к файлам в источнике данных. Более подробно, необходимо установить свойство 'fileHeaderKey=key'. Таким образом, когда событие передается в приемник kafka, в заголовке содержится пара ключей =absolute/path/of/the/file, и kafka может его использовать. как ключ в своем сообщении.

agent3a.sources= source3a 
agent3a.channels= channel3a
agent3a.sinkss= sink3a

source
agent3a.sources.source3a.type = TAILDIR
agent3a.sources.source3a.filegroups= f1
agent3a.sources.source3a.filegroups.f1 = /path/local/data/source/
agent3a.sources.source3a.channels= channel3a
agent3a.sources.source3a.batchSize=1
agent3a.sources.source3a.fileHeader= True
agent3a.sources.source3a.fileHeaderKey= key #####property to set the fileHeader the 
                                                  key for partitioning###

Memory Channel
agent3a.channels.channel3a.type = memory
agent3a.channels.channel3a.transactionCapacity = 100
agent3a.channels.channel3a.capacity = 100

Sink
agent3a.sinks.sink3a.type= org.apache.flume.sink.kafka.KafkaSink 
agent3a.sinks.sink3a.kafka.bootstrap.servers =########hostname1#####:port, ########hostname2#####:port
agent3a.sinks.sink3a.kafka.topic= topic_test
agent3a.sinks.sink3a.kafka.producer.acks=1
agent3a.sinks.sink3a.channel= channel3a
agent3a.sinks.sink3a.FlumeBatchSize=1
agent3a.sinks.sink3a.useFlumeEventFormat = true
agent3a.sinks.sink3a.kafka.producer.batch.size=10
person Claudia_S    schedule 21.11.2019