Безопасное и однозначное управление атомарными переменными в C ++ 11

Мне нужно прочитать некоторые данные (которые поступают с невероятной скоростью - до 5000 сообщений в секунду) из многоадресного (UDP) потока. Поскольку поток является многоадресным (и данные очень важны), поставщик данных предоставил два потока, которые отправляют идентичные данные (их логика заключается в том, что вероятность отбрасывания одного и того же пакета в обоих потоках очень близка к нулю). Все пакеты данных помечены порядковым номером для отслеживания.

Кроме того, приложение настолько критично по времени, что я вынужден прослушивать оба потока параллельно и выбирать следующий порядковый номер из того, какой многоадресный поток был получен первым - когда тот же пакет поступает в зеркальный поток, я просто отбрасываю его. .

Я планирую реализовать эту функцию перетаскивания, используя общую переменную "sequence_number" между двумя функциями, которые, кстати, выполняются в разных потоках. Порядковый номер atomic, поскольку он будет считываться и обновляться из двух разных потоков.

На ум приходит очевидный алгоритм:

if (sequence number received from the stream > sequence_number)
{
   process packet;
   sequence_number = sequence number received from the stream;
}

(Вышеупомянутый алгоритм необходимо изменить для тех случаев, когда порядковые номера выходят из строя - и они могут, поскольку это поток UDP, но давайте пока забудем об этом)

У меня такой вопрос:

С того момента, как я std::load мой sequence_number, проверяю, меньше ли он, чем порядковый номер, который я получил из потока, принимаю пакет и наконец std::store новый порядковый номер на sequence_number; если другой поток получает тот же пакет (с тем же порядковым номером) и выполняет те же операции (до того, как первый поток завершит std::store на этом порядковом номере), я, по сути, получу один и тот же пакет дважды в моей системе. Как выйти из этой ситуации?


person Chani    schedule 19.07.2014    source источник
comment
Я думаю, вам нужно изучить использование блокировки мьютексов. Именно для этого они предназначены. Автоматические переменные просто гарантируют, что никакие два потока не будут выполнять операцию с этой переменной одновременно, но это применимо только к одному чтению / записи. Вы явно делаете больше, чем одно чтение / запись. Вы выполняете транзакцию   -  person aruisdante    schedule 19.07.2014
comment
@aruisdante Я обдумал это. Но знаете, стоимость блокировки может быть совсем небольшой. По сути, я ищу здесь решение без блокировки.   -  person Chani    schedule 19.07.2014
comment
Есть много-много способов выполнить программирование без блокировки. Какой метод лучше всего подходит для вашего приложения, может отличаться. Я бы сначала попробовал простое решение мьютекса, и только если это не работает, попробую реализовать более сложный метод без блокировки. Вообще говоря, программирование без блокировок направлено на предотвращение тупиковых ситуаций, а не на повышение производительности транзакций.   -  person aruisdante    schedule 19.07.2014
comment
Как насчет того, чтобы избежать отбрасывания любого пакета, получить все пакеты и просто атомарно поместить их в соответствующий слот последовательности?   -  person    schedule 19.07.2014
comment
@ DieterLücking, это, наверное, лучшее решение. Если у вас есть потокобезопасная set-подобная структура, основанная на порядковом номере, вам не нужно беспокоиться о том, как определять, когда вообще игнорировать пакеты, структура данных сделает это за вас.   -  person aruisdante    schedule 19.07.2014
comment
@ DieterLücking Тогда я бы рискнул вставить дубликаты в слот последовательности, верно?   -  person Chani    schedule 19.07.2014
comment
@Wildling Я предполагаю, что эти дубликаты идентичны, поэтому замена одного на другой не повредит.   -  person    schedule 19.07.2014
comment
@ DieterLücking Верно, но это снова лишает возможности обработать самый ранний пакет и двигаться дальше. Однако, поскольку мой вопрос уже является крайним случаем, это определенно способ решить эту проблему. Спасибо !   -  person Chani    schedule 19.07.2014
comment
Вы также можете использовать compare_exchange, чтобы не заменять пустой слот.   -  person Ben Voigt    schedule 19.07.2014
comment
поскольку параллельная работа легкая (последовательность проверки), эксклюзивная работа тяжелая (обработка пакета), я бы не стал делать ее многопоточным, но использовал бы select / epoll / kqueue на нескольких сокетах.   -  person Non-maskable Interrupt    schedule 19.07.2014
comment
@Calvin Ссылка на учебник по select / epoll / kqueue, пожалуйста !!! :))   -  person Chani    schedule 19.07.2014


Ответы (3)


Не откладывайте заботу об обработке вышедших из строя пакетов на потом, потому что это решение также обеспечивает наиболее элегантное решение для синхронизации потоков.

Элементы массива - это уникальные ячейки памяти для целей гонки данных. Если вы поместите каждый пакет (атомарно с помощью записи указателя) в другой элемент массива в соответствии с его порядковым номером, вы избавитесь от большинства конфликтов. Также используйте сравнение-обмен, чтобы определить, видел ли этот пакет уже другой поток (другой поток).

Обратите внимание, что у вас не будет цикла повторных попыток, обычно связанного с обменом-сравнением, либо у вас есть первая копия пакета, и обмен-сравнение успешен, либо пакет уже существует, и ваша копия может быть отброшена. Так что этот подход не только без блокировок, но и без ожидания :)

person Ben Voigt    schedule 19.07.2014
comment
использование массива для хранения уникальных порядковых номеров может не сработать, поскольку я получаю до восемнадцати миллионов сообщений. Однако идея сравнения-обмена не вызывает возражений. - person Chani; 19.07.2014
comment
Уайлдинг, используйте только младшие биты порядкового номера в качестве индекса массива, что создает кольцевой буфер. - person Ben Voigt; 19.07.2014
comment
Да, было бы здорово. Другое дело, если вы не возражаете, если я спрошу; из cppreference std :: atomic :: compare_exchange_strong Атомно сравнивает значение, хранящееся в * this, со значением ожидаемого, и, если они равны, заменяет первое на желаемое (выполняет операцию чтения-изменения-записи). В противном случае загружает фактическое значение, хранящееся в * this, в ожидаемое (выполняет операцию загрузки). Я не понял, и если они равны, заменяет предыдущую на желаемую (выполняет операцию чтения-изменения-записи). Почему бы не оставить это в покое, если значения совпадают? - person Chani; 19.07.2014
comment
Успешно, когда текущее значение в атомарном элементе совпадает со старым значением, называемым ожидаемым. В этом случае используйте nullptr в качестве ожидаемого значения. - person Ben Voigt; 19.07.2014

Вот один из вариантов, если вы используете значения std::atomic, используя compare_exchange .

Не показано, как инициализировать last_processed_seqnum, так как вам нужно установить его на допустимое значение, а именно, на единицу меньше, чем seqnum следующего прибывшего пакета.

Его необходимо будет адаптировать для случая, когда есть пробелы в порядковых номерах. Вы упоминаете как часть вашей предпосылки, что не будет выпадать seqnums; но в приведенном ниже примере обработка пакетов прекращается (т. е. происходит катастрофическая ошибка) при любых пропусках seqnum.

std::atomic<int> last_processed_seqnum;
// sync last_processed_seqnum to first message(s).

int seqnum_from_stream = ...;
int putative_last_processed_seqnum = seqnum_from_stream - 1;


if (last_processed_seqnum.compare_exchange_strong(putative_last_processed_seqnum,
                                                  seqnum_from_stream))
{
   // sequence number has been updated in compare_exchange_strong
   // process packet;
} 

В идеале нам нужна функция compare_exchange, которая использует больше, чем, не равно. Я не знаю, как добиться такого поведения за одну операцию. Вопрос SO, который я связал со ссылками на ответ об итерации по всем значениям, меньшим целевого, подлежащего обновлению.

person NicholasM    schedule 19.07.2014

Вы, вероятно, реализуете обработчик фида цен, что это за биржа и какой протокол? Это ITCH или FIX Fast? Я бы не рекомендовал два потока для одного и того же канала, поскольку вам, вероятно, придется присоединиться к нескольким группам многоадресной рассылки для разных сегментов рынка / плат.

person user3856670    schedule 19.07.2014
comment
Данные, поступающие из определенного многоадресного потока, гарантированно отправляются только для определенного набора книг заказов. Так что, думаю, проблемы слияния книг заказов не возникает. Это то, о чем вы беспокоились? (Это бинарный протокол .. ни ITCH, ни FIX) - person Chani; 20.07.2014