Хороший интервал сердцебиения для pika-rabbitmq в Amazon ec2

Я использую последнюю версию библиотеки pika (0.9.9+) для rabbitmq. Я использую rabbitmq и pika следующим образом:

  1. У меня как рабочих есть длительные задачи (около 5 минут). Эти задачи берут свои запросы от rabbitmq. Запросы поступают очень редко, т.е. между запросами бывает длительное время простоя.
  2. Проблема, с которой я столкнулся ранее, связана с незанятыми соединениями (закрытие соединений из-за незанятых соединений). Итак, я включил сердцебиение в пике.
  3. Теперь проблема с выбором сердцебиения. Кажется, что Pika - это однопоточная библиотека, в которой прием и подтверждение пульса происходит в промежутках между запросами.
  4. Таким образом, если интервал пульса установлен меньше времени, которое функция обратного вызова использует для выполнения своих длительных вычислений, сервер не получает никаких подтверждений пульса и закрывает соединение.
  5. Итак, я предполагаю, что минимальный интервал пульса должен быть максимальным временем вычисления функции обратного вызова в блокирующем соединении.

Какое значение пульса для amazon ec2 может быть хорошим, чтобы предотвратить закрытие незанятых соединений?

Кроме того, некоторые предлагают использовать rabbitmq keepalive (или libkeepalive) для поддержки TCP-соединений. Я думаю, что управление тактовыми сигналами на уровне tcp намного лучше, потому что приложение не должно управлять ими. Верно ли это? Является ли поддержка активности хорошим методом по сравнению с сердцебиением RMQ?

Я видел, что некоторые предлагают использовать несколько потоков и очереди для длительных задач. Но разве это единственный вариант для длительных задач? Очень досадно, что для этого сценария необходимо использовать другую очередь.

Заранее спасибо. Думаю, я подробно описал проблему. Дайте мне знать, если я могу предоставить более подробную информацию.


person vadlamani maitreya    schedule 22.02.2013    source источник


Ответы (1)


Если вы не привязаны к использованию pika, эта ветка помогла мне добиться того, что вы пытаюсь сделать с помощью комбу:

#!/usr/bin/env python
import time, logging, weakref, eventlet
from kombu import Connection, Exchange, Queue
from kombu.utils.debug import setup_logging
from kombu.common import eventloop
from eventlet import spawn_after

eventlet.monkey_patch()

log_format = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=log_format)
logger = logging.getLogger('job_worker')
logger.setLevel(logging.INFO)


def long_running_function(body):
    time.sleep(300)

def job_worker(body, message):
    long_running_function(body)
    message.ack()

def monitor_heartbeats(connection, rate=2):
    """Function to send heartbeat checks to RabbitMQ. This keeps the
       connection alive over long-running processes."""
    if not connection.heartbeat:
        logger.info("No heartbeat set for connection: %s" % connection.heartbeat)
        return
    interval = connection.heartbeat
    cref = weakref.ref(connection)
    logger.info("Starting heartbeat monitor.")

    def heartbeat_check():
        conn = cref()
        if conn is not None and conn.connected:
            conn.heartbeat_check(rate=rate)
            logger.info("Ran heartbeat check.")
            spawn_after(interval, heartbeat_check)
    return spawn_after(interval, heartbeat_check)

def main():
    setup_logging(loglevel='INFO')

    # process for heartbeat monitor
    p = None

    try:
        with Connection('amqp://guest:guest@localhost:5672//', heartbeat=300) as conn:
            conn.ensure_connection()
            monitor_heartbeats(conn)
            queue = Queue('job_queue',
                          Exchange('job_queue', type='direct'),
                          routing_key='job_queue')
            logger.info("Starting worker.")
            with conn.Consumer(queue, callbacks=[job_worker]) as consumer:
                consumer.qos(prefetch_count=1)
                for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
                    pass
    except KeyboardInterrupt:
        logger.info("Worker was shut down.")

if __name__ == "__main__":
    main()

Я вырезал код, специфичный для моей предметной области, но, по сути, я использую именно этот фреймворк.

person Gerald Manipon    schedule 13.05.2013