Я пытаюсь добавить некоторую функциональность для rabbitmq с сообщениями о задержке. На самом деле мне нужно получить это сообщение через 2 недели. Насколько я знаю, нам не нужен никакой плагин. Кроме того, когда это сообщение вызывается, как мне переназначить новый обменник задержки x для повторного вызова в течение 2 недель. Куда я должен добавить это сообщение о задержке x.
конфигурация
"messageQueue": {
"connectionString": "amqp://guest:guest@localhost:5672?heartbeat=5",
"queueName": "history",
"exchange": {
"type": "headers",
"prefix": "history."
},
"reconnectTimeout": 5000
},
сервис:
import amqplib from 'amqplib'
import config from 'config'
import logger from './logger'
const {reconnectTimeout, connectionString, exchange: {prefix, type: exchangeType}, queueName} = config.messageQueue
const onConsume = (expectedMessages, channel, onMessage) => async message => {
const {fields: {exchange}, properties: {correlationId, replyTo}, content} = message
logger.silly(`consumed message from ${exchange}`)
const messageTypeName = exchange.substring(exchange.startsWith(prefix) ? prefix.length : 0)
const messageType = expectedMessages[messageTypeName]
if (!messageType) {
logger.warn(`Unexpected message of type ${messageTypeName} received. The service only accepts messages of types `, Object.keys(expectedMessages))
return
}
const deserializedMessage = messageType.decode(content)
const object = deserializedMessage.toJSON()
const result = await onMessage(messageTypeName, object)
if (correlationId && replyTo) {
const {type, response} = result
const encoded = type.encode(response).finish()
channel.publish('', replyTo, encoded, {correlationId})
}
}
const startService = async (expectedMessages, onMessage) => {
const restoreOnFailure = e => {
logger.warn('connection with message bus lost due to error', e)
logger.info(`reconnecting in ${reconnectTimeout} milliseconds`)
setTimeout(() => startService(expectedMessages, onMessage), reconnectTimeout)
}
const exchanges = Object.keys(expectedMessages).map(m => `${prefix}${m}`)
try {
const connection = await amqplib.connect(connectionString)
connection.on('error', restoreOnFailure)
const channel = await connection.createChannel()
const handleConsume = onConsume(expectedMessages, channel, onMessage)
const queue = await channel.assertQueue(queueName)
exchanges.forEach(exchange => {
channel.assertExchange(exchange, exchangeType, {durable: true})
channel.bindQueue(queue.queue, exchange, '')
})
logger.debug(`start listening messages from ${exchanges.join(', ')}`)
channel.consume(queue.queue, handleConsume, {noAck: true})
}
catch (e) {
logger.warn('error while subscribing for messages message', e)
restoreOnFailure(e)
}
}
export default startService