Невозможно написать сообщение в тему kafka (продюсер) при вызове класса kakfa production с циклом.
Я новичок в Python и Kafka. Я пытаюсь написать программу на Python для записи сообщений в тему Kafka и создания, чтобы потребитель Kafka мог подписаться на эту тему для публикации сообщения.
Я не уверен, чего не хватает в моей программе, что ограничивает возможность написания сообщения в теме.
Указание на примечание: я читаю файл JSON и использую цикл for, чтобы подготовить значение ключа. Затем присвойте его переменной и обратитесь к этой переменной с помощью Kafka, произведенного с помощью arg для msg.
Прилагается продюсерская программа Kafka.
Вход: Json_smpl.json
Содержание файла:
{
"transaction":{
"Accnttype":"Saving"
,"Branch":"West"
,"id":"WS"
}
}
Программа:
from confluent_kafka import Producer
import json
def acked(err, msg):
if err is not None:
print("Failed to deliver message: {0}: {1}"
.format(msg.value(), err.str()))
else:
print("Message produced: {0}".format(msg.value()))
p = Producer({'bootstrap.servers': 'localhost:9092'})
try:
with open('json_smpl.json') as read_j:
data = json.load(read_j)
get_data = data.get("transactions")
print(get_data)
for i in get_data:
a = list(get_data.items()[0])
p.produce(topic='mytopic12', 'myvalue #{0}'.format(a), callback=acked)
except KeyboardInterrupt:
pass
p.flush(1)
Ожидаемый результат: сообщение (ключ и значение JSON) будет записано в тему kafka для каждой итерации в цикле.
Фактический результат: в теме нет сообщений. поэтому потребитель не получает никаких сообщений.