Есть много онлайн-примеров чтения json из Kafka (для записи на паркет), но я не могу понять, как применить схему к строке CSV из kafka.
Потоковые данные:
customer_1945,cusaccid_995,27999941
customer_1459,cusaccid_1102,27999942
Схема:
schema = StructType() \
.add("customer_id",StringType()) \
.add("customer_acct_id",StringType()) \
.add("serv_acct_id",StringType())
Прочтите ленту:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092") \
.option("subscribe", "test") \
.load()
Я использовал это для JSON:
interval=df \
.select(from_json(col("value").cast("string"), schema).alias("json")) \
.select("json.*")
Перед записью на паркет с заданной схемой:
query=interval \
.writeStream \
.format("parquet") \
.option("checkpointLocation", "/user/whatever/checkpoint24") \
.start("/user/ehatever/interval24")
Поскольку я не могу использовать from_json () для CSV - я не знаю, как применить схему к фрейму данных, чтобы я мог использовать аналогичную команду writeStream ().