Канал памяти Flume для стока HDFS

У меня возникла проблема с Flume (1.5 на Cloudera CDH 5.3):

spoolDir source -> memory channel -> HDFS sink

Что я пытаюсь сделать: каждые 5 минут около 20 файлов помещаются в каталог буферизации (захвачены из удаленного хранилища). Каждый файл содержит несколько строк, каждая строка представляет собой журнал (в формате JSON). Размер файлов составляет от 10 КБ до 1 МБ.

Когда я запускаю агент, все файлы успешно помещаются в HDFS. Через 1 мин (это я выставил в flume.conf) файлы накатываются (убираем суффикс .tmp и закрываемся).

Но когда в каталоге буферизации обнаруживаются новые файлы, я получаю сообщение:

org.apache.flume.source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 250 milliseconds

После безуспешных попыток множества различных конфигураций (увеличение/уменьшение канала transactionCapacity и емкости, увеличение/уменьшение размера партии и т. д.) я прошу вашей помощи.

Вот моя последняя конфигурация канала:

# source definition
sebanalytics.sources.spooldir-source.type = spooldir
sebanalytics.sources.spooldir-source.spoolDir = /var/flume/in
sebanalytics.sources.spooldir-source.basenameHeader = true
sebanalytics.sources.spooldir-source.basenameHeaderKey = basename
sebanalytics.sources.spooldir-source.batchSize = 10
sebanalytics.sources.spooldir-source.deletePolicy = immediate
# Max blob size: 1.5Go
sebanalytics.sources.spooldir-source.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
sebanalytics.sources.spooldir-source.deserializer.maxBlobLength = 1610000000
# Attach the interceptor to the source
sebanalytics.sources.spooldir-source.interceptors = json-interceptor
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.type = com.app.flume.interceptor.JsonInterceptor$Builder
# Define event's headers. basenameHeader must be the same than source.basenameHeaderKey (defaults is basename)
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.basenameHeader = basename
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.resourceHeader = resources
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.ssidHeader = ssid

# channel definition
sebanalytics.channels.mem-channel-1.type = memory
sebanalytics.channels.mem-channel-1.capacity = 1000000
sebanalytics.channels.mem-channel-1.transactionCapacity = 10

# sink definition
sebanalytics.sinks.hdfs-sink-1.type = hdfs
sebanalytics.sinks.hdfs-sink-1.hdfs.path = hdfs://StandbyNameNode/data/in
sebanalytics.sinks.hdfs-sink-1.hdfs.filePrefix = %{resources}_%{ssid}
sebanalytics.sinks.hdfs-sink-1.hdfs.fileSuffix = .json
sebanalytics.sinks.hdfs-sink-1.hdfs.fileType = DataStream
sebanalytics.sinks.hdfs-sink-1.hdfs.writeFormat = Text
sebanalytics.sinks.hdfs-sink-1.hdfs.rollInterval = 3600
sebanalytics.sinks.hdfs-sink-1.hdfs.rollSize = 63000000
sebanalytics.sinks.hdfs-sink-1.hdfs.rollCount = 0
sebanalytics.sinks.hdfs-sink-1.hdfs.batchSize = 10
sebanalytics.sinks.hdfs-sink-1.hdfs.idleTimeout = 60

# connect source and sink to channel
sebanalytics.sources.spooldir-source.channels = mem-channel-1
sebanalytics.sinks.hdfs-sink-1.channel = mem-channel-1

person Adagyo    schedule 08.09.2015    source источник


Ответы (1)


Полный канал означает, что: канал не может получать больше событий от источника, поскольку приемник потребляет эти события медленнее, чем источник.

Увеличение пропускной способности канала только решает проблему. Возможные решения:

  • Улучшение обработки в приемнике... если приемник является пользовательским (улучшение/устранение циклов, использование более эффективного внутреннего API и т. д.). В данном случае это кажется невозможным, поскольку вы используете приемник HDFS по умолчанию.
  • Уменьшение частоты отправки данных в источник. Тем не менее, я думаю, вы не хотите/не можете этого делать из-за ваших требований к обработке.
  • Добавление дополнительных стоков, работающих параллельно. Я не уверен в этом, но могу себе представить, как дизайнеры Flume решили запускать каждый приемник в отдельном потоке. Если это так, то вы можете попробовать несколько параллельных приемников HDFS. Чтобы разделить данные на несколько приемников, вам придется использовать селектор мультиплексирования отличается от реплицирующегося по умолчанию.

ХТХ!

person frb    schedule 24.09.2015
comment
Я пробовал ваши решения без успеха. Однако я изменил источник на HTTP и больше никогда не сталкивался с проблемой полного канала, даже с огромным вводом. Итак, я решил свою проблему... но я бы предпочел понять, почему это не работает с SpoolDir. Но можно было обойтись и без :) - person Adagyo; 25.09.2015
comment
Причина должна быть проста: SpoolDir должен быть медленнее с точки зрения обработки данных, чем источник Http. - person frb; 29.09.2015