У меня проблемы с созданием «системы противодавления». Я использую Vertx HttpClient и RxJava. Мне нужно сделать 6000 запросов к внешней службе, и, чтобы избежать переполнения в waitForQueue, поскольку эта внешняя служба не может обрабатывать так быстро, как я отправляю, я устанавливаю задержку между запросом / ответом.
Поскольку это путешествие работает как пакетный процесс, не беспокойтесь, если это займет минуту.
Вот мой код
return from(subGroups)
.flatMap(subGroup -> getProductIdsForSubGroup(subGroup))
.delay(50, TimeUnit.MILLISECONDS)
Этот метод вызывается из наблюдаемого интервала, который выполняется каждые 24 часа, передавая этот список подгрупп (6000).
Но после проверки моих журналов я не вижу задержки между моим запросом в 50 мс.
Вот 3 моих лога
{"@timestamp":"2016-11-30T10:32:48.973+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/comercial?category=T15EB&clientId=ERROR_NOT_SUPPLIED","requestHash":189630582,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
{"@timestamp":"2016-11-30T10:32:48.978+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/commercial?category=T15EE&clientId=ERROR_NOT_SUPPLIED","requestHash":1296199359,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
{"@timestamp":"2016-11-30T10:32:48.981+00:00","event":"started","requestHost":"localhost","requestMethod":"GET","requestUri":"/v3/commercial?category=T15EG&clientId=ERROR_NOT_SUPPLIED","requestHash":228306365,"level":"INFO","thread_name":"vert.x-eventloop-thread-5"}
Есть идеи, что мне нужно сделать для этого?
С Уважением.
РЕШЕНИЕ
В конце концов я использую concatMap
. Если у вас есть лучшее решение, дайте мне знать
return from(subGroups)
.concatMap(subGroup -> Observable.just(subGroup).delay(50, TimeUnit.MILLISECONDS))
.flatMap(subGroup -> getProductIdsForSubGroup(subGroup))