Потокобезопасный кольцевой буфер без копирования для больших массивов

Для обработки сигналов на больших массивах (10^7 элементов) я использую разные потоки, связанные с кольцевыми буферами. К сожалению, слишком много времени требуется просто для копирования данных в буфер и из него. Текущая реализация основана на boost::lockfree::spsc_queue.

Поэтому я ищу решение для обмена владельцами векторов между потоками и буфером, используя unique_ptr для векторов (см. прикрепленный рисунок: переключение указателя между потоками и очередью).

Перемещение интеллектуальных указателей не соответствует моим потребностям, потому что мне нужно постоянно выделять память во время выполнения для новых векторных элементов. Эти накладные расходы больше, чем копирование данных.

Я пропустил недостаток в этом дизайне?

Существуют ли потокобезопасные или даже неблокирующие реализации кольцевого буфера, позволяющие выполнять операции подкачки для push и pop?

Изменить: я изменил кольцевой буфер блокировки, чтобы поменять местами unique_ptr. Прирост производительности огромный. Хотя это не похоже на элегантное решение. Какие-нибудь рекомендации?

// https://github.com/embeddedartistry/embedded-resources/blob/master/examples/cpp/circular_buffer.cpp

#include <memory>
#include <mutex>

template <typename T, int SIZE>
class RingbufferPointer {
typedef std::unique_ptr<T> TPointer;
public:
    explicit RingbufferPointer() {
        // create objects
        for (int i=0; i<SIZE; i++) {
            buf_[i] = std::make_unique<T>();
        }
    }

    bool push(TPointer &item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (full())
            return false;

        std::swap(buf_[head_], item);

        if (full_)
            tail_ = (tail_ + 1) % max_size_;

        head_ = (head_ + 1) % max_size_;
        full_ = head_ == tail_;

        return true;
    }

    bool pop(TPointer &item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (empty())
            return false;

        std::swap(buf_[tail_], item);

        full_ = false;
        tail_ = (tail_ + 1) % max_size_;

        return true;
    }

    void reset() {
        std::lock_guard<std::mutex> lock(mutex_);
        head_ = tail_;
        full_ = false;
    }

    bool empty() const {
        return (!full_ && (head_ == tail_));
    }

    bool full() const {
        return full_;
    }

    int capacity() const {
        return max_size_;
    }

    int size() const {
        int size = max_size_;

        if(!full_) {
            if(head_ >= tail_)
                size = head_ - tail_;
            else
                size = max_size_ + head_ - tail_;
        }

        return size;
    }

private:
    TPointer buf_[SIZE];

    std::mutex mutex_;
    int head_ = 0;
    int tail_ = 0;
    const int max_size_ = SIZE;
    bool full_ = 0;
};

person luxderfux    schedule 06.09.2018    source источник
comment
Упомянутая очередь ускорения имеет очень строгий подход к элементам push и pop, и AFAIK не позволяет получить элемент, обработать его и освободить для перезаписи. Может быть, вы смешиваете подход к пулу с ответом Майка. stackoverflow.com/a/52204403/9293869   -  person JackGrinningCat    schedule 06.09.2018
comment
не понимаю (для меня) для чего нужно подкачивать указатели или элементы внутри циркулярного буфера. у вас есть один производитель и потребитель? если да, то почему производитель не может считывать данные непосредственно в позицию записи, а потребитель напрямую обрабатывать данные из позиции чтения.   -  person RbMm    schedule 07.09.2018
comment
Вам нужно точно обрабатывать элементы в порядке FIFO? в любом случае это можно сделать с помощью кода без блокировки без мьютекса   -  person RbMm    schedule 07.09.2018
comment
@RdMm: Да, только SPSC в порядке FIFO. Что вы имеете в виду под записью напрямую в FIFO? Вы можете это быть потокобезопасным? Что произойдет, если потребитель захочет получить доступ к позиции записи, пока она записана? Как мне заблокировать этот доступ, но при этом получить потокобезопасную или даже свободную от блокировки очередь?   -  person luxderfux    schedule 07.09.2018
comment
@luxderfux - понимаете, возможно реализовать SPSC в порядке FIFO. безопасный для протектора и без замков. без мьютекса   -  person RbMm    schedule 07.09.2018


Ответы (4)


Перемещение интеллектуальных указателей не соответствует моим потребностям, потому что мне нужно постоянно выделять память во время выполнения для новых векторных элементов.

Не обязательно так, если вы предварительно выделяете достаточно места для хранения и реализуете собственное управление памятью в стиле простого отдельного хранилища, также известного как пулирование.

Если вы сделаете это, ничто не помешает вам поменяться местами, и вы сможете сохранить существующую архитектуру, используя любой кольцевой буфер, который поддерживает замену элементов, и сохраните ту же потокобезопасность, которая у вас была раньше. Вы можете проверить вариант просто использование boost::pool вместо реализации собственного.

person Geezer    schedule 06.09.2018
comment
Отличная идея. Обычно я устанавливаю размеры своих векторов во время инициализации. Таким образом, с подходом объединения я экономлю время на распределение, но мне приходится снова инициализировать. Общий измеренный прирост производительности при использовании пула незначителен. Поэтому я должен поддерживать свои объекты в живых. Есть ли другой подходящий подход? - person luxderfux; 07.09.2018
comment
@luxderfux Подождите, что именно вам придется повторно инициализировать? - person Geezer; 07.09.2018
comment
Используя векторы, я устанавливаю размер в своих конструкторах следующим образом: boost::object_pool<std::vector<float>> pool{10000000, 0}; std::vector<float> *vec = pool.construct(1000000); Или я неправильно понимаю концепцию? - person luxderfux; 07.09.2018
comment
@luxderfux Хорошо, это уже в том направлении, в котором я стремился с этим ответом. Итак, вы ищете другой подходящий подход, который улучшит этот способ? - person Geezer; 07.09.2018
comment
Итак, как я понял: объединение в пул помогает мне предварительно выделить память и ускорить доступ к ней во время выполнения. Но все же инициализация членов класса является дорогостоящей задачей из-за размеров векторов. Может быть, можно сохранить объекты живыми в каком-то контейнере, который позволяет заимствовать и возвращать эти объекты. Или есть другой способ быстрой инициализации векторов? - person luxderfux; 07.09.2018
comment
Под пулом я имел в виду именно это: вы никогда ничего не удаляете, а просто возвращаете это в пул для следующего запроса (взять тот же физический, предварительно выделенный и уже инициализированный объект). Дайте мне знать, если вы хотите, чтобы я чтобы уточнить, поэтому я просто добавлю его к исходному ответу. - person Geezer; 07.09.2018
comment
Да пожалуйста, звучит многообещающе :) - person luxderfux; 07.09.2018
comment
Давайте продолжим обсуждение в чате. - person Geezer; 07.09.2018
comment
@luxderfux Извините, я ответил вам в обсуждении (давно), но не уверен, как это работает ... Комментарий был: Хорошо, поэтому для этого мне нужно иметь четкое представление о вашем сценарии: Согласно вашему рисунку ваш продюсер помещает в очередь целый большой вектор, и он остается там до тех пор, пока потребитель не вытолкнет его полностью, верно? Если это так, то объекты, которые вы будете менять местами, сами по себе являются векторами, верно? Когда вы говорите об инициализации членов класса, вы имеете в виду членов каждого элемента вектора? - person Geezer; 13.09.2018
comment
Спасибо за вопрос. Большой вектор был самым простым случаем. В конце концов, я хочу передавать объекты, которые капсулируют большие массивы. Мне удалось построить быстрый и грязный контейнер для пула, который пока соответствует моим потребностям. Так что спасибо за идею объединения очереди с пулом. Кажется, работает :) - person luxderfux; 14.09.2018

если я правильно понимаю вашу задачу - вам нужно 2 контейнера:

  • Thread-safe и lock-free пул для свободных элементов — чтобы не выделять/освобождать его каждый раз. нажатие и выталкивание не требуют ожидания.
  • Потокобезопасная и неблокируемая очередь FIFO с одной записью/одним чтением, отправка и извлечение без ожидания.

с этим вы можете сделать следующее:

  • вначале вы выделяете N элементов и помещаете их в пул.
  • Производитель освобождает элемент из пула (вместо этого выделяет память)
  • подготовить данные об элементе
  • поместите его в очередь FIFO
  • если свободных предметов в пуле нет - ждать сигнала от Потребителя

  • Потребительский всплывающий элемент из очереди FIFO
  • данные элемента обработки
  • отправить элемент обратно в пул (вместо того, чтобы освободить память)
  • если очередь пуста - ждать сигнала от производителя

Очередь FIFO может быть реализована следующим образом:

class CyclicBufer
{
    struct alignas(8) Position 
    {
        ULONG _begin, _data_size;
    };

    std::atomic<Position> _pos;
    void** _items;

    ULONG _buf_size;

public:

    // Requires: only one thread is allowed to push data to the CyclicBufer
    bool push(void* item, bool* bWasEmpty = 0);

    // Requires: only one thread is allowed to pop data to the CyclicBufer
    bool pop(void** pitem, bool* bNotEmpty = 0);

    ~CyclicBufer()
    {
        if (_items)
        {
            delete [] _items;
        }
    }

    CyclicBufer() : _items(0), _buf_size(0)
    {
        _pos._My_val._begin = 0, _pos._My_val._data_size = 0;
    }

    bool create(ULONG buf_size)
    {
        if (_items = new(std::nothrow) void*[buf_size])
        {
            _buf_size = buf_size;
            return true;
        }

        return false;
    }

    bool is_empty()
    {
        Position current_pos = _pos.load(std::memory_order_relaxed);

        return !current_pos._data_size;
    }
};

bool CyclicBufer::push(void* item, bool* bWasEmpty /*= 0*/)
{
    Position current_pos = _pos.load(std::memory_order_relaxed);

    if (current_pos._data_size >= _buf_size) return false;

    // (_pos._begin + _pos._data_size) % _buf_size never changed in pop
    _items[(current_pos._begin + current_pos._data_size) % _buf_size] = item;

    for (;;)
    {
        Position new_pos = {
            current_pos._begin, current_pos._data_size + 1
        };

        if (_pos.compare_exchange_weak(current_pos, new_pos, std::memory_order_release))
        {
            if (bWasEmpty) *bWasEmpty = current_pos._data_size == 0;
            return true;
        }
    }
}

bool CyclicBufer::pop(void** pitem, bool* bNotEmpty /*= 0*/)
{
    Position current_pos = _pos.load(std::memory_order_acquire);

    if (!current_pos._data_size) return false;

    // current_pos._begin never changed in push
    void* item = _items[current_pos._begin];

    for (;;)
    {
        Position new_pos = {
            (current_pos._begin + 1) % _buf_size, current_pos._data_size - 1
        };

        if (_pos.compare_exchange_weak(current_pos, new_pos, std::memory_order_relaxed))
        {
            if (bNotEmpty) *bNotEmpty = new_pos._data_size != 0;
            *pitem = item;
            return true;
        }
    }
}

для потокобезопасного и безблокировочного пула реализация в Windows может использовать InterlockedPushEntrySList и InterlockedPopEntrySList, но, конечно, возможно реализовать это апи и себя:

struct list_entry {
    list_entry *Next;
};

#if defined(_M_X64) || defined(_M_ARM64)
#define MACHINE_64
#endif

struct alignas(sizeof(PVOID)*2) list_head 
{  
    union {
        struct {
            INT_PTR DepthAndSequence;
            union {
                list_entry* NextEntry;
                INT_PTR iNextEntry;
            };
        };
        __int64 value; // for 32-bit only
    };

    void init()
    {
        iNextEntry = 0, DepthAndSequence = 0;
    }

    bool push(list_entry* entry)
    {
        list_head current = { { DepthAndSequence, NextEntry } }, new_head;

        for (;;)
        {
            entry->Next = current.NextEntry;
            new_head.NextEntry = entry;
            new_head.DepthAndSequence = current.DepthAndSequence + 0x10001;

#ifdef MACHINE_64
            if (_INTRIN_RELEASE(_InterlockedCompareExchange128)(
                &DepthAndSequence, 
                new_head.iNextEntry, new_head.DepthAndSequence, 
                &current.DepthAndSequence))
            {
                // return is list was empty before push
                return !current.NextEntry;
            }
#else
            new_head.value = _INTRIN_RELEASE(_InterlockedCompareExchange64)(
                &value, new_head.value, current.value);

            if (new_head.value == current.value)
            {
                // return is list was empty before push
                return !current.NextEntry;
            }

            current.value = new_head.value;
#endif
        }
    }

    list_entry* pop()
    {
        list_head current = { { DepthAndSequence, NextEntry } }, new_head;

        for (;;)
        {
            list_entry* entry = current.NextEntry;

            if (!entry)
            {
                return 0;
            }

            // entry must be valid memory
            new_head.NextEntry = entry->Next;
            new_head.DepthAndSequence = current.DepthAndSequence - 1;

#ifdef MACHINE_64
            if (_INTRIN_ACQUIRE(_InterlockedCompareExchange128)(&DepthAndSequence, 
                new_head.iNextEntry, new_head.DepthAndSequence, 
                &current.DepthAndSequence))
            {
                return entry;
            }
#else
            new_head.value = _INTRIN_ACQUIRE(_InterlockedCompareExchange64)(
                &value, new_head.value, current.value);

            if (new_head.value == current.value)
            {
                return entry;
            }

            current.value = new_head.value;
#endif
        }
    }
};

#pragma warning(disable : 4324)

template <class _Ty>
class FreeItems : list_head
{
    void* _items;

    union Chunk {
        list_entry entry;
        char buf[sizeof(_Ty)];
    };

public:

    ~FreeItems()
    {
        if (_items)
        {
            delete [] _items;
        }
    }

    FreeItems() : _items(0)
    {
        init();
    }

    bool create(ULONG count)
    {
        if (Chunk* items = new(std::nothrow) Chunk[count])
        {
            _items = items;

            union {
                list_entry* entry;
                Chunk* item;
            };

            item = items;

            do 
            {
                list_head::push(entry);

            } while (item++, --count);

            return true;
        }

        return false;
    }

    _Ty* pop()
    {
        return (_Ty*)list_head::pop();
    }

    bool push(_Ty* item)
    {
        return list_head::push((list_entry*)item);
    }
};

с этими двумя контейнерами демонстрационный/тестовый код может выглядеть так (код для окон, но основной - как мы используем пул и очередь)

struct BigData 
{
    ULONG _id;
};

struct CPData : CyclicBufer, FreeItems<BigData>
{
    HANDLE _hDataEvent, _hFreeEvent, _hConsumerStop, _hProducerStop;

    ULONG _waitReadId, _writeId, _setFreeCount, _setDataCount;

    std::_Atomic_integral_t _dwRefCount;
    bool _bStop;

    static ULONG WINAPI sProducer(void* This)
    {
        reinterpret_cast<CPData*>(This)->Producer();
        reinterpret_cast<CPData*>(This)->Release();
        return __LINE__;
    }

    void Producer()
    {
        HANDLE Handles[] = { _hProducerStop, _hFreeEvent  };

        for (;;)
        {
            BigData* item;

            while (!_bStop && (item = FreeItems::pop()))
            {
                // init data item
                item->_id = _writeId++;

                bool bWasEmpty;

                if (!CyclicBufer::push(item, &bWasEmpty)) __debugbreak();

                if (bWasEmpty)
                {
                    _setDataCount++;
                    SetEvent(_hDataEvent);
                }
            }

            switch (WaitForMultipleObjects(2, Handles, FALSE, INFINITE))
            {
            case WAIT_OBJECT_0:
                SetEvent(_hConsumerStop);
                return;
            case WAIT_OBJECT_0 + 1:
                break;
            default:
                __debugbreak();
            }
        }
    }

    static ULONG WINAPI sConsumer(void* This)
    {
        reinterpret_cast<CPData*>(This)->Consumer();
        reinterpret_cast<CPData*>(This)->Release();
        return __LINE__;
    }

    void Consumer()
    {
        HANDLE Handles[] = { _hDataEvent, _hConsumerStop };

        for (;;)
        {
            switch (WaitForMultipleObjects(2, Handles, FALSE, INFINITE))
            {
            case WAIT_OBJECT_0:
                break;
            case WAIT_OBJECT_0 + 1:
                return;
            default:
                __debugbreak();
            }

            bool bNotEmpty;

            do 
            {
                BigData* item;

                if (!CyclicBufer::pop((void**)&item, &bNotEmpty)) __debugbreak();

                // check FIFO order
                if (item->_id != _waitReadId) __debugbreak();

                _waitReadId++;

                // process item

                // free item to the pool
                if (FreeItems::push(item))
                {
                    // stack was empty
                    _setFreeCount++;
                    SetEvent(_hFreeEvent);
                }

            } while (bNotEmpty);
        }
    }

    ~CPData()
    {
        if (_hConsumerStop) CloseHandle(_hConsumerStop);
        if (_hProducerStop) CloseHandle(_hProducerStop);
        if (_hFreeEvent) CloseHandle(_hFreeEvent);
        if (_hDataEvent) CloseHandle(_hDataEvent);

        if (_waitReadId != _writeId || !CyclicBufer::is_empty()) __debugbreak();

        DbgPrint("%s(%u %u %u)\n", __FUNCTION__, _writeId, _setFreeCount, _setDataCount);
    }

public:

    CPData()
    {
        _hFreeEvent = 0, _hDataEvent = 0, _hProducerStop = 0, _hConsumerStop = 0;
        _waitReadId = 0, _writeId = 0, _dwRefCount = 1;
        _setFreeCount = 0, _setDataCount = 0, _bStop = false;
    }

    void AddRef()
    {
        _MT_INCR(_dwRefCount);
    }

    void Release()
    {
        if (!_MT_DECR(_dwRefCount))
        {
            delete this;
        }
    }

    ULONG Create(ULONG n)
    {
        if (!CyclicBufer::create(n) || !FreeItems::create(n))
        {
            return ERROR_NO_SYSTEM_RESOURCES;
        }

        return (_hDataEvent = CreateEvent(0, FALSE, FALSE, 0)) &&
            (_hFreeEvent = CreateEvent(0, FALSE, FALSE, 0)) &&
            (_hProducerStop = CreateEvent(0, TRUE, FALSE, 0)) &&
            (_hConsumerStop = CreateEvent(0, TRUE, FALSE, 0)) ? 0 : GetLastError();
    }

    ULONG StartThread(bool bConsumer)
    {
        AddRef();

        if (HANDLE hThread = CreateThread(0, 0, bConsumer ? sConsumer : sProducer, this, 0, 0))
        {
            CloseHandle(hThread);
            return 0;
        }

        Release();

        return GetLastError();
    }

    ULONG Stop()
    {
        ULONG err = SetEvent(_hProducerStop) ? 0 : GetLastError();
        _bStop = true;
        return err;
    }
};

void BufTest()
{
    if (CPData* p = new CPData)
    {
        if (!p->Create(16))
        {
            if (!p->StartThread(false))
            {
                p->StartThread(true);
            }

            MessageBoxW(0, 0, L"Wait Stop", MB_ICONINFORMATION);
            p->Stop();
        }
        p->Release();
    }
    MessageBoxW(0,0,0,1);
}
person RbMm    schedule 07.09.2018

Хотя в boost::lockfree::spsc_queue отсутствует поддержка перемещения, вы все равно можете это сделать.

Пример перемещения векторов в очередь и из очереди:

struct Element {
    std::vector<int> data_;

    Element(std::vector<int>& data)
        : data_(std::move(data))
    {}

    Element(Element const&) = delete;
    Element operator=(Element const&) = delete;

    operator std::vector<int>&&() {
        return std::move(data_);
    }
};

int main() {
    boost::lockfree::spsc_queue<Element, boost::lockfree::capacity<2>> q;

    std::vector<int> a(1);
    assert(!a.empty());
    q.push(&a, &a + 1); // Move the vector into the queue.
    assert(a.empty());

    std::vector<int> b = q.front(); // Move the vector from queue.
    assert(!b.empty());
    q.pop();
}
person Maxim Egorushkin    schedule 06.09.2018

Одна техника, которую я использовал, это...

void next_step(std::vector<std::string> &a)
{
    std::vector<std::string> v;
    v.swap(a);
    // process vector ...
}

Нет замены или копирования отдельных элементов. Быстро и эффективно.

Майк

person Michael Surette    schedule 06.09.2018
comment
Но он упоминает, что у него все равно не было времени для создания этих объектов заранее. Даже если он поменяет ресурсы, он не сможет поменять их обратно в процесс в конце. - person JackGrinningCat; 06.09.2018
comment
@JackGrinningCat Этот метод позволяет избежать создания чего-либо, кроме пустого вектора. Это почти нулевые накладные расходы. Если я что-то пропустил? - person Michael Surette; 06.09.2018
comment
Уверены ли вы. Память, полученная от a, будет освобождена в конце next_step в (поток 2) и без пула должна быть получена потоком 1. Если не в конце потока 2, по крайней мере, в конце цепочки обработки сигналов - person JackGrinningCat; 06.09.2018
comment
@JackGrinningCat Конечно, частью обработки является передача вектора на следующий шаг аналогичным образом, либо напрямую, либо с помощью существующей системы кольцевых буферов. - person Michael Surette; 06.09.2018
comment
std::vector::swap не является потокобезопасным/свободным от блокировки. Реализации std::vector обычно имеют 3 указателя (начальный/конечный/используемый), поэтому только x86-64 с 32-битными указателями (например, x32 ABI) даже могли выполнять обмен без блокировки, используя 16-байтовый cmpxchg16b в некоторых гипотетических реализациях, которые хотели предоставить метод atomic_replace(), который атомарно заменяет все элементы. - person Peter Cordes; 06.09.2018
comment
Даже с защитой от мьютекса подкачка выполняется достаточно быстро, поэтому время подкачки незначительно по сравнению с альтернативами. - person Michael Surette; 06.09.2018