Как лучше всего очистить очередь заданий от повторных/дублированных заданий (используя sidekiq и redis-semaphore)

У меня есть приложение rails, которое получает много писем из нескольких учетных записей IMAP.

  • Я использую sidekiq для обработки заданий.
  • Я использую sidetiq для планирования заданий.
  • Я использую redis-semaphore, чтобы повторяющиеся задания для одного и того же пользователя не натыкались друг на друга.

2 проблемы, хотя:

  • 1: Когда задание попадает в «if s.lock», redis-semaphore приостанавливает его выполнение до завершения всех предыдущих заданий. Мне нужно, чтобы задание было отменено, а не поставлено в очередь.
  • 2: Если во время выполнения задания возникнет исключение, что приведет к сбою, sidekiq вернет задание в очередь для повторной попытки. Мне нужно, чтобы задание было отменено, а не поставлено в очередь. Включение «sidekiq_options :retry => false» в код, похоже, не имеет значения.

Мой код:

class FetchMailsJobs
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

  def perform(last_occurrence, current_occurrence)
    users = User.all
    users.each do |user|

      if user.imap_accounts.exists?
        ImapJob.perform_async(user._id.to_s)
      end
    end
  end
end

class ImapJob
  include Sidekiq::Worker

  def perform(user_id)
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, connection: "localhost")
    if s.lock
      user = User.where(_id: user_id).first
      emails = ImapMails.receive_mails(user)
      s.unlock
    end
  end
end

person Cjoerg    schedule 04.05.2013    source источник


Ответы (2)


1. Создайте подкласс Redis и перегрузите blpop, чтобы принять -1 для неблокирующего использования lpop.

redis-semaphore вызывает @redis.blpop в Redis::Semaphore#lock. Хотя вы можете перегрузить метод lock, чтобы вместо него использовать @redis.lpop, гораздо проще было бы передать собственный экземпляр Redis к семафору.

Поместите следующее в lib вашего приложения rails и потребуйте его в своем config/initializers/sidekiq.rb (или сделайте то, что вы предпочитаете для загрузки следующего класса).

class NonBlockingRedis < Redis
  def blpop(key, timeout)
    if timeout == -1
      result = lpop(key)
      return result if result.nil?
      return [key, result]
    else
      super(key, timeout)
    end
  end
end

Всякий раз, когда вы вызываете Redis::Semaphore.new, передайте ключ :redis с новым экземпляром класса NonBlockingRedis.

Вызовите s.lock с -1 в качестве аргумента, чтобы использовать lpop вместо blpop.

s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
if s.lock -1
  user = User.where(_id: user_id).first
  emails = ImapMails.receive_mails(user)
  s.unlock
end

2. Использование sidekiq_options retry: false в вашем рабочем классе должно работать, см. пример ниже.

В своем вопросе вы не указали, у какого работника у вас возникли проблемы с заданиями, попадающими в очередь повторных попыток. Поскольку FetchMailsJobs в конечном итоге ставит в очередь ImapJob заданий, исключение в первом случае может привести к тому, что ImapJob повторно ставится в очередь.

С вашей блокировкой семафора также было бы неплохо обернуть вашу работу в блок begin rescue ensure.

class FetchMailsJobs
  include Sidekiq::Worker
  include Sidetiq::Schedulable

  sidekiq_options retry: false

  tiq { hourly.minute_of_hour(0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55) }

  def perform(last_occurrence, current_occurrence)
    users = User.all
    users.each do |user|

      if user.imap_accounts.exists?
        ImapJob.perform_async(user._id.to_s)
      end
    end
  end
end

class ImapJob
  include Sidekiq::Worker

  sidekiq_options retry: false

  def perform(user_id)
    s = Redis::Semaphore.new("fetch_imap_mails_for_#{user_id}".to_sym, redis: NonBlockingRedis.new(connection: "localhost"))
    if s.lock - 1
      begin
        user = User.where(_id: user_id).first
        emails = ImapMails.receive_mails(user)
      rescue => e
        # ignore; do nothing
      ensure
        s.unlock
      end
    end
  end
end

Дополнительные сведения см. в разделе Дополнительные параметры sidekiq: рабочие.

person potatosalad    schedule 05.05.2013

Нельзя ли отказаться от redis-semaphore и использовать sidekiq- жемчужина уникальных рабочих мест ? Кажется, что это хороший инструмент, который делает именно то, что вы ищете.

person Ivar    schedule 20.11.2013