Я ищу документацию о том, как написать очередь MP/MC, чтобы она не блокировалась или даже не ждала. Я использую .Net 4.0. Нашел много кода на C++, но я не очень хорошо разбираюсь в моделях памяти, поэтому велика вероятность, что при переносе на C# я внесу некоторые ошибки.
Очередь с несколькими производителями и несколькими потребителями без блокировки (или даже без ожидания)
Ответы (3)
Как вы думаете, зачем вам нужна незаблокированная очередь? Пробовали ли вы использовать ConcurrentQueue<T>
, возможно, заключенный в BlockingCollection<T>
?
Писать многопоточный код сложно. Писать код без блокировки еще сложнее, и вам не следует делать это самостоятельно, если в этом нет особой необходимости.
ConcurrentBag<T>
. В любом случае, все эти коллекции стараются использовать блокировки как можно реже, поэтому я не уверен, что ваше решение будет намного лучше.
- person svick; 21.05.2011
Как вариант рассмотреть, есть алгоритм ограниченного Multiple Producer Multiple Consumer queue от Дмитрия Вьюкова. Я перенес алгоритм на .NET, вы можете найти исходники на github. Это очень быстро.
Алгоритм постановки в очередь:
public bool TryEnqueue(object item)
{
do
{
var buffer = _buffer; // prefetch the buffer pointer
var pos = _enqueuePos; // fetch the current position where to enqueue the item
var index = pos & _bufferMask; // precalculate the index in the buffer for that position
var cell = buffer[index]; // fetch the cell by the index
// If its sequence wasn't touched by other producers
// and we can increment the enqueue position
if (cell.Sequence == pos && Interlocked.CompareExchange(ref _enqueuePos, pos + 1, pos) == pos)
{
// write the item we want to enqueue
Volatile.Write(ref buffer[index].Element, item);
// bump the sequence
buffer[index].Sequence = pos + 1;
return true;
}
// If the queue is full we cannot enqueue and just return false
if (cell.Sequence < pos)
{
return false;
}
// repeat the process if other producer managed to enqueue before us
} while (true);
}
Алгоритм удаления из очереди:
public bool TryDequeue(out object result)
{
do
{
var buffer = _buffer; // prefetch the buffer pointer
var bufferMask = _bufferMask; // prefetch the buffer mask
var pos = _dequeuePos; // fetch the current position from where we can dequeue an item
var index = pos & bufferMask; // precalculate the index in the buffer for that position
var cell = buffer[index]; // fetch the cell by the index
// If its sequence was changed by a producer and wasn't changed by other consumers
// and we can increment the dequeue position
if (cell.Sequence == pos + 1 && Interlocked.CompareExchange(ref _dequeuePos, pos + 1, pos) == pos)
{
// read the item
result = Volatile.Read(ref cell.Element);
// update for the next round of the buffer
buffer[index] = new Cell(pos + bufferMask + 1, null);
return true;
}
// If the queue is empty return false
if (cell.Sequence < pos + 1)
{
result = default(object);
return false;
}
// repeat the process if other consumer managed to dequeue before us
} while (true);
}
Мой первый опыт будет с ConcurrentQueue<T>
, но вы можете абстрагировать свое хранилище данных за интерфейс, чтобы вы могли легко изменять реализации. Затем сравните типичные сценарии и посмотрите, где вы сталкиваетесь с проблемами. Помните: преждевременная оптимизация — корень всех зол. Спроектируйте свою систему так, чтобы она была привязана не к реализации, а к контракту, и тогда вы сможете оптимизировать свои реализации так, как захотите.
Я взглянул на ConcurrentQueue<T>
с ILSpy и, на первый взгляд, кажется, что это реализация без блокировки - так что есть большая вероятность, что это именно то, что вы ищете.
BlockingCollection<T>
тебе поможет. Он отвечает за ожидание новых элементов для потребителей и ожидание свободного места в очереди, если вы решите ограничить ее размер, для производителей.
- person svick; 21.05.2011