Как расширить контейнер списка значков C ++ для реализации поточно-ориентированной реализации с использованием мьютекса ускоренного обновления?

Я написал образец тестового кода, чтобы проверить функциональность использования мьютексов ускоренного обновления для реализации блокировки мьютексов чтения / записи над контейнером списка значков. У меня десять потоков, 5 - считыватели, 5 - писатели.
Я использовал интеллектуальные указатели, чтобы упростить управление памятью и позволить одному и тому же объекту содержаться в нескольких списках. Писатели постоянно удаляют и повторно вставляют объекты в свой список, в то время как читатели периодически просматривают список. Кажется, все работает так, как ожидалось, но при вызове функции-члена стирания списка он должен найти запись для удаления, когда она у меня уже есть.
Достаточно ли умен метод стирания, чтобы знать, что запись должна быть удалена, без необходимости поиска для него снова или он оптимизирован для исключения поиска, когда элемент списка известен? Если он выполняет поиск, то есть ли простой способ расширить его, чтобы уникальная блокировка могла применяться только при фактическом удалении из списка, а не также при поиске элемента списка? Вот код, который я связал с библиотекой boost 1.51 и протестировал с vs2008.

  //******************************************************************************
  // INCLUDE FILES
  //******************************************************************************
  #include <boost/thread.hpp>
  #include <boost/date_time.hpp>
  #include <boost/thread/locks.hpp>  
  #include <boost/thread/shared_mutex.hpp>                  
  #include <boost/container/list.hpp>      
  #include <boost/shared_ptr.hpp>
  #include <boost/make_shared.hpp>
  #include <iostream>

  using namespace std;

  //******************************************************************************
  // LOCAL DEFINES
  //******************************************************************************
  #define NUM_THREADS  10
  #define NUM_WIDTH    5

  #ifdef UNIQUE_MUTEX
  #define MAIN_LIST_MUTEX           g_listMutex
  #define INT_LIST_MUTEX            g_intListMutex
  #define FLOAT_LIST_MUTEX          g_floatListMutex
  #else
  #define MAIN_LIST_MUTEX           g_listMutex
  #define INT_LIST_MUTEX            g_listMutex
  #define FLOAT_LIST_MUTEX          g_listMutex
  #endif

  //******************************************************************************
  // LOCAL TYPEDEFS
  //******************************************************************************
  typedef boost::upgrade_mutex                   myMutex;
  typedef boost::shared_lock<myMutex>            SharedLock;
  typedef boost::upgrade_to_unique_lock<myMutex> UniqueLock;
  typedef boost::upgrade_lock<myMutex>           UpgradeLock;
  class myDataIntf;
  typedef boost::shared_ptr<myDataIntf>          myDataIntfPtr;
  typedef boost::container::list<myDataIntfPtr>  myList;

  //******************************************************************************
  // LOCAL CLASS DECLARATIONS
  //******************************************************************************
  class myDataIntf
  {
  public:
     virtual char* getDataType(void) = 0;
  };

  class intData : public myDataIntf
  {
  private:
     int  data;

  public:
     intData(int new_data) : data(new_data){};
     ~intData(void)
     {
        extern int instIntDeletes;

        instIntDeletes++;
     };
     char* getDataType(void)
     {
        return "Int";
     }
     int getData(void)
     {
        return data;
     }
     void setData(int new_data)
     {
        data = new_data;
     }
  };

  class floatData : public myDataIntf
  {
  private:
     float  data;

  public:
     floatData(float new_data) : data(new_data){};
     ~floatData(void)
     {
        extern int instFloatDeletes;

        instFloatDeletes++;
     };
     char* getDataType(void)
     {
        return "Float";
     }
     float getData(void)
     {
        return data;
     }
     void setData(float new_data)
     {
        data = new_data;
     }
  };

  //******************************************************************************
  // LOCALLY DEFINED GLOBAL DATA
  //******************************************************************************

  // Define one mutex per linked list
  myMutex  g_listMutex;
  myMutex  g_intListMutex;
  myMutex  g_floatListMutex;

  int instReadFloatCount[NUM_THREADS];
  int instWriteFloatCount[NUM_THREADS];
  int instReadIntCount[NUM_THREADS];
  int instWriteIntCount[NUM_THREADS];
  int instFloatDeletes = 0;
  int instIntDeletes = 0;

  //******************************************************************************
  // Worker Thread function
  //******************************************************************************
  void workerFunc(int inst, myList* assigned_list, myMutex*  mutex)
  {
     boost::posix_time::millisec workTime(1*inst);
     myList::iterator            i;
     int                         add_delay = 0;
     int                         add_f_count = 0;
     int                         add_i_count = 0;

     instReadFloatCount[inst] = 0;
     instReadIntCount[inst] = 0;
     instWriteIntCount[inst] = 0;
     instWriteFloatCount[inst] = 0;

     mutex->lock();
     cout << "Worker " << inst << ": ";
     for (i =  assigned_list->begin(); i != assigned_list->end(); ++i)
     {
        cout << (*i)->getDataType();
        if ( 0 == strcmp("Float", (*i)->getDataType() ) )
        {
           floatData*  f = (floatData*)i->get();
           cout << " " << f->getData() << " ";
        }
        if ( 0 == strcmp("Int", (*i)->getDataType() ) )
        {
           intData*  f = (intData*)i->get();
           cout << " " << f->getData() << " ";
        }
     }
     cout << endl;
     mutex->unlock();

     // Do some work for 10 seconds.
     for ( int tick = 0; tick < 10000/(1*inst+1); tick++)
     {
        add_delay++;
        boost::this_thread::sleep(workTime);
        if ( inst < (NUM_THREADS/2) )
        {
           // reader - Get a shared lock that allows multiple readers to
           // access the linked list. Upgrade locks act as shared locks
           // until converted to unique locks, at which point the 
           // thread converting to the unique lock will block until
           // all existing readers are done.  New readers will wait
           // after the unique lock is released.
           SharedLock shared_lock(*mutex);

           for (i =  assigned_list->begin(); i != assigned_list->end(); ++i)
           {
              if ( 0 == strcmp("Float", (*i)->getDataType() ) )
              {
                 floatData*  f = (floatData*)i->get();
                 instReadFloatCount[inst]++;
              }
              if ( 0 == strcmp("Int", (*i)->getDataType() ) )
              {
                 intData*  f = (intData*)i->get();
                 instReadIntCount[inst]++;
              }
           }
        }
        else
        {
           // writer - get the upgrade lock that will allow us
           // to make multiple modifications to the linked list
           // without being interrupted by other writers (other writers attempting
           // to get an upgrade lock will block until the writer that
           // has it releases it.)
           UpgradeLock  upgrade_lock(*mutex);

           for (i =  assigned_list->begin(); i != assigned_list->end(); )
           {
              if ( 0 == strcmp("Float", (*i)->getDataType() ) )
              {
                 floatData*   f = (floatData*)i->get();
                 UniqueLock   unique_lock(upgrade_lock); // Convert an existing upgrade lock to unique lock

                 f->setData(f->getData() + 0.123f);
                 assigned_list->push_front(*i); 
                 assigned_list->erase(i++);
                 instWriteFloatCount[inst]++;

                 // While the unique lock is in scope let's do some additional
                 // adds & deletes
                 if ( (add_delay > 100) && (add_f_count < 2) )
                 {
                    if ( add_f_count < 1)
                    {
                       // Delete the first record
                    }
                    else if ( add_f_count < 2)
                    {
                       // Add new item using separate allocations for smart pointer & data
                       assigned_list->insert(assigned_list->end(), new floatData(-(float)(inst*10000+add_f_count)));
                    }
                    else
                    {
                       // Add new item using make_shared function template.  Both objects are created using one allocation.
                       assigned_list->insert(assigned_list->end(), boost::make_shared<floatData>(-(float)(inst*10000+add_f_count)));
                    }
                    add_f_count++;
                 }
              }
              else if ( 0 == strcmp("Int", (*i)->getDataType() ) )
              {
                 intData*     f = (intData*)i->get();
                 UniqueLock   unique_lock(upgrade_lock); // Convert an existing upgrade lock to unique lock

                 f->setData(f->getData() + 123);
                 assigned_list->push_front(*i);
                 assigned_list->erase(i++);
                 instWriteIntCount[inst]++;

                 // While the unique lock is in scope let's do some additional
                 // adds & deletes
                 if ( (add_delay > 100) && (add_i_count < 3) )
                 {
                    if ( add_i_count < 1)
                    {
                       // Delete the first record
                    }
                    else if ( add_i_count < 2)
                    {
                       // Add new item using separate allocations for smart pointer & data
                       assigned_list->insert(assigned_list->end(), new intData(-(int)(inst*10000+add_i_count)));
                    }
                    else
                    {
                       // Add new item using make_shared function template.  Both objects are created using one allocation.
                       assigned_list->insert(assigned_list->end(), boost::make_shared<intData>(-(int)(inst*10000+add_i_count)));
                    }
                    add_i_count++;
                 }
              }
              else
              {
                 ++i;
              }
           }
        }
     }

     cout << "Worker: finished" << " " << inst << endl;
  }

  //******************************************************************************
  // Main Function
  //******************************************************************************
  int main(int argc, char* argv[])
  {
     {
        myList              test_list;
        myList              test_list_ints;
        myList              test_list_floats;
        myList::iterator    i;

        // Fill the main list with some values
        test_list.insert(test_list.end(), new intData(1));
        test_list.insert(test_list.end(), new intData(2));
        test_list.insert(test_list.end(), new intData(3));
        test_list.insert(test_list.end(), new floatData(333.333f));
        test_list.insert(test_list.end(), new floatData(555.555f));
        test_list.insert(test_list.end(), new floatData(777.777f));

        // Display the main list elements and add the specific values
        // for each specialized list containing specific types of elements.
        // The end result is that each object in the main list will also
        // be in the specialized list.
        cout << "test:";
        for (i =  test_list.begin(); i != test_list.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           if ( 0 == strcmp("Float", (*i)->getDataType() ) )
           {
              floatData*  f = (floatData*)i->get();
              cout << " " << f->getData();
              test_list_floats.insert(test_list_floats.end(), *i);
           }
           if ( 0 == strcmp("Int", (*i)->getDataType() ) )
           {
              intData*  f = (intData*)i->get();
              cout << " " << f->getData();
              test_list_ints.insert(test_list_ints.end(), *i);
           }
        }
        cout << endl;

        // Display the list with float type elements
        cout << "float test:";
        for (i =  test_list_floats.begin(); i != test_list_floats.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           floatData*  f = (floatData*)i->get();
           cout << " " << f->getData();
        }
        cout << endl;

        // Display the list with integer type elements
        cout << "int test:";
        for (i =  test_list_ints.begin(); i != test_list_ints.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           intData*  f = (intData*)i->get();
           cout << " " << f->getData();
        }
        cout << endl;

        // NOTE: To reduce mutex bottleneck coupling in a real application it is recommended that
        // each linked list have it's own shareable mutex.
        // I used the same mutex here for all three lists to have the output from each thread
        // appear in a single line. If I use one mutex per thread then it would appear
        // jumbled up and almost unreadable.
        // To use a unique mutex per list enable UNIQUE_MUTEX macro.
        // For this test I did not notice any performance differences, but that will
        // depend largely on how long the unique lock is held.
        boost::thread workerThread0(workerFunc, 0, &test_list,        &MAIN_LIST_MUTEX);
        boost::thread workerThread1(workerFunc, 1, &test_list_ints,   &INT_LIST_MUTEX);
        boost::thread workerThread2(workerFunc, 2, &test_list_floats, &FLOAT_LIST_MUTEX);
        boost::thread workerThread3(workerFunc, 3, &test_list,        &MAIN_LIST_MUTEX);
        boost::thread workerThread4(workerFunc, 4, &test_list_floats, &FLOAT_LIST_MUTEX);
        boost::thread workerThread5(workerFunc, 5, &test_list_ints,   &INT_LIST_MUTEX);
        boost::thread workerThread6(workerFunc, 6, &test_list,        &MAIN_LIST_MUTEX);
        boost::thread workerThread7(workerFunc, 7, &test_list_floats, &FLOAT_LIST_MUTEX);
        boost::thread workerThread8(workerFunc, 8, &test_list,        &MAIN_LIST_MUTEX);
        boost::thread workerThread9(workerFunc, 9, &test_list_ints,   &INT_LIST_MUTEX);
        workerThread0.join();
        workerThread1.join();
        workerThread2.join();
        workerThread3.join();
        workerThread4.join();
        workerThread5.join();
        workerThread6.join();
        workerThread7.join();
        workerThread8.join();
        workerThread9.join();

        cout << "*** Test End ***:";
        for (i =  test_list.begin(); i != test_list.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           if ( 0 == strcmp("Float", (*i)->getDataType() ) )
           {
              floatData*  f = (floatData*)i->get();
              cout << " " << f->getData();
           }
           if ( 0 == strcmp("Int", (*i)->getDataType() ) )
           {
              intData*  f = (intData*)i->get();
              cout << " " << f->getData();
           }
        }
        cout << endl;
        cout << "float test end:";
        for (i =  test_list_floats.begin(); i != test_list_floats.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           floatData*  f = (floatData*)i->get();
           cout << " " << f->getData();
        }
        cout << endl;
        cout << "int test end:";
        for (i =  test_list_ints.begin(); i != test_list_ints.end(); ++i)
        {
           cout << " " << (*i)->getDataType();
           intData*  f = (intData*)i->get();
           cout << " " << f->getData();
        }
        cout << endl;
        cout << "*** thread counts***" << endl;
        for ( int idx = 0; idx < NUM_THREADS; idx++)
        {
           cout << "    thread " << idx;
           cout << ": int rd(" << setw(NUM_WIDTH) << instReadIntCount[idx];
           cout << ") int wr(" << setw(NUM_WIDTH) << instWriteIntCount[idx];
           cout << ") flt rd(" << setw(NUM_WIDTH) << instReadFloatCount[idx];
           cout << ") flt wr(" << setw(NUM_WIDTH) << instWriteFloatCount[idx];
           cout << ")" <<  endl;
        }
     }

     // All records in the linked list have now been deallocated automatically(due to smart pointer)
     // as the linked list objects have been destroyed due to going out of scope.  
     cout << "*** Object Deletion counts***" << endl;
     cout << "  int deletes: " << instIntDeletes << endl;
     cout << "float deletes: " << instFloatDeletes << endl;

     return 0;
  }

person Ricardo Andujar    schedule 07.11.2012    source источник


Ответы (1)


Сложность boost::container::list::erase(const_iterator) - это амортизированное постоянное время (ищите iterator erase(const_iterator p) в boost / container / list.hpp). Таким образом, при вызове этой функции повторный поиск не выполняется.

Однако я хотел бы сделать несколько замечаний.

Совсем недавно эксперт по параллелизму посоветовал мне использовать концепцию UpgradeLockable только после выявления в ней явной необходимости; т.е. после профилирования. Блокировки, связанные с upgrade_mutexes, обязательно более сложны, чем простые boost::mutex::scoped_lock или std::lock_guard, и, следовательно, страдают от более низкой производительности.

В вашем примере вы, вероятно, обнаружите, что нет значительной разницы в производительности между вашей текущей (более сложной) настройкой и заменой upgrade_mutex на mutex и просто всегда исключительно блокировкой.

Другой момент заключается в том, что ваши комментарии к коду, кажется, указывают на то, что вы думаете, что несколько экземпляров boost::upgrade_lock на данном upgrade_mutex могут сосуществовать. Это не тот случай. Только одиночный поток может одновременно содержать upgrade_lock.

Несколько других потоков могут удерживать shared_lock, пока удерживается upgrade_lock, но эти shared_lock должны быть освобождены до того, как upgrade_lock можно будет обновить до уникального.

Дополнительную информацию см. В документации по ускорению на Концепция UpgradeLockable.


Редактировать

Чтобы подтвердить мысль, высказанную в комментариях ниже, следующий пример показывает, что новые shared_locks могут быть получены, пока существует upgrade_lock, но не пока существует upgrade_to_unique_lock (проверено с повышением 1.51):

#include <iostream>
#include <vector>

#include <boost/thread.hpp>
#include <boost/date_time.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/shared_mutex.hpp>

typedef boost::shared_lock<boost::upgrade_mutex>            SharedLock;
typedef boost::upgrade_to_unique_lock<boost::upgrade_mutex> UniqueLock;
typedef boost::upgrade_lock<boost::upgrade_mutex>           UpgradeLock;

boost::upgrade_mutex the_mutex;

void Write() {
  UpgradeLock upgrade_lock(the_mutex);
  std::cout << "\tPreparing to write\n";
  boost::this_thread::sleep(boost::posix_time::seconds(1));
  UniqueLock unique_lock(upgrade_lock);
  std::cout << "\tStarting to write\n";
  boost::this_thread::sleep(boost::posix_time::seconds(5));
  std::cout << "\tDone writing.\n";
}

void Read() {
  SharedLock lock(the_mutex);
  std::cout << "Starting to read.\n";
  boost::this_thread::sleep(boost::posix_time::seconds(1));
  std::cout << "Done reading.\n";
}

int main() {
  // Start a read operation
  std::vector<boost::thread> reader_threads;
  reader_threads.push_back(std::move(boost::thread(Read)));
  boost::this_thread::sleep(boost::posix_time::milliseconds(250));

  // Start a write operation.  This will block trying to upgrade
  // the UpgradeLock to UniqueLock since a SharedLock currently exists.
  boost::thread writer_thread(Write);

  // Start several other read operations.  These won't be blocked
  // since only an UpgradeLock and SharedLocks currently exist.
  for (int i = 0; i < 25; ++i) {
    boost::this_thread::sleep(boost::posix_time::milliseconds(100));
    reader_threads.push_back(std::move(boost::thread(Read)));
  }

  // Join the readers.  This allows the writer to upgrade to UniqueLock
  // since it's currently the only lock.
  for (auto& reader_thread : reader_threads)
    reader_thread.join();

  // Start a new read operation.  This will be blocked since a UniqueLock
  // currently exists.
  boost::this_thread::sleep(boost::posix_time::milliseconds(100));
  boost::thread reader_thread(Read);

  writer_thread.join();
  reader_thread.join();

  return 0;
}
person Fraser    schedule 07.11.2012
comment
Привет, Фрейзер, я исправил комментарий о том, как работают замки. Насколько я понимаю из документации по бусту, несколько общих блокировок и одна блокировка обновления могут сосуществовать. Дело в том, что новые общие блокировки, пока блокировка обновления активна, а блокировка до тех пор, пока блокировка обновления не будет снята, это гарантирует, что блокировка обновления в конечном итоге может быть преобразована в уникальную блокировку. Несколько авторов, пытающихся получить блокировку обновления, будут блокироваться до тех пор, пока не будет снята текущая блокировка обновления, но только один может получить ее в любое время. - person Ricardo Andujar; 07.11.2012
comment
Кроме того, что касается производительности, я понимаю, что с этим связаны накладные расходы, но в моем приложении большую часть времени будут только блокировки чтения и случайные блокировки записи. Основная цель состояла в том, чтобы минимизировать влияние на производительность потоков, выполняющих итерацию по списку с блокировками считывателя. Используя общую блокировку чтения, несколько потоков могут выполнять итерацию по одному и тому же списку, при этом конкуренция обрабатывается приоритетом потока (это WC7, поэтому планировщик строго вытесняет на основе приоритета). - person Ricardo Andujar; 07.11.2012
comment
@RicardoAndujar Нет ничего, что могло бы помешать получению новых shared_locks, пока существует upgrade_lock. Более того, попытка обновления до unique_lock будет заблокирована до тех пор, пока не будут освобождены все shared_locks. Пока писатель ожидает обновления блокировки, большее количество читателей могут получить shared_locks. Таким образом, вы можете получить случай, когда существует так много читателей, что существует всегда shared_lock, что означает, что у автора никогда не будет возможности получить unique_lock. И оттуда вы попадаете на территорию try_lock или временных вариантов - более сложную. - person Fraser; 08.11.2012
comment
Фрейзер, единственное, что мне нужно сделать, это выяснить, есть ли необходимость инкапсулировать код списка с помощью sw для автоматического выполнения блокировки, но на данный момент я вижу только необходимость сделать мьютекс частью производного класса списка . Спасибо за ответ. - person Ricardo Andujar; 08.11.2012
comment
Я проверил это, и действительно оказалось, что новые считыватели блокируются обновленным замком. Я считаю, что ваше описание соответствует более ранней версии блокировки обновления, но мне нужно будет выполнить еще несколько тестов, чтобы подтвердить это. Я основываю этот ответ на информации отладчика, которая показывает одновременно активные блокировки считывателя и ожидающие блокировки считывателя, в то время как существует только блокировка обновления. - person Ricardo Andujar; 08.11.2012
comment
@RicardoAndujar Я добавил пример к своему ответу. - person Fraser; 08.11.2012