потоковая передача искры: чтение строки CSV из кафки, запись в паркет

Есть много онлайн-примеров чтения json из Kafka (для записи на паркет), но я не могу понять, как применить схему к строке CSV из kafka.

Потоковые данные:

customer_1945,cusaccid_995,27999941    
customer_1459,cusaccid_1102,27999942

Схема:

schema = StructType() \
.add("customer_id",StringType()) \
.add("customer_acct_id",StringType()) \
.add("serv_acct_id",StringType())

Прочтите ленту:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092") \
  .option("subscribe", "test") \
  .load()

Я использовал это для JSON:

interval=df \
  .select(from_json(col("value").cast("string"), schema).alias("json")) \
  .select("json.*")

Перед записью на паркет с заданной схемой:

query=interval     \
  .writeStream  \
  .format("parquet") \
  .option("checkpointLocation", "/user/whatever/checkpoint24") \
  .start("/user/ehatever/interval24")

Поскольку я не могу использовать from_json () для CSV - я не знаю, как применить схему к фрейму данных, чтобы я мог использовать аналогичную команду writeStream ().


person MarkTeehan    schedule 20.12.2017    source источник


Ответы (1)


Вот как я это сделал. Без from_json извлеките строку csv:

interval=df.select(col("value").cast("string")) .alias("csv").select("csv.*")

А затем разбейте его на столбцы. Это можно записать как паркетный файл, используя ту же инструкцию, что и выше.

interval2=interval \
      .selectExpr("split(value,',')[0] as customer_id" \
                 ,"split(value,',')[1] as customer_acct_id" \
                 ,"split(value,',')[2] as serv_acct_id" \
                 ,"split(value,',')[3] as installed_service_id" \
                 ,"split(value,',')[4] as meter_id" \
                 ,"split(value,',')[5] as channel_number" \
                 ... etc
                 )
person MarkTeehan    schedule 21.12.2017