Пытаюсь создать сообщение в теме kafka для каждой итерации, но похоже, что в итоге я не отправляю сообщение потребителю

Невозможно написать сообщение в тему kafka (продюсер) при вызове класса kakfa production с циклом.

Я новичок в Python и Kafka. Я пытаюсь написать программу на Python для записи сообщений в тему Kafka и создания, чтобы потребитель Kafka мог подписаться на эту тему для публикации сообщения.

Я не уверен, чего не хватает в моей программе, что ограничивает возможность написания сообщения в теме.

Указание на примечание: я читаю файл JSON и использую цикл for, чтобы подготовить значение ключа. Затем присвойте его переменной и обратитесь к этой переменной с помощью Kafka, произведенного с помощью arg для msg.

Прилагается продюсерская программа Kafka.

Вход: Json_smpl.json

Содержание файла:

{
"transaction":{
"Accnttype":"Saving"
,"Branch":"West"
,"id":"WS"
}
}

Программа:

from confluent_kafka import Producer
import json

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: {0}: {1}"
              .format(msg.value(), err.str()))
    else:
        print("Message produced: {0}".format(msg.value()))

p = Producer({'bootstrap.servers': 'localhost:9092'})
try:
    with open('json_smpl.json') as read_j:
        data = json.load(read_j)
        get_data = data.get("transactions")
    print(get_data)
    for i in get_data:
        a = list(get_data.items()[0])
        p.produce(topic='mytopic12', 'myvalue #{0}'.format(a), callback=acked)
except KeyboardInterrupt:
    pass
p.flush(1)

Ожидаемый результат: сообщение (ключ и значение JSON) будет записано в тему kafka для каждой итерации в цикле.

Фактический результат: в теме нет сообщений. поэтому потребитель не получает никаких сообщений.


person Vecivalus Rajeshkrishnan    schedule 11.01.2019    source источник


Ответы (1)


В вашем файле нет ключа transactions и цикла, который нужно пройти, поэтому ваш JSON не анализируется, и вы не обнаруживаете KeyError или ValueError

Начни с этого

p = Producer({'bootstrap.servers': 'localhost:9092'})
try:
    with open('json_smpl.json') as read_j:
        data = json.load(read_j).get("transaction")
        tosend = json.dumps(data)
        print("Ready to send : {}".format(tosend))
        p.produce(topic='mytopic12', tosend, callback=acked)
except:
    print("There was some error")
person OneCricketeer    schedule 11.01.2019
comment
Спасибо за ответ. Я все еще не могу пропарить это сообщение. В потребительской программе я подписался на конкретную тему (mytopic12). Когда я выполняю потребительскую программу, она не выбирает нам сообщение из подписанной темы. Так что теперь я не уверен, есть ли в этой теме какое-либо сообщение или неверно то, как я программирую потребителя. Для справки, вот как я подписываюсь на тему. c = Потребитель (настройки) c_offset = (settings ['default.topic.config']) c.subscribe (['mytopic12']) - person Vecivalus Rajeshkrishnan; 12.01.2019
comment
Я бы посоветовал использовать команды kafka-console-consumer или kafkacat, чтобы убедиться, что вы начинаете с самого раннего смещения. - person OneCricketeer; 12.01.2019