Публикация и подписка ZeroMQ одновременно

Я работаю над программой на C++, которая должна иметь возможность отправлять/получать JSON-полезные нагрузки от произвольного числа других клиентов.

Сначала я попытался реализовать сервис PubNub, но понял, что не могу одновременно получать и публиковать сообщения (даже используя два разных контекста в разных потоках). Мне нужно уметь это делать. Я также обнаружил, что у PubNub слишком большая задержка, на мой вкус.

Я наткнулся на библиотеку ZeroMQ с моделью PUB/SUB. который удовлетворил бы мои потребности. Но все примеры Я наткнулся на объяснение, как реализовать это таким образом, чтобы один процесс был издателем ИЛИ подписчиком, а не обоими одновременно.

В идеале я хотел бы запрограммировать сервер, который ретранслировал бы все сообщения, приходящие от кого-либо, любому, кто подписался на определенный канал, указанный в сообщении. Каждый человек должен иметь возможность получать и публиковать сообщения любому другому пользователю в сети при условии, что он подписан на правильный канал.


UPDATE 1:

Примечание. Мне не нужна страховка получения, потому что полезная нагрузка N+1 будет иметь приоритет над полезной нагрузкой N. Мне нужно средство связи "отправить и забыть" (типа UDP).

В соответствии с запросом: ограничение PubNub в 32 kB на JSON полезной нагрузки было идеальным для меня, мне не нужно больше. Фактически, моя полезная нагрузка в среднем составляет около 4 kB. Все экземпляры клиентов будут работать в одной и той же локальной сети, поэтому в идеале задержка должна быть меньше 5 ms. Что касается количества клиентов, то на один и тот же канал/тему одновременно может быть подписано не более 4 клиентов.


UPDATE 2 :

Я не могу заранее предсказать, сколько каналов/тем будет существовать, но их будет порядка десятков (в большинстве случаев), сотен (на пике). Не тысячи.


Вопросы:

Q1: — Могу ли я реализовать такое поведение с помощью ZeroMQ?
Q2: — Есть ли рабочий пример, демонстрирующий это (желательно в C++)?
Q3: - Если нет, есть предложения по библиотеке в C++?


архитектура pubsub


person Hippolyte Barraud    schedule 17.05.2016    source источник
comment
Не могли бы вы указать количественный порог задержки процесса между узлами в [ns], после которого какое-то решение имеет слишком большую задержку на ваше усмотрение + рабочий оценка верхних границ для произвольного числа других клиентов в [thousands] + набор рабочих оценок на { min | Среднее | MAX }-размеры JSON-полезных данных в [kB] -- как указано выше в вашем тексте? Спасибо.   -  person user3666197    schedule 17.05.2016
comment
Спасибо @HippolyteBarrau за ваши любезные обновления. Каким может быть результирующее количество клиентов внутри одного и того же домена коллизий, если взять указанное максимальное количество 4 на канал/тему, умноженное на теоретическое (еще не указанное) максимальное количество возможных тем? Спасибо за количественные данные, от которых сильно зависит архитектура.   -  person user3666197    schedule 17.05.2016
comment
@user3666197 user3666197: Извините за неточность. Теперь вопрос должен быть завершен.   -  person Hippolyte Barraud    schedule 17.05.2016


Ответы (3)


ZeroMQ : хорошо справляется с этой задачей в масштабах, указанных выше
nanomsg : тоже может выполнять эту задачу, необходимо перепроверить порты/привязки для клиентов

Обзор дизайна:

  • Экземпляры client не являются постоянными, могут свободно появляться сами по себе, могут свободно исчезать сами по себе или по ошибке
  • Экземпляр client самостоятельно решает, что он собирается PUB-лить в качестве полезной нагрузки сообщения
  • Экземпляр client самостоятельно решает, что он собирается SUB-подписать как фактический входящий поток сообщений TOPIC-filter
  • Экземпляр client самостоятельно обменивается (отправляет) простыми, не составными, JSON-форматированными сообщениями, которые он подготовил/создал
  • Экземпляр client собирает (получает) сообщения, для которых он предполагает, что они имеют одинаковую форму, не состоящую из нескольких частей, в формате JSON, и для которых будет предпринята попытка их локальной обработки после приема. завершено
  • максимальное количество экземпляров client не превышает нескольких сотен
  • максимальный размер любой полезной нагрузки в формате JSON меньше 32 kB, в среднем около 4 kB
  • максимально допустимая задержка при доставке между процессами E2E в общем домене коллизии локальных сетей составляет менее 5,000 [usec]
  • Экземпляр server играет центральную роль и является постоянным объектом.
  • Экземпляр server предоставляет известную URL-цель транспортного класса для всех опоздавших присоединившихся.connect()-s

Предложение:

сервер может развертывать несколько поведений для достижения заданных целей, используя как PUB, так и SUB поведения, и обеспечивает управляемый кодом, быстрый, присоединенный к SUB стороне, неблокирующий цикл обработки событий .poll() с выровненной повторной передачей любой полезной нагрузки со стороны SUB со стороны .recv() на PUB-сторона, в настоящее время .connect()-ed, аудитория (живые экземпляры client):

установите s_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
и s_PUB_send = aZmqCONTEXT.socket( zmq.PUB );

из соображений производительности, которые не здесь так сложно, можно также разделить обработку потоков рабочей нагрузки, сопоставив каждый из них с разрозненными подмножествами нескольких созданных потоков ввода-вывода:

map s_SUB_recv.setsockopt( ZMQ_AFFINITY, 0 );
и s_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );

установить s_PUB_send.bind( "tcp://localhost:8899" );
+
установить s_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // forever *every*-TOPIC
установить s_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
установить s_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // retain just the last msg
установить s_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
установить s_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );

и s_SUB_recv.bind( "tcp://localhost:8888" ); // [PUB]s .connect()


Аналогично ,
Экземпляр client может развернуть обращенный назад тандем из PUB-конечной точки и SUB-конечной точки, готовый .connect() к действию nown transport-target-URL.

Конкретная подписка client локально решает, что нужно отфильтровать из входящего потока сообщений (до ZeroMQ v.3.1 API будет доставлено множество всех сообщений каждому экземпляру client по транспортному классу, однако, поскольку API v.3.1+, тематический фильтр работает на стороне PUB, что в желаемом образе действий устраняет напрасный объемы данных по сети, но в то же время это увеличивает накладные расходы на обработку на стороне PUB ( ссылка: примечания о принципе увеличения отображения нескольких потоков ввода-вывода / повышения производительности выше )

установить c_SUB_recv = aZmqCONTEXT.socket( zmq.SUB );
и c_PUB_send = aZmqCONTEXT.socket( zmq.PUB );

если только накладные расходы на сборку/обработку полезной нагрузки не приближаются к допустимому порогу сквозной задержки, нет необходимости разделять/разделять низкоуровневые потоки ввода-вывода ZeroMQ здесь:
map c_SUB_recv.setsockopt( ZMQ_AFFINITY, 0 );
и c_PUB_send.setsockopt( ZMQ_AFFINITY, 1 );

set c_PUB_send.connect( "tcp://server:8888" ); // reverse .bind on [SUB]
+
set c_SUB_recv.setsockopt( ZMQ_SUBSCRIBE, "" ); // modified on-the-fly
set c_SUB_recv.setsockopt( ZMQ_MAXMSGSIZE, 32000 ); // protective ceiling
set c_SUB_recv.setsockopt( ZMQ_CONFLATE, True ); // take just last< бр> се t c_SUB_recv.setsockopt( ZMQ_LINGER, 0 ); // avoid blocking
установить c_SUB_recv.setsockopt( ZMQ_TOS, anAppToS_NETWORK_PRIO_CODE );
и c_SUB_recv.connect( "tcp://server:8899" );


Обсуждение:

Для хобби-проектов в инфраструктуре обмена сообщениями не так много требуется, тем не менее, для более серьезных доменов есть дополнительные службы, и экземпляры server и client должны иметь некоторые дополнительные услуги. добавлены шаблоны поведения для формальных коммуникаций.
 – r/KBD для удаленной клавиатуры с утилитами специальной проверки, подобными CLI
 – KEEP_ALIVE транспондеры для обеспечения общесистемного мониторинга состояния/производительности
- SIG_EXIT обработчики для разрешения общесистемного / специфическая для экземпляра служба SIG_EXITs
 — distributed syslog, позволяющая безопасно собирать/хранить неблокирующую реплику записей журнала (будь то на этапе отладки, на этапе настройки производительности или на этапе производственного уровня). сбора доказательств )

- Identity Management инструменты для журналов аудита и др.

- WhiteList/BlackList для повышение надежности инфраструктуры, чтобы сделать ее более защищенной от DoS-атак/poiso определение ошибочных всплесков трафика NIC и др.

- Adaptive Node re-Discovery для более разумного / специального проектирования инфраструктуры и мониторинга состояния или при использовании нескольких ролей / ( N + M )-заштрихованные сценарии активного горячего резерва, передачи/поглощения ролей и др. выходят на сцену

Резюме

A1: Да, полностью в пределах возможностей ZeroMQ
A2: Да, примеры кода C++ в книге ZeroMQ/доступны руководства
A3: Ссылка: A1, а также может понравиться подробное замечание в сообщении Мартина САСТРИКА "Различия между nanomsg и ZeroMQ"

Надеюсь, вам понравятся возможности распределенной обработки, независимо от того, поддерживается ли она ZeroMQ или nanomsg, или обоими.

Пределом может быть только собственное воображение.

Если вас интересуют дополнительные сведения, вам может понравиться книга, упомянутая в разделе Лучший следующий шаг< /strong> раздел этого поста

person user3666197    schedule 17.05.2016
comment
Спасибо за ваш подробный ответ! Вы ответили на мой вопрос намного выше моих ожиданий. После тестирования обеих библиотек я склоняюсь к nanomsg из-за его простоты и удобства работы с памятью. У меня не будет проблем с адаптацией вашего ответа для моих нужд. - person Hippolyte Barraud; 18.05.2016
comment
О, конечно, да, Мартин САСТРИК является соавтором обоих :о) Если ваша клиентская зона не страдает от отсутствия языковой привязки/порта, вы счастливый человек в этом смысле. Удачи, Ипполит, и вперед! Не стесняйтесь добавить +1 голос в знак полезности ответа. - person user3666197; 18.05.2016

Q1: — Могу ли я реализовать такое поведение с помощью ZeroMQ?

Определенно да; но, вероятно, не используя сокеты PUB/SUB.

Способ сделать это с помощью PUB/SUB заключается в следующем: для каждого узла в системе вы создаете один сокет PUB и один сокет SUB, соединяете один сокет SUB с сокетами PUB всех других узлов и устанавливаете ваш фильтр подписки соответственно. Это ограничено в своей полезности, потому что (я думаю) вам нужно установить один и тот же фильтр для всех ваших соединений. Обратите внимание, что вам определенно следует НЕ создавать более одного контекста в каждом узле.

Если общее количество узлов невелико (например, 10-20 или меньше), вы можете создать один PUB сокет и N-1 SUB сокетов на узел (по-прежнему все в одном контексте) и подключить каждый SUB сокет к каждому из PUB сокетов. остальных узлов.

Если у вас есть четкое представление о клиентских и серверных узлах, вы можете использовать более новые сокеты CLIENT/SERVER (доступные в 4.2 или 4.1, я полагаю). Это будет более элегантно и, вероятно, проще в управлении, но вам придется реализовать фильтрацию содержимого. («каналы») себя; который может быть довольно простым или немного сложным, в зависимости от того, что именно вы хотите сделать.

Q2:. Есть ли рабочий пример, демонстрирующий это (желательно на C++)?

Не то, что я знаю из.

Q3: — Если нет, какие предложения по библиотеке на C++?

Я бы по-прежнему рекомендовал ZeroMQ из-за его относительно легкого веса, простого и элегантного интерфейса, обширной функциональности и способности работать на многих языках и со многими языками. Есть много комбинаций розеток на выбор. В худшем случае вы всегда можете использовать сокеты PAIR везде.

person yzt    schedule 17.05.2016
comment
При всем уважении, что является основанием для приведенного выше предположения не использовать более одного экземпляра Context() в приложении ZeroMQ? У вас есть цитата / источник для этого мнения? - person user3666197; 17.05.2016
comment
Да, я знаю, что в конечном итоге это решение всегда существует. Но идея заключалась в том, чтобы реализовать это в крайнем случае, предпочтительно используя что-то более простое, если это возможно. Я бы предпочел управлять большим количеством сокетов только на стороне сервера. Оставив клиентам два сокета на макс. - person Hippolyte Barraud; 17.05.2016

nanomsg с протоколом BUS, см. http://nanomsg.org/documentation-zeromq.html

person Severin Pappadeux    schedule 17.05.2016
comment
Да, Мартин Сустрик выделил еще одну замечательную библиотеку обмена сообщениями в nanomsg, тем не менее NN_BUS не предоставляет сами по себе, насколько мне известно, запрошенные сервисные компоненты, указанные выше OP (фильтрация тем, узел (re )-обнаружение обновлений и др.), поэтому дизайн/архитектура по-прежнему сильно зависит от сведений о пороговых значениях задержки + оценки емкости, указанные выше, которые еще предстоит уточнить. - person user3666197; 17.05.2016
comment
Я взглянул на протокол BUS nanomsg. Хотя на первый взгляд кажется, что это не совсем то, что мне нужно, автор говорит о нетривиальной топологии, основанной на брокере, в которой есть человек посередине, передающий все сообщения всем остальным (как описано здесь). @user3666197 user3666197: Как вы думаете, это подойдет, учитывая приведенные выше оценки? - person Hippolyte Barraud; 17.05.2016
comment
@user3666197 user3666197 это правда, поверх nanomsg нужно написать некоторый код. Но я думаю, что наличие протокола BUS должно сильно упростить ситуацию в сценарии OP. Было бы мучительно... делать это поверх 0mq - person Severin Pappadeux; 17.05.2016
comment
@HippolyteBarraud да, nanomsg с BUS не совсем подходит, вам понадобится больше кода. Но из трех библиотек обмена сообщениями, с которыми я имел дело (MPI, 0mq, nanomsg), у последней есть по крайней мере достойная основа для того, что вам нужно. . Возможно, другие библиотеки для обмена сообщениями лучше подходят для поставленной задачи (RabbitMQ? понятия не имею об этом) - person Severin Pappadeux; 17.05.2016
comment
При полном уважении к вашему мнению, мой опыт заставил меня поверить, что ни один проект с одним архетипом не соответствует требованиям реальной экосистемы производственного уровня (даже, а может быть тем более если вам продают эту выдумку от наивных менеджеров проектов PMO и специальных спонсоров проектов CXO). Применяются бесчисленные примеры. Кстати, я очень ценю свежие и прекрасные посты в блоге Мартина на скорости 250 ударов в минуту. - person user3666197; 17.05.2016
comment
Нет, @SeverinPappadeus, любое решение на основе брокера (мастодонт-в-фарфоре, мотивированное настойчивостью) - это не путь вперед. Забавно читать MPI в контексте умного и легкого обмена сообщениями. MPI проложил путь в среде научного кластера с довольно наивными (массово)-распределенными кодовыми конструкциями, а не ресурсом первого выбора в неструктурированном специальном интеллектуальном обмене сообщениями. - person user3666197; 17.05.2016
comment
@ user3666197 Funny to read an MPI in a context of a smart & lightweight messaging. ;))). Нет, я бы не советовал использовать MPI для задачи ОП. - person Severin Pappadeux; 17.05.2016