Как ограничить количество запущенных экземпляров в C++

У меня есть класс С++, который выделяет много памяти. Он делает это, вызывая стороннюю библиотеку, которая предназначена для сбоя, если она не может выделить память, и иногда мое приложение создает несколько экземпляров моего класса в параллельных потоках. При слишком большом количестве потоков у меня происходит сбой. Моя лучшая идея для решения - убедиться, что никогда, скажем, не работает более трех экземпляров одновременно. (Хорошая ли это идея?) И моей лучшей идеей на данный момент для реализации это является использование повышающего мьютекса. Что-то вроде следующего псевдокода,

MyClass::MyClass(){
  my_thread_number = -1; //this is a class variable
  while (my_thread_number == -1)
    for (int i=0; i < MAX_PROCESSES; i++)
      if(try_lock a mutex named i){
        my_thread_number = i;
        break;
      }
  //Now I know that my thread has mutex number i and it is allowed to run
}

MyClass::~MyClass(){
    release mutex named my_thread_number
}

Как видите, я не совсем уверен в точном синтаксисе мьютексов здесь. Итак, подводя итог, мои вопросы таковы:

  1. Я на правильном пути, когда хочу решить свою ошибку памяти, ограничив количество потоков?
  2. Если да, должен ли я сделать это с помощью мьютексов или другими способами?
  3. Если да, верен ли мой алгоритм?
  4. Есть ли где-нибудь хороший пример того, как использовать try_lock с мьютексами boost?

Изменить: я понял, что говорю о потоках, а не о процессах. Редактировать: я участвую в создании приложения, которое может работать как на Linux, так и на Windows...


person Emil Fredrik    schedule 20.03.2014    source источник
comment
Не можете ли вы просто хранить где-нибудь статическую переменную, которая увеличивается при каждом создании экземпляра, где создание новых экземпляров зависит от того, находится ли указанная переменная ниже установленного вами предела?   -  person MMJZ    schedule 21.03.2014
comment
Семафор предназначен для этого варианта использования: en.wikipedia.org/wiki/Semaphore_(programming) и boost.org/doc/libs/1_55_0/doc/html/interprocess/   -  person Lasse Espeholt    schedule 21.03.2014
comment
Почему половина вашего вопроса говорит о потоках, а половина — о процессах?   -  person jalf    schedule 21.03.2014
comment
@jalf: извините, я имел в виду темы. Я перепутал понятия.   -  person Emil Fredrik    schedule 21.03.2014
comment
@MMJZ Статическая переменная звучит заманчиво, но нет способа защитить конструкцию от сбоев, не так ли ...?   -  person Emil Fredrik    schedule 21.03.2014


Ответы (2)


Вот упрощенный способ реализовать свой собственный «семафор» (поскольку я не думаю, что стандартная библиотека или повышение имеют его). Это выбирает «кооперативный» подход, и рабочие будут ждать друг друга:

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>

using namespace boost;
using namespace boost::phoenix::arg_names;

void the_work(int id)
{
    static int running = 0;
    std::cout << "worker " << id << " entered (" << running << " running)\n";

    static mutex mx;
    static condition_variable cv;

    // synchronize here, waiting until we can begin work
    {
        unique_lock<mutex> lk(mx);
        cv.wait(lk, phoenix::cref(running) < 3);
        running += 1;
    }

    std::cout << "worker " << id << " start work\n";
    this_thread::sleep_for(chrono::seconds(2));
    std::cout << "worker " << id << " done\n";

    // signal one other worker, if waiting
    {
        lock_guard<mutex> lk(mx);
        running -= 1;
        cv.notify_one(); 
    }
}

int main()
{
    thread_group pool;

    for (int i = 0; i < 10; ++i)
        pool.create_thread(bind(the_work, i));

    pool.join_all();
}

Теперь я бы сказал, что, вероятно, лучше иметь выделенный пул из n рабочих, которые по очереди берут свою работу из очереди:

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>

using namespace boost;
using namespace boost::phoenix::arg_names;

class thread_pool
{
  private:
      mutex mx;
      condition_variable cv;

      typedef function<void()> job_t;
      std::deque<job_t> _queue;

      thread_group pool;

      boost::atomic_bool shutdown;
      static void worker_thread(thread_pool& q)
      {
          while (auto job = q.dequeue())
              (*job)();
      }

  public:
      thread_pool() : shutdown(false) {
          for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
              pool.create_thread(bind(worker_thread, ref(*this)));
      }

      void enqueue(job_t job) 
      {
          lock_guard<mutex> lk(mx);
          _queue.push_back(std::move(job));

          cv.notify_one();
      }

      optional<job_t> dequeue() 
      {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;

          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

          if (_queue.empty())
              return none;

          auto job = std::move(_queue.front());
          _queue.pop_front();

          return std::move(job);
      }

      ~thread_pool()
      {
          shutdown = true;
          {
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }

          pool.join_all();
      }
};

void the_work(int id)
{
    std::cout << "worker " << id << " entered\n";

    // no more synchronization; the pool size determines max concurrency
    std::cout << "worker " << id << " start work\n";
    this_thread::sleep_for(chrono::seconds(2));
    std::cout << "worker " << id << " done\n";
}

int main()
{
    thread_pool pool; // uses 1 thread per core

    for (int i = 0; i < 10; ++i)
        pool.enqueue(bind(the_work, i));
}

PS. Вместо этого вы можете использовать лямбда-выражения C++11 boost::phoenix, если хотите.

person sehe    schedule 21.03.2014

ОБНОВЛЕНИЕ Мой другой ответ касается планирования ресурсов среди потоков (после уточнения вопроса).

Он показывает как семафорный подход для координации работы между (многими) рабочими, так и thread_pool для ограничения рабочих в первую очередь и постановки работы в очередь.

В Linux (и, возможно, в других ОС?) вы можете использовать идиому файла блокировки (но она не поддерживается некоторыми файловыми системами и старыми ядрами).

Я бы предложил использовать объекты межпроцессной синхронизации.

Например, используя именованный семафор Boost Interprocess:

#include <boost/interprocess/sync/named_semaphore.hpp>
#include <boost/thread.hpp>
#include <cassert>

int main()
{
    using namespace boost::interprocess;
    named_semaphore sem(open_or_create, "ffed38bd-f0fc-4f79-8838-5301c328268c", 0ul);

    if (sem.try_wait())
    {
        std::cout << "Oops, second instance\n";
    }
    else
    {
        sem.post();

        // feign hard work for 30s
        boost::this_thread::sleep_for(boost::chrono::seconds(30));

        if (sem.try_wait())
        {
            sem.remove("ffed38bd-f0fc-4f79-8838-5301c328268c");
        }
    }
}

Если вы запустите одну копию в фоновом режиме, новые копии будут «отказываться» запускаться («Ой, второй экземпляр») в течение примерно 30 секунд.

У меня есть ощущение, что здесь может быть проще изменить логику. М-м-м. Дай попробовать.

проходит некоторое время

Хе-хе. Это было сложнее, чем я думал.

Дело в том, что вы хотите убедиться, что блокировка не остается, когда ваше приложение прерывается или уничтожается. В интересах обмена методами переносимой обработки сигналов:

#include <boost/interprocess/sync/named_semaphore.hpp>
#include <boost/thread.hpp>
#include <cassert>
#include <boost/asio.hpp>

#define MAX_PROCESS_INSTANCES 3

boost::interprocess::named_semaphore sem(
        boost::interprocess::open_or_create, 
        "4de7ddfe-2bd5-428f-b74d-080970f980be",
        MAX_PROCESS_INSTANCES);

// to handle signals:
boost::asio::io_service service;
boost::asio::signal_set sig(service);

int main()
{

    if (sem.try_wait())
    {
        sig.add(SIGINT);
        sig.add(SIGTERM);
        sig.add(SIGABRT);
        sig.async_wait([](boost::system::error_code,int sig){ 
                std::cerr << "Exiting with signal " << sig << "...\n";
                sem.post();
            });
        boost::thread sig_listener([&] { service.run(); });

        boost::this_thread::sleep_for(boost::chrono::seconds(3));

        service.post([&] { sig.cancel(); });
        sig_listener.join();
    }
    else
    {
        std::cout << "More than " << MAX_PROCESS_INSTANCES << " instances not allowed\n";
    }
}

Там можно многое объяснить. Дайте мне знать, если вы заинтересованы.

ПРИМЕЧАНИЕ Должно быть совершенно очевидно, что если в вашем приложении используется kill -9 (принудительное завершение), то все ставки сняты, и вам придется либо удалить объект Name Semaphore, либо явно разблокировать его (post()).

Вот тест на моей системе:

sehe@desktop:/tmp$ (for a in {1..6}; do ./test& done; time wait)
More than 3 instances not allowed
More than 3 instances not allowed
More than 3 instances not allowed
Exiting with signal 0...
Exiting with signal 0...
Exiting with signal 0...

real    0m3.005s
user    0m0.013s
sys 0m0.012s
person sehe    schedule 20.03.2014
comment
Я только что изменил свой второй пример, чтобы он был несколько устойчивым к прерыванию/прекращению процесса, и продемонстрировал, как ограничить до 3 экземпляров за раз. - person sehe; 21.03.2014
comment
Спасибо @sehe за длинный и хорошо написанный ответ - но, похоже, вы обращались к процессам, а случай, который я имел в виду, был на самом деле о потоках ... еще раз извините :) Мой случай должен быть проще, верно? - person Emil Fredrik; 21.03.2014
comment
@EmilFredrik ахахахаха... что!? :) Да. Это проще. как в: это решенная проблема. См. мой новый ответ: stackoverflow.com/a/22553946/85371 - person sehe; 21.03.2014