Публикация/использование: ожидание подписки, фильтрация сообщений и удаление

Использование Rx.Net 3

С помощью планировщика Quartz.Net я создаю диспетчер рабочих процессов для цепочки заданий (используя Quartz Joblistener для готовых заданий) со встроенным веб-сервером. Приложение создает экземпляр субъекта (синглтон).

Веб-служба берет данные и запускает рабочий процесс, вводя уникальный идентификатор. Этот уникальный идентификатор распространяется через рабочий процесс. Делегат Joblistener обнаруживает конец определенного задания и вызывает OnNext для внедренного экземпляра Subject с типом, содержащим уникальный идентификатор и идентификатор таблицы БД.

Идея заключалась в том, что веб-сервис при каждом звонке подписывается на Субъект и ждет входящие сообщения/события и фильтрует их по уникальному идентификатору. При обнаружении удаляет подписку, собирает и возвращает сгенерированные данные вызывающему объекту.

Как я могу заставить свой Subscribe() ждать входящие сообщения, фильтровать их и Dispose(), не завершая преждевременно веб-службу.


person Fritz Herbers    schedule 24.12.2016    source источник
comment
Вам нужно показать нам наш код. Мы не можем ответить на это без него. Вам нужен минимально воспроизводимый пример.   -  person Enigmativity    schedule 25.12.2016


Ответы (1)


Вам не нужно удалять подписку вручную. Любой ограничивающий оператор, такой как Take или First, сигнализирует OnCompleted, что приводит к удалению последовательности. Вы также можете использовать наблюдаемые await, чтобы избежать написания обратных вызовов.

Например, dbId = await AsyncCommunication.FirstAsync(x => x.Key == id)

person Asti    schedule 27.12.2016
comment
Спасибо за обеспечение некоторого внутреннего поведения. Хотя у меня была бы проблема параллелизма. Мне нужно подписаться, вызвать планировщик заданий и дождаться события. При первом вызове планировщика заданий я мог пропустить событие при использовании FirstAsync(). - person Fritz Herbers; 28.12.2016