Boost::interprocess очередь сообщений Race Condition при создании?

Я пытаюсь отладить спорадические нарушения доступа, которые происходят внутри очереди сообщений boost::interprocess. (нарушение доступа при чтении адреса в области разделяемой памяти).

Окружение: Boost 1.54, VС++ 2010. Встречается как в сборках Debug, так и в Release.

Это всегда происходит на или около строки 854 (в случае приема) в message_queue.hpp: Комментарии были добавлены мной

      recvd_size     = top_msg.len; // top_msg points to invalid location

Или строка 756 (в случае отправки)

BOOST_ASSERT(free_msg_hdr.priority == 0); // free_msg_hdr points to invalid location

Похоже, что это связано с созданием очереди сообщений. Если очередь сообщений создана "правильно" (т. е. без возможного состояния гонки), ошибка никогда не возникает. В противном случае это может произойти в timed_receive() или timed_send() в очереди в случайные моменты времени.

Я придумал короткий пример, представляющий проблему: К сожалению, я не могу запустить его на Coliru, так как для этого требуется два процесса. Один должен запускаться без каких-либо параметров, второй с любым единственным параметром. После нескольких запусков произойдет сбой одного из процессов в message_queue.

#include <iostream>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/thread.hpp>
#include <boost/assert.hpp>
#include <boost/date_time.hpp>

using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock; // microsec_clock is ambiguous between boost::posix_time and boost::interprocess. What are the odds?

int main(int argc, wchar_t** argv)
{
    while(true)
    {
        int proc = 0;
        message_queue* queues[2] = {NULL, NULL};
        std::string names[] = {"msgq0", "msgq1"};
        if(1 == argc)
        {
            proc = 0;
            message_queue::remove(names[0].c_str());
            if(NULL != queues[0]) { delete queues[0]; queues[0] = NULL; }
            queues[0] = new message_queue(open_or_create, names[0].c_str(), 128, 10240);

            bool bRet = false;
            do
            {
                try
                {
                    if(NULL != queues[1]) { delete queues[1]; queues[1] = NULL; }
                    queues[1]=new message_queue(open_only, names[1].c_str());
                    bRet = true;
                }
                catch(const interprocess_exception&)
                {
                    //boost::this_thread::sleep(boost::posix_time::milliseconds(2));
                    delete queues[1];
                    queues[1] = NULL; 
                    continue;
                }
            }while(!bRet);

        }
        else
        {
            proc = 1;
            message_queue::remove(names[1].c_str());
            if(NULL != queues[1]) { delete queues[1]; queues[1] = NULL; }
            queues[1] = new message_queue(open_or_create, names[1].c_str(), 128, 10240);

            bool bRet = false;
            do
            {
                try
                {
                    if(NULL != queues[0]) { delete queues[0]; queues[0] = NULL; }
                    queues[0]=new message_queue(open_only, names[0].c_str());
                    bRet = true;
                }
                catch(const interprocess_exception&)
                {
                    //boost::this_thread::sleep(boost::posix_time::milliseconds(2));
                    delete queues[0];
                    queues[0] = NULL;
                    continue;
                }
            }while(!bRet);
        }

        long long nCnt = 0;
        for(int i = 0; i < 1; ++i)
        {
            if(proc)
            {
                std::string sOut;
                sOut = "Proc1 says: Hello ProcA " + std::to_string(nCnt) + " ";
                sOut.resize(10230, ':');
                for(int n = 0; n < 3; ++n)
                {
                    queues[1]->timed_send(sOut.data(), sOut.size(), 0, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
                }

                bool bMessage = false;
                for(int n = 0; n < 3; ++n)
                {
                    size_t nRec; unsigned int nPrio;
                    std::string sIn; sIn.resize(10240);
                    bMessage = queues[0]->timed_receive(&sIn[0], 10240, nRec, nPrio, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
                    if(bMessage)
                    {
                        sIn.resize(nRec);
                        //std::cout << sIn << " ";
                    }
                }
                if(bMessage)
                {
                    //std::cout << std::endl;
                }
            }
            else
            {
                std::string sOut;
                sOut = "Proc0 says: Hello Procccccccdadae4325a " + std::to_string(nCnt);
                sOut.resize(10240, '.');
                for(int n = 0; n < 3; ++n)
                {
                    queues[0]->timed_send(sOut.data(), sOut.size(), 0, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
                }

                bool bMessage = false;
                for(int n = 0; n < 3; ++n)
                {
                    size_t nRec; unsigned int nPrio;
                    std::string sIn; sIn.resize(10240);
                    bMessage = queues[1]->timed_receive(&sIn[0], 10240, nRec, nPrio, ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
                    if(bMessage)
                    {
                        sIn.resize(nRec);
                        //std::cout << sIn << " ";
                    }
                }
                if(bMessage)
                {
                    //std::cout << std::endl;
                }
            }

            nCnt++;
            boost::this_thread::sleep(boost::posix_time::milliseconds(10));
        }
    }
    return 0;
}

Я все еще думаю, что, возможно, делаю что-то не так, так как я больше нигде ничего не могу найти об этой проблеме, а библиотеки повышения обычно очень хороши.

Есть ли что-то, что я могу сделать неправильно с использованием message_queue в этом примере?


person namezero    schedule 07.02.2014    source источник


Ответы (1)


Я не думаю, что оба процесса, использующие open_or_create, являются поддерживаемой идиомой. Знаете ли вы об этом тема в списке рассылки? Я не могу найти больше обсуждений, поэтому мне кажется, что в конечном итоге не было сочтено необходимым добавлять управление сроком службы.

Таким образом, вам потребуется синхронизировать создание вручную с помощью boost::interprocess или, возможно, заставив один из процессов повторить попытку open_only очереди, пока другой процесс не создаст ее.

person mockinterface    schedule 07.02.2014
comment
Спасибо! Это то, что делает пример. Каждый процесс создает исходящую очередь (open_or_create) и вращает другую с помощью open_only, как вы предложили. Однако только из-за догадки из вашего поста и ссылки я попытался использовать create_only вместо open_or_create. Сейчас тестирую, скоро вернусь. - person namezero; 07.02.2014
comment
Хорошо, после небольшого тестирования create_only вместе со sleep(1000) в блоке catch для open_only, кажется, смягчает проблему до такой степени, что в рабочем приложении этого не происходит. Однако в тестовом приложении я все же могу это воспроизвести, хотя и гораздо реже. Таким образом, кажется, что существует гонка при создании/открытии очереди сообщений, которую необходимо обойти. Я нахожу поразительным, что это, по-видимому, никогда нигде не всплывало, хотя обсуждение используемых замков вызывает у меня некоторую тошноту по поводу его использования. - person namezero; 07.02.2014
comment
Я могу поискать другие библиотеки очередей IPC, если это произойдет снова, но на данный момент вышеупомянутое решение дает приемлемые результаты, по крайней мере, в рабочем приложении. Я свяжу этот пост с багтрекером boost; может быть у кого-то есть какой-то вклад там. - person namezero; 07.02.2014