SPARK Чтение CSV с FTP: входной путь не существует

Я пытаюсь сделать что-то, что должно быть довольно простым, но не могу этого сделать.

У меня есть файл .csv на FTP-сервере от клиента. Путь как таковой:

ftp://[user]:[passwd]@[IP-ADDRESS]/file.csv

Скопировав и вставив адрес, я могу легко получить доступ к файлу в моем браузере (или любой другой программе). Но я не могу получить к нему доступ в pyspark.

Вот что я пытаюсь сделать, используя databricks spark-csv (https://github.com/databricks/spark-csv):

file_path = ftp://[user]:[passwd]@[IP-ADDRESS]/file.csv
reader = sqlContext.read.format('com.databricks.spark.csv')
         .options(header=False, charset='cp860', inferschema='true', delim=";")
         .load(file_path)

и я получаю следующую ошибку:

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: ftp://[user]:[passwd]@[IP-ADDRESS]/file.csv

Попытка прочитать файл как текстовый файл:

df = sqlContext.read.text("ftp://[user]:[passwd]@[IP-ADDRESS]/SALES_37_TIRADENTES_2016-09-01.csv", )
df.collect()

ошибка:

Py4JJavaError: An error occurred while calling o147.collectToPython.
: java.io.IOException: No input paths specified in job

Я почти уверен, что это как-то связано с доступом по FTP, но я не уверен.


РЕДАКТИРОВАТЬ

В итоге я загрузил файл с помощью ftplib Python и загрузил файл в RDD. Возможно, это неоптимальный подход, если размер ваших данных слишком велик, но он работает.


person Arthur Camara    schedule 14.09.2016    source источник
comment
Можете ли вы загрузить образцы файлов с любых сайтов, для которых не требуется идентификатор пользователя / пароль?   -  person blackpen    schedule 15.09.2016
comment
Не повезло читать из ftp.debian.org/debian/README   -  person Arthur Camara    schedule 15.09.2016


Ответы (2)


Вы можете использовать файл с FTP, используя SparkFiles http://spark.apache.org/docs/latest/api/python/pyspark.html

addFile (путь)

Добавьте файл для загрузки с этим заданием Spark на каждом узле. Переданный путь может быть либо локальным файлом, либо файлом в HDFS (или других файловых системах, поддерживаемых Hadoop), либо URI HTTP, HTTPS или FTP.

Чтобы получить доступ к файлу в заданиях Spark, используйте L {SparkFiles.get (fileName)} с именем файла, чтобы найти место его загрузки.

>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("100")
>>> sc.addFile(path)
>>> def func(iterator):
...    with open(SparkFiles.get("test.txt")) as testFile:
...        fileVal = int(testFile.readline())
...        return [x * fileVal for x in iterator]
>>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
[100, 200, 300, 400]

Следующее было протестировано (pyspark - 1.5.2) на работоспособность:

from pyspark import SparkFiles

file_path = "ftp://ftp:[email protected]/pub/Unix/Win2000_PlainPassword.reg"
sc.addFile(file_path)

filename = SparkFiles.get(file_path.split('/')[-1])

rdd = sc.textFile("file://"+filename)
rdd.take(10)
rdd.collect()
person Yaron    schedule 15.09.2016
comment
Почти готово. sc.addFile добавляет файл в tmp каталог. Но я все еще не могу его открыть, ни используя имя_файла, ни возвращаемое значение из SparkFiles.get(file_name), которое правильно возвращает локальный путь к файлу. - person Arthur Camara; 15.09.2016
comment
@ArthurCamara, пожалуйста, попробуйте предложение в моем обновлении - person Yaron; 15.09.2016
comment
Я уже этим занимался. df.collect() выходит из строя с той же ошибкой. Дело в том, что при открытии файла и чтении строки работает. Указывает, что файл действительно присутствует. Также стоит отметить, что я использую Spark 1.6.2, а не 2.0 (мы полагаемся на Bluemix от IBM) - person Arthur Camara; 15.09.2016
comment
На самом деле, извините, теперь ошибка говорит java.io.FileNotFoundException: File file:/tmp/spark-160-ego-master/work/spark-69648973-8776-4bc4-91e9-61605c7f4d3d/userFiles-d790dfaa-b665-4afa-a08c-057d61831b8d/file.csv does not exist - person Arthur Camara; 15.09.2016
comment
Все еще не могу найти файл на 1.6.0. Я думаю, что это проблема с серверами IBM Bluemix. - person Arthur Camara; 20.09.2016
comment
Даже у меня такая же проблема. Любая помощь? - person Tutu Kumari; 21.11.2019

Вот кое-что, что может помочь (версия scala, протестированная с помощью Spark 1.6.3)

Сначала получите файл с помощью метода WholeTextFiles из контекста Spark.

val dataSource = "ftp://[user]:[passwd]@[IP-ADDRESS]"

val fileRDD = sc.wholeTextFiles(dataSource).values

а затем создайте экземпляр CsvParser с вашими параметрами

val csvParser: CsvParser = new CsvParser().withUseHeader(true).withInferSchema(true).withDelimiter(';').withCharset("cp860")

(если вы не хотите использовать заголовок, не забудьте удалить первую строку)

и наконец

val df = csvParser.csvRdd(sq, fileRDD)
df.collect()

Надеюсь это поможет !

person pbamba    schedule 23.07.2017