Поток Spark не может читать файлы, созданные из потока в hdfs

Я создал приложение в реальном времени, в котором я записываю потоки данных в hdfs из веб-журналов с помощью flume, а затем обрабатываю эти данные с помощью искрового потока. Но пока flume пишет и создает новые файлы в потоке искры hdfs, не может обрабатывать эти файлы. Если я помещаю файлы в каталог hdfs с помощью команды put, поток искры может читать и обрабатывать файлы. Любая помощь относительно того же будет большой.


person Y0gesh Gupta    schedule 09.06.2015    source источник


Ответы (3)


Вы сами обнаружили проблему: пока поток данных продолжается, файл HDFS «заблокирован» и не может быть прочитан каким-либо другим процессом. Напротив, как вы испытали, если вы поместите пакет данных (это yur файл, пакет, а не поток), как только он будет загружен, он будет готов к чтению.

В любом случае, и не будучи экспертом в потоковой передаче Spark, кажется, что из Spark Streaming Руководство по программированию, раздел «Обзор», свидетельствует о том, что вы не выполняете правильное развертывание. Я имею в виду, что из показанного здесь изображения кажется, что потоковая передача (в данном случае сгенерированная Flume) должна быть напрямую отправлена ​​в движок потоковой передачи Spark; тогда результаты будут помещены в HDFS.

Тем не менее, если вы хотите сохранить свое развертывание, то есть Flume -> HDFS -> Spark, я предлагаю создать мини-пакеты данных во временных папках HDFS, и как только мини-пакеты будут готовы, хранить новые данные за секунду. minibatch, передавая первую партию в Spark для анализа.

HTH

person frb    schedule 11.06.2015

В дополнение к ответу frb: что правильно - SparkStreaming с Flume действует как RPC-сервер Avro - вам необходимо настроить AvroSink, который указывает на ваш экземпляр SparkStreaming.

person Erik Schmiegelow    schedule 12.06.2015

с помощью spark2 теперь вы можете напрямую подключить потоковую передачу искр к потоку, см. официальный docs, а затем один раз записать в HDFS в конце процесса.

 import org.apache.spark.streaming.flume._
 val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
person CarloV    schedule 23.11.2017