Структурированное потоковое чтение из темы Kafka

Я прочитал файл csv, преобразовал поле значения в байты и написал в тему Kafka, используя приложение производителя Kafka. Теперь я пытаюсь читать из темы Kafka, используя структурированную потоковую передачу, но не могу применить настраиваемую десериализацию kryo в поле значения.

Может ли кто-нибудь сказать мне, как использовать пользовательскую десериализацию в структурированной потоковой передаче?


person Deepak Nayak    schedule 10.01.2018    source источник
comment
Прочтите При каких обстоятельствах я могу добавить к своему вопросу «срочно» или другие похожие фразы, чтобы получить более быстрые ответы? - Резюмируя, можно сказать, что это не идеальный способ обращения к волонтерам и, вероятно, контрпродуктивен для получения ответов. Пожалуйста, воздержитесь от добавления этого к своим вопросам.   -  person halfer    schedule 10.01.2018


Ответы (1)


У меня была аналогичная проблема, в основном, у меня были все сообщения Kafka на Protobuf, и я решил это с помощью UDF.

from pyspark.sql.functions import udf

def deserialization_function(message):
    #You need to add your code to deserialize your messages
    #I returned a json but you can return other structure
    json = {"x": x_deserializable,
            "y": y_deserializable,
            "w": w_deserializable,
            "z": z_deserializable,
    return json

schema = StructType() \
                    .add("x", TimestampType()) \
                    .add("y", StringType()) \
                    .add("z", StringType()) \
                    .add("w", StringType()) 

own_udf = udf(deserialization_function, schema)

stream = spark.readStream \
          .format("kafka") \
          .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
          .option("subscribe", topic) \
          .load()

query = stream \
        .select(col("value")) \
        .select((own_udf("value")).alias("value_udf")) \
        .select("value_udf.x", "value_udf.y", "value_udf.w", "value_udf.z")
person Eric Bellet    schedule 15.06.2018