производитель/потребитель, использующий ускоренные потоки и циклические зависания буфера

Я понял. Глупая ошибка с моей стороны, я на самом деле не удалял элемент из очереди, я просто читал первый элемент. Я изменил код, и код ниже не работает. Спасибо всем за помощь.

Я пытаюсь реализовать проблему производителя-потребителя, используя boost, который на самом деле является частью более крупного проекта. Я реализовал программу из примеров в Интернете и даже нашел здесь некоторую помощь. Однако в настоящее время мой код просто зависает. Основываясь на одном хорошем совете, я решил использовать буфер Boost ciruclar для хранения моих данных между производителем и потребителем. Там было много похожего кода, и я смог объединить идеи из них и написать что-то свое. Тем не менее, у меня все еще, кажется, та же проблема, что и раньше (моя программа просто зависает). Я думал, что не совершаю той же ошибки, что и раньше..

Мой код приведен ниже, я вынул свой более ранний код, где у меня есть только мой собственный список ссылок.

Заголовок буфера:

#ifndef PCDBUFFER_H
#define PCDBUFFER_H

#include <pcl/io/pcd_io.h>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/circular_buffer.hpp>

class pcdBuffer
{
    public:
        pcdBuffer(int buffSize);
        void put(int data);
        int get();
        bool isFull();
        bool isEmpty();
        int getSize();
        int getCapacity();
    private:
        boost::mutex bmutex;
        boost::condition_variable buffEmpty;
        boost::condition_variable buffFull;
        boost::circular_buffer<int> buffer;
};


#endif

Источник буфера (только соответствующие части):

#include "pcdBuffer.h"
#include <iostream>

//boost::mutex io_mutex;

pcdBuffer::pcdBuffer(int buffSize)
{
    buffer.set_capacity(buffSize);
}

void pcdBuffer::put(int data)
{
    {
        boost::mutex::scoped_lock buffLock(bmutex);
        while(buffer.full())
        {
            std::cout << "Buffer is full" << std::endl;
            buffFull.wait(buffLock);
        }
        buffer.push_back(data);
    }
    buffEmpty.notify_one();
}

int pcdBuffer::get()
{
    int data;
    {
        boost::mutex::scoped_lock buffLock(bmutex);
        while(buffer.empty())
        {
            std::cout << "Buffer is empty" << std::endl;
            buffEmpty.wait(buffLock);
        }
        data = buffer.front();
            buffer.pop_front();
    }
    buffFull.notify_one();
    return data;
}

основной драйвер для кода:

#include <iostream>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <unistd.h>
#include "pcdBuffer.h"

pcdBuffer buff(100);

void producer()
{
    int i = 10;
    while (true)
    {
        buff.put(i);
        i++;
    }
}

void consumer()
{
    int i;
    while(true)
    {
        i = buff.get();
        std::cout << "Data: " << i << std::endl;
    }
}

int main(int argc, char** argv)
{
    std::cout << "Starting main...." << std::endl;
    std::cout << "Buffer Details: " << std::endl;
    std::cout << "Capacity: " << buff.getCapacity() << ", isEmpty: " << buff.isEmpty() << ", isFull: " << buff.isFull() << std::endl;
    boost::thread cons(consumer);
    sleep(5);
    boost::thread prod(producer);
    prod.join();
    cons.join();
    return 0;
}

Моя емкость буфера правильно инициализирована до 100. Поток-потребитель ждет и сообщает, что «буфер пуст» в течение 5 секунд, но после этого я просто получаю «буфер заполнен» из метода put и «Данные: 10» из потребительская функция, чередующаяся с stdout. Как видите, 10 — это первый элемент, который я вставил. Кажется, что буфер заполняется и не уведомляет потребителя, но я проверил свои блокировки и думаю, что они верны. Любая помощь в этом очень ценится.

Вот ссылка ссылок, из которых я написал этот код:

http://www.boost.org/ doc/libs/1_49_0/libs/circular_buffer/doc/circular_buffer.html#classboost

#ifndef PCDBUFFER_H
#define PCDBUFFER_H

#include <pcl/io/pcd_io.h>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/circular_buffer.hpp>

class pcdBuffer
{
    public:
        pcdBuffer(int buffSize);
        void put(int data);
        int get();
        bool isFull();
        bool isEmpty();
        int getSize();
        int getCapacity();
    private:
        boost::mutex bmutex;
        boost::condition_variable buffEmpty;
        boost::condition_variable buffFull;
        boost::circular_buffer<int> buffer;
};


#endif
1circular__buffer_19ba12c0142a21a7d960877c22fa3ea00

http://www.drdobbs.com/article/print?articleId=184401518&siteSectionName=cpp

Поточно-безопасная реализация циклического буфера


person shaun    schedule 17.04.2012    source источник
comment
Взгляните на это: justsoftwaresolutions.co.uk/threading/   -  person stefaanv    schedule 17.04.2012
comment
Эта статья действительно интересна, однако еще одна причина, по которой я хочу избежать очередей stl, — это скорость. В конце концов, структура данных, которой я буду делиться между потоками, представляет собой облако точек, представляющее захват кадра с камеры, похожей на kinect. Я хочу захватывать кадры со скоростью 30 кадров в секунду и сохранять их на диск. Таким образом, я хотел бы минимизировать свои накладные расходы, насколько это возможно. Это еще одна причина, по которой я избегал очередей STL. Я попытался разблокировать свои блокировки, прежде чем уведомить переменную условия, как предлагается в статье, но она все еще зависает. Я обновил свой код здесь.   -  person shaun    schedule 17.04.2012
comment
Сначала заставьте это работать, затем сделайте это быстро. Избегайте предположений о том, какие части вашего кода/дизайна будут медленными. Избегайте преждевременной оптимизации. Сосредоточьте свою работу на важной части, по возможности используя стандартные компоненты. Если он окажется слишком медленным, измерьте, какая часть работает медленно, ЗАТЕМ оптимизируйте эту часть.   -  person Johannes S.    schedule 17.04.2012
comment
Это был хороший совет. Я скопировал и реализовал код из статьи, на которую ссылается stefaanv, и его гораздо легче понять и понять, чем мой уродливый код. Я попробую это с данными облака точек, и, надеюсь, это будет достаточно быстро для того, что мне нужно сделать.   -  person shaun    schedule 17.04.2012


Ответы (3)


Прежде всего, вместо того, чтобы писать свой собственный список, вы могли бы просто обернуть std::list в pcdQueue вместо того, чтобы писать свой собственный. Это правильно, что std::list не является потокобезопасным как есть, но вы в любом случае предоставляете необходимые примитивы синхронизации в своем классе.

Причина, по которой ваша программа зависает: вы держите блокировку и заполняете очередь, пока она не заполнится. Ваше уведомление потребителя через notify_one бесполезно, потому что ваш потребитель снова заблокируется, поскольку мьютекс уже занят (блокировкой в ​​​​производителе).

Когда вы, наконец, снимаете блокировку (когда очередь заполнена), ожидая condition_variable, вы не будите своего потребителя, поэтому и ваш потребитель, и ваш производитель блокируются, и ваша программа зависает.

Измените его на:

void pcdQueue::produce()
{
    int i=0;
    while(true)
    {
        {
            boost::mutex::scoped_lock lock(qmutex);
            while( ! qlen < buffSize ) {
                std::cout << "Queue is full" << std::endl;
                full.wait(lock);
            }

            enqueue(i); // or myList.push_back(i) if you switch to std::list
        }

        empty.notify_one();


    }
}

У вас те же проблемы в вашем методе consume(). Измените его на:

pcdFrame* pcdQueue::consume()
{
    pcdFrame *frame;

    {
        boost::mutex::scoped_lock lock(qmutex);
        while( qlen == 0 ) {
            std::cout << "Queue is empty" << std::endl;
            empty.wait(lock);
        }

        frame = dequeue();
    }
    full.notify_one();

    return frame;
}

В общем, будьте осторожны и учтите, что уведомления действуют только в том случае, если кто-то ждет. В противном случае они «теряются». Кроме того, обратите внимание, что вам не нужно держать мьютекс заблокированным при вызове notify_one (на самом деле это может привести к дополнительным затратам на переключение контекста, потому что вы пробуждаете другой поток, который затем будет ожидать мьютекс, который в настоящее время (все еще ) заблокирован (вами).Поэтому сначала освободите мьютекс, а затем прикажите другому потоку продолжить.

Обратите внимание, что оба потока работают бесконечно, поэтому ваша основная программа все равно зависнет на первом join() и никогда не завершится. Вы можете включить флаг остановки, который вы проверяете в своих while-циклах, чтобы сообщить своим потокам о завершении.

person Johannes S.    schedule 17.04.2012

Boost теперь предлагает тип очереди производитель/потребитель в разделе без блокировки, который в значительной степени не имеет блокировки, хотя он МОЖЕТ блокироваться, если очередь заполняется.

Вы найдете документацию здесь:

http://www.boost.org/doc/libs/1_54_0/doc/html/lockfree.html

Это довольно недавнее дополнение, поэтому я не думаю, что оно еще является частью многих стандартных пакетов. С другой стороны, похоже, что в основном это заголовки, и все они зависят от довольно низкоуровневого системного материала, который долгое время был в бусте.

person Jugglist    schedule 09.08.2013

Я столкнулся с той же проблемой зависания. Благодаря ответу Йоханнеса я смог заставить его работать полностью. Однако мне пришлось использовать full.wait_for(lock, boost::chrono::milliseconds(100)); как в потребителе, так и в производителе, чтобы предотвратить зависание при использовании очень короткого кольцевого буфера (3-4 элемента).

Кроме того, вместо while (true) я использовал while (buffer->empty().

Наконец-то все работает надежно и быстро.

person Terry    schedule 27.01.2014