Всякий раз, когда я перезапускаю систему, он показывает, что кодек utf-8 не может декодировать байт 0x98 в kafka, чтобы зажечь

Это мой следующий код для получения данных от kafka для запуска потоковой передачи Сначала он работал, но когда я перезапустил систему, он снова показывает следующую ошибку:

UnicodeDecodeError: кодек utf-8 не может декодировать байт 0x98 в позиции 5: недопустимый начальный байт

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import KafkaUtils
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

schema_registry_client = CachedSchemaRegistryClient(url='http://0.0.0.0:8081')
serializer = MessageSerializer(schema_registry_client)
sc = SparkContext()
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 2)



# def decoder(s):
#     decoded_message = serializer.decode_message(s)
#     return decoded_message

kvs = KafkaUtils.createDirectStream(ssc, ["demo.Appointment_Attendance.Patient"], {
                                "metadata.broker.list": "localhost:9092"}, 
                                 valueDecoder=serializer.decode_message)
keyDecoder=lambda x: x, valueDecoder=lambda x: x)
lines = kvs.map(lambda x: x[1])
lines.pprint()

ssc.start()
ssc.awaitTermination()

person Kinjal Patel    schedule 19.12.2019    source источник
comment
Что ж, 0.0.0.0:8081 не настоящий адрес. Вы имели в виду localhost:8081? Можете ли вы получать сообщения без декодера? Затем сопоставить их позже?   -  person OneCricketeer    schedule 19.12.2019
comment
Я изменил его на localhost: 8081. Я не получаю сообщение без декодера @ cricket_007   -  person Kinjal Patel    schedule 19.12.2019
comment
если я передаю декодер следующим образом: kvs=KafkaUtils.createDirectStream(ssc["demo.Appointment_Attendance.Patient"], {"metadata.broker.list": "localhost:9092"}, keyDecoder=lambda x: x,valueDecoder=lambda x: x) Я получаю такой вывод: b'\x00\x00\x00\x00\x08\x00\x02\xa6\x01\nMayur\x02\xd2\xd5\xc8\xc24\x02\xc8\x0b\x08male\x08demoT\xf0\xf1\xd9\xdf\x0b\x00 mysql-bin.000020\xce$\x00\x00\x02\x04\x02,Appointment_Attendance\x02\x0ePatient\x02c\x02\xba\xd6\xa1\xd6\xe3[' Как преобразовать это в читаемый формат   -  person Kinjal Patel    schedule 19.12.2019
comment
Что-то вроде kvs.mapValues(lambda v: serializer.decode_message(v))   -  person OneCricketeer    schedule 19.12.2019
comment
хорошо, спасибо, это сработало   -  person Kinjal Patel    schedule 19.12.2019


Ответы (2)


0x98 - кириллическая буква И в utf-8, фактически эта буква представлена ​​байтами 0xD0,0x98. В кодировке windows-1251 0x98 описывается как неопределенное. У вас есть доступ к строке с «И»? Можете ли вы поменять его на маленькую букву «и»? Другой вариант - заменить «И» на некоторый уникальный маркер, например == CYR_I ==, а затем выполнить обратную замену.

person dimirsen    schedule 19.12.2019
comment
Я не думаю, что это важно в том, какой это персонаж. Пятый байт - это содержимое данных docs.confluent .io / current / schema-registry / - person OneCricketeer; 19.12.2019

Перемещение комментария к ответу ...

Сначала попробуйте просто использовать необработанные байты

topic = ssc ["demo.Appointment_Attendance.Patient"] kvs = KafkaUtils.createDirectStream (тема, {"metadata.broker.list": "localhost: 9092"})

Если это сработает, попробуйте на более позднем этапе отменить серию через map / mapValues.

kvs.mapValues(lambda v: serializer.decode_message(v))

Как только это сработает, попробуйте вернуться к использованию valueDecoder

person OneCricketeer    schedule 05.01.2020