Очередь с несколькими производителями и несколькими потребителями без блокировки (или даже без ожидания)

Я ищу документацию о том, как написать очередь MP/MC, чтобы она не блокировалась или даже не ждала. Я использую .Net 4.0. Нашел много кода на C++, но я не очень хорошо разбираюсь в моделях памяти, поэтому велика вероятность, что при переносе на C# я внесу некоторые ошибки.


person adontz    schedule 20.05.2011    source источник
comment
Эта ветка может быть отправной точкой: groups.google.com/group /comp.programming.threads/browse_thread/   -  person dlev    schedule 21.05.2011
comment
На эту тему есть отличная книга по Java: Java Concurreny in Practice. Все примеры кода доступны на их [веб-сайте][1], однако без книги код может быть трудно понять, если вы не знакомы с инфраструктурой параллелизма Java. [1]: javaconcurrencyinpractice.com   -  person Stefan    schedule 21.05.2011


Ответы (3)


Как вы думаете, зачем вам нужна незаблокированная очередь? Пробовали ли вы использовать ConcurrentQueue<T>, возможно, заключенный в BlockingCollection<T>?

Писать многопоточный код сложно. Писать код без блокировки еще сложнее, и вам не следует делать это самостоятельно, если в этом нет особой необходимости.

person svick    schedule 20.05.2011
comment
Я пишу подсистему общего назначения, поэтому я просто хочу сделать ее как можно лучше. - person adontz; 21.05.2011
comment
@adontz, в этом случае вы должны сделать свою систему расширяемой: вы предоставляете некоторую реализацию параллельной очереди, которая достаточно хороша в большинстве случаев, но позволяете вашим пользователям писать свои собственные, если им это нужно. - person svick; 21.05.2011
comment
К сожалению (для меня), основным вариантом использования является высокая нагрузка, поэтому отсутствие блокировки не более чем достаточно. Я не должен реализовывать очередь без ожидания по умолчанию, однако решение на основе блокировки неприемлемо. Я себе не враг, Очень нужно реализовать хотя бы lock-free очередь. - person adontz; 21.05.2011
comment
@adontz, вам нужно, чтобы ваша очередь была действительно FIFO? Если нет, и вы ожидаете, что часто один и тот же поток создает и потребляет элементы, вы можете использовать ConcurrentBag<T>. В любом случае, все эти коллекции стараются использовать блокировки как можно реже, поэтому я не уверен, что ваше решение будет намного лучше. - person svick; 21.05.2011

Как вариант рассмотреть, есть алгоритм ограниченного Multiple Producer Multiple Consumer queue от Дмитрия Вьюкова. Я перенес алгоритм на .NET, вы можете найти исходники на github. Это очень быстро.

Алгоритм постановки в очередь:

public bool TryEnqueue(object item)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var pos = _enqueuePos; // fetch the current position where to enqueue the item
        var index = pos & _bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence wasn't touched by other producers
        // and we can increment the enqueue position
        if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
        {
            // write the item we want to enqueue
            Volatile.Write(ref buffer[index].Element, item);
            // bump the sequence
            buffer[index].Sequence = pos + 1;
            return true;
        }

        // If the queue is full we cannot enqueue and just return false
        if (cell.Sequence < pos)
        {
            return false;
        }

        // repeat the process if other producer managed to enqueue before us
    } while (true);
}

Алгоритм удаления из очереди:

public bool TryDequeue(out object result)
{
    do
    {
        var buffer = _buffer; // prefetch the buffer pointer
        var bufferMask = _bufferMask; // prefetch the buffer mask
        var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
        var index = pos & bufferMask; // precalculate the index in the buffer for that position
        var cell = buffer[index]; // fetch the cell by the index
        // If its sequence was changed by a producer and wasn't changed by other consumers
        // and we can increment the dequeue position
        if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
        {
            // read the item
            result = Volatile.Read(ref cell.Element);
            // update for the next round of the buffer
            buffer[index] = new Cell(pos + bufferMask + 1, null);
            return true;
        }

        // If the queue is empty return false
        if (cell.Sequence < pos + 1)
        {
            result = default(object);
            return false;
        }

        // repeat the process if other consumer managed to dequeue before us
    } while (true);
}
person Alexandr Nikitin    schedule 13.12.2016

Мой первый опыт будет с ConcurrentQueue<T>, но вы можете абстрагировать свое хранилище данных за интерфейс, чтобы вы могли легко изменять реализации. Затем сравните типичные сценарии и посмотрите, где вы сталкиваетесь с проблемами. Помните: преждевременная оптимизация — корень всех зол. Спроектируйте свою систему так, чтобы она была привязана не к реализации, а к контракту, и тогда вы сможете оптимизировать свои реализации так, как захотите.

Я взглянул на ConcurrentQueue<T> с ILSpy и, на первый взгляд, кажется, что это реализация без блокировки - так что есть большая вероятность, что это именно то, что вы ищете.

person ChrisWue    schedule 20.05.2011
comment
ConcurrentQueue‹T› — это просто часть очереди MPMC. Это не помогает в ожидании прибытия нового товара, уведомляя о прибытии нового товара. Я думаю, что для этого будет достаточно одного AutoResetEvent, моделировал разные ситуации: в какой-то момент производители работают быстрее, чем потребители, в какой-то момент потребители работают быстрее, чем производители. Ложное пробуждение также является проблемой, поток не должен просыпаться только для того, чтобы увидеть пустую очередь. Так что есть много мест, где можно ошибиться. Я в целом осведомлен о многопоточности, но точно не профессионал в этой области. - person adontz; 21.05.2011
comment
@adontz, вот здесь BlockingCollection<T> тебе поможет. Он отвечает за ожидание новых элементов для потребителей и ожидание свободного места в очереди, если вы решите ограничить ее размер, для производителей. - person svick; 21.05.2011