Это мой следующий код для получения данных от kafka для запуска потоковой передачи Сначала он работал, но когда я перезапустил систему, он снова показывает следующую ошибку:
UnicodeDecodeError: кодек utf-8 не может декодировать байт 0x98 в позиции 5: недопустимый начальный байт
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import KafkaUtils
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
schema_registry_client = CachedSchemaRegistryClient(url='http://0.0.0.0:8081')
serializer = MessageSerializer(schema_registry_client)
sc = SparkContext()
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 2)
# def decoder(s):
# decoded_message = serializer.decode_message(s)
# return decoded_message
kvs = KafkaUtils.createDirectStream(ssc, ["demo.Appointment_Attendance.Patient"], {
"metadata.broker.list": "localhost:9092"},
valueDecoder=serializer.decode_message)
keyDecoder=lambda x: x, valueDecoder=lambda x: x)
lines = kvs.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()
0.0.0.0:8081
не настоящий адрес. Вы имели в видуlocalhost:8081
? Можете ли вы получать сообщения без декодера? Затем сопоставить их позже? - person OneCricketeer   schedule 19.12.2019kvs=KafkaUtils.createDirectStream(ssc["demo.Appointment_Attendance.Patient"], {"metadata.broker.list": "localhost:9092"}, keyDecoder=lambda x: x,valueDecoder=lambda x: x)
Я получаю такой вывод:b'\x00\x00\x00\x00\x08\x00\x02\xa6\x01\nMayur\x02\xd2\xd5\xc8\xc24\x02\xc8\x0b\x08male\x08demoT\xf0\xf1\xd9\xdf\x0b\x00 mysql-bin.000020\xce$\x00\x00\x02\x04\x02,Appointment_Attendance\x02\x0ePatient\x02c\x02\xba\xd6\xa1\xd6\xe3['
Как преобразовать это в читаемый формат - person Kinjal Patel   schedule 19.12.2019kvs.mapValues(lambda v: serializer.decode_message(v))
- person OneCricketeer   schedule 19.12.2019