Использовать сообщение из темы только один раз для прослушивателей, работающих в кластере.

Я реализую инфраструктуру событий домена, но проект не позволяет использовать инфраструктуру обмена сообщениями (клиент финансовых услуг), поэтому нашел альтернативу в Hazelcast Topics и ExecutorService,

Но проблема в том, что при работе в кластере сообщение должно быть доставлено слушателям, которые будут работать в кластере, поэтому для кластера из 2 у нас есть один и тот же слушатель, работающий в 2 jvm, и сообщение потребляется дважды и обрабатывается, предположим, что Предполагается, что событие домена выполняет какую-то неидемпотентную операцию, такую ​​как кредитование некоторых баллов лояльности, если я явно не поддерживаю трассировку события домена, на которое воздействовали, и не проверяю его каждый раз, когда я получаю событие, я в конечном итоге зачисляю его дважды, «любое предложение, реализующее это без необходимости писать эти котельные плиты, возможно, на инфраслое», или существует известный шаблон для такой реализации.

Изменить: тем временем я также оцениваю Hazelcast ExecutorService, как было предложено. Здесь


person Somasundaram Sekar    schedule 19.04.2016    source источник


Ответы (1)


Описанный вами вариант использования можно решить, используя очереди Hazelcast вместо тем. Основная причина использования тем — если вы заинтересованы в том, чтобы несколько (возможно, независимых) потребителей получили одно и то же сообщение. Ваше требование звучит так, как будто вы заинтересованы в том, чтобы только один из потребителей получил сообщение, и для этого и нужны очереди, см. документация Hazelcast для очередей.

person svenpanko    schedule 19.04.2016
comment
Могу ли я попытаться еще больше расширить требование. Тема была выбрана, потому что сообщение должно быть доставлено нескольким подписчикам, но если подписчик работает в нескольких jvm, только один экземпляр подписчика должен обрабатывать сообщение. - person Somasundaram Sekar; 19.04.2016
comment
В этом случае вы можете решить эту проблему разными способами с помощью внешней блокировки, т. е. вы должны убедиться, что обрабатываемые вами сообщения в настоящее время не обрабатываются где-либо еще. Одним из решений может быть использование распределенной карты, к которой имеют доступ все слушатели, в сочетании с распределенной блокировкой. Каждый слушатель получает блокировку, проверяет, было ли сообщение уже обработано или обрабатывается в данный момент (идентификатор сообщения на карте), и если нет, он помещает запись в карту (IN_PROCESS или что-то подобное). Затем слушатель снимает блокировку и в зависимости от состояния карты обрабатывает ее или нет. - person svenpanko; 20.04.2016
comment
И после успешной обработки соответствующий слушатель снова получает блокировку, обновляет запись карты на PROCESSED или аналогичную, и все. Единственное, что вы должны убедиться, это то, что в какой-то момент времени срок действия записей на вашей карте истекает, иначе у вас закончится память ;) - person svenpanko; 20.04.2016
comment
Проблема здесь — это идентификатор сообщения. Для больших систем обмена сообщениями не существует концепции уникального идентификатора, который можно использовать для идентификации конкретного сообщения. (скажем MQTT). - person NishM; 22.03.2018