Задержка сообщения rabbitmq на nodejs

Я пытаюсь добавить некоторую функциональность для rabbitmq с сообщениями о задержке. На самом деле мне нужно получить это сообщение через 2 недели. Насколько я знаю, нам не нужен никакой плагин. Кроме того, когда это сообщение вызывается, как мне переназначить новый обменник задержки x для повторного вызова в течение 2 недель. Куда я должен добавить это сообщение о задержке x.

конфигурация

"messageQueue": {
        "connectionString": "amqp://guest:guest@localhost:5672?heartbeat=5",
        "queueName": "history",
        "exchange": {
            "type": "headers",
            "prefix": "history."
        },
        "reconnectTimeout": 5000
    },

сервис:

import amqplib from 'amqplib'
import config from 'config'

import logger from './logger'

const {reconnectTimeout, connectionString, exchange: {prefix, type: exchangeType}, queueName} = config.messageQueue

const onConsume = (expectedMessages, channel, onMessage) => async message => {
    const {fields: {exchange}, properties: {correlationId, replyTo}, content} = message

    logger.silly(`consumed message from ${exchange}`)

    const messageTypeName = exchange.substring(exchange.startsWith(prefix) ? prefix.length : 0)

    const messageType = expectedMessages[messageTypeName]

    if (!messageType) {
        logger.warn(`Unexpected message of type ${messageTypeName} received. The service only accepts messages of types `, Object.keys(expectedMessages))

        return
    }

    const deserializedMessage = messageType.decode(content)

    const object = deserializedMessage.toJSON()

    const result = await onMessage(messageTypeName, object)

    if (correlationId && replyTo) {
        const {type, response} = result

        const encoded = type.encode(response).finish()

        channel.publish('', replyTo, encoded, {correlationId})
    }
}

const startService = async (expectedMessages, onMessage) => {

    const restoreOnFailure = e => {
        logger.warn('connection with message bus lost due to error', e)
        logger.info(`reconnecting in ${reconnectTimeout} milliseconds`)

        setTimeout(() => startService(expectedMessages, onMessage), reconnectTimeout)
    }

    const exchanges = Object.keys(expectedMessages).map(m => `${prefix}${m}`)

    try {
        const connection = await amqplib.connect(connectionString)

        connection.on('error', restoreOnFailure)

        const channel = await connection.createChannel()

        const handleConsume = onConsume(expectedMessages, channel, onMessage)

        const queue = await channel.assertQueue(queueName)

        exchanges.forEach(exchange => {
            channel.assertExchange(exchange, exchangeType, {durable: true})

            channel.bindQueue(queue.queue, exchange, '')
        })

        logger.debug(`start listening messages from ${exchanges.join(', ')}`)

        channel.consume(queue.queue, handleConsume, {noAck: true})
    }
    catch (e) {
        logger.warn('error while subscribing for messages message', e)

        restoreOnFailure(e)
    }
}

export default startService

person Palaniichuk Dmytro    schedule 02.04.2018    source источник
comment
Вы можете использовать планировщик, чтобы запланировать отправку сообщения через 2 недели, используя node-schedule.   -  person TGW    schedule 02.04.2018


Ответы (1)


У RabbitMQ есть плагин для планирования сообщений. Вы можете использовать его с учетом важного предостережения относительно дизайна, которое я объясню ниже.

Использовать шаги

Вы должны сначала установить его:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Затем вам нужно настроить отложенный обмен:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);

Наконец, вы можете установить параметр x-delay (где задержка в миллисекундах).

byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

Две недели равны (7*24*60*60*1000 = 604,800,000) миллисекундам.

Важное предостережение Как я объяснил в это answer, очень плохо просить об этом брокера сообщений.

Важно иметь в виду, что при работе с очередями сообщений они выполняют очень специфическую функцию в системе: удерживают сообщения, пока процессоры заняты обработкой более ранних сообщений. Ожидается, что правильно функционирующая очередь сообщений будет доставлять сообщения в разумные сроки. По сути, основное ожидание состоит в том, что как только сообщение достигает головы очереди, следующее получение из очереди выдаст сообщение -- без задержки.

Задержка становится результатом того, как система с очередью обрабатывает сообщения. Фактически, Закон Литтла предлагает некоторые интересные идеи по этому поводу. Если вы собираетесь вставить туда произвольную задержку, вам действительно не нужна очередь сообщений для начала — вся ваша работа запланирована заранее.

Таким образом, в системе, где необходима задержка (например, для присоединения/ожидания завершения параллельной операции), вам следует рассмотреть другие методы. Как правило, в этом конкретном случае имеет смысл запрашиваемая база данных. Если вы обнаружите, что держите сообщения в очереди в течение заранее установленного периода времени, вы фактически используете очередь сообщений в качестве базы данных — функция, для которой она не предназначена. Это не только рискованно, но и имеет высокую вероятность снижения производительности вашего брокера сообщений.

person theMayer    schedule 03.04.2018