Project_Bank.csv не является файлом Parquet. ожидаемое магическое число в хвосте [80, 65, 82, 49], но найдено [110, 111, 13, 10]

Итак, я пытался загрузить файл csv с выводом настраиваемой схемы, но каждый раз получаю следующие ошибки:

Project_Bank.csv не является файлом Parquet. ожидаемое магическое число в хвосте [80, 65, 82, 49], но найдено [110, 111, 13, 10]

Вот как выглядит моя программа и записи в файле csv,

возраст; работа; в браке; образование; дефолт; баланс; жилье; ссуда; контакт; день; месяц; продолжительность; кампания; дни; предыдущий; доход; y 58; руководство; женат; высшее образование; нет; 2143; да; нет; неизвестно; 5; май; 261; 1; -1; 0; неизвестно; нет 44; техник; холост; второстепенный; нет; 29; да; нет; неизвестно; 5; май; 151; 1; -1; 0 ; неизвестно; нет 33; предприниматель; замужем; вторичный; нет; 2; да; да; неизвестно; 5; май; 76; 1; -1; 0; неизвестно; нет

Мой код:

$ spark-shell --packages com.databricks: spark-csv_2.10: 1.5.0

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext   
import sqlContext.implicits._    
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val bankSchema = StructType(Array(
  StructField("age", IntegerType, true),
  StructField("job", StringType, true),
  StructField("marital", StringType, true),
  StructField("education", StringType, true),
  StructField("default", StringType, true),
  StructField("balance", IntegerType, true),
  StructField("housing", StringType, true),
  StructField("loan", StringType, true),
  StructField("contact", StringType, true),
  StructField("day", IntegerType, true),
  StructField("month", StringType, true),
  StructField("duration", IntegerType, true),
  StructField("campaign", IntegerType, true),
  StructField("pdays", IntegerType, true),
  StructField("previous", IntegerType, true),
  StructField("poutcome", StringType, true),
  StructField("y", StringType, true)))


 val df = sqlContext.
  read.
  schema(bankSchema).
  option("header", "true").
  option("delimiter", ";").
  load("/user/amit.kudnaver_gmail/hadoop/project_bank/Project_Bank.csv").toDF()

  df.registerTempTable("people")
  df.printSchema()
  val distinctage = sqlContext.sql("select distinct age from people")

Любые предложения о том, почему я не могу работать с файлом csv здесь после нажатия правильной схемы. Заранее благодарим за совет.

Спасибо Амит К


person amitk    schedule 22.05.2017    source источник


Ответы (2)


Здесь проблема в том, что Data Frame ожидает Parquet-файл при его обработке. Для обработки данных в CSV. Вот что ты можешь сделать.

Прежде всего, удалите строку заголовка из данных.

58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no
44;technician;single;secondary;no;29;yes;no;unknown;5;may;151;1;-1;0;unknown;no
33;entrepreneur;married;secondary;no;2;yes;yes;unknown;5;may;76;1;-1;0;unknown;no

Затем мы пишем следующий код для чтения данных.

Создать класс дела

case class BankSchema(age: Int, job: String, marital:String, education:String, default:String, balance:Int, housing:String, loan:String, contact:String, day:Int, month:String, duration:Int, campaign:Int, pdays:Int, previous:Int, poutcome:String, y:String)

Прочитать данные из HDFS и проанализировать их

val bankData = sc.textFile("/user/myuser/Project_Bank.csv").map(_.split(";")).map(p => BankSchema(p(0).toInt, p(1), p(2),p(3),p(4), p(5).toInt, p(6), p(7), p(8), p(9).toInt, p(10), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toInt, p(15), p(16))).toDF()

А затем зарегистрируйте таблицу и выполните запросы.

bankData.registerTempTable("bankData")
val distinctage = sqlContext.sql("select distinct age from bankData")

Вот как будет выглядеть результат

+---+
|age|
+---+
| 33|
| 44|
| 58|
+---+
person Tanmay Deshpande    schedule 23.05.2017
comment
Привет, Танмей! Очень признателен за вашу помощь здесь, я пытался построить схему с использованием метода класса case раньше, но я считаю, что забыл удалить строку заголовка из файла csv и закончил с некоторыми ошибками. Теперь он работает нормально, теперь я могу запускать все запросы sql с выходом :) С уважением, Амит К. - person amitk; 23.05.2017
comment
Амит, если решение сработало для вас, вы можете принять ответ. Спасибо. - person Tanmay Deshpande; 24.05.2017

Здесь ожидаемый формат файла - csv, но в соответствии с ошибкой он ищет формат файла parquet.

Это можно преодолеть, явно указав формат файла, как показано ниже (который отсутствовал в общей проблеме), потому что, если мы не укажем формат файла, он по умолчанию ожидает формат Parquet.

Согласно версии кода Java (примерный пример):

Dataset<Row> resultData = session.read().format("csv")
                                            .option("sep", ",")
                                            .option("header", true)
                                            .option("mode", "DROPMALFORMED")
                                            .schema(definedSchema)
                                            .load(inputPath);

Здесь схему можно определить либо с помощью java class (ie. POJO class), либо с помощью StructType, как уже упоминалось. А inputPath - это путь к входному csv файлу.

person RPaul    schedule 20.09.2018