Автоматический выключатель для службы Scala и akka-http rest

В настоящее время я реализую автоматический выключатель с Akka-HTTP следующим образом:

 def sendMail(entity: MyEntity): ToResponseMarshallable = {

      Thread.sleep(5 * 1000)
      validateEntity(entity).map[ToResponseMarshallable] {
        case (body, subject) if !isEmpty(body, subject) => {
          val mailResponse = sendMail(body, subject)
          OK -> ProcessedEmailMessage(mailResponse)
        }
        case _ =>
          BadRequest -> s"error: for $entity".toJson
      }
    } catch {
      case e: DeserializationException => HttpResponse(BadRequest).withEntity(HttpEntity(s"error:${e.msg}").withContentType(ContentTypes.`application/json`))
    }
  }

  val maxFailures: Int = 2
  val callTimeout: FiniteDuration = 1 second
  val resetTimeout: FiniteDuration = 30 seconds

  def open: Unit = {
    logger.info("Circuit Breaker is open")
  }

  def close: Unit = {
    logger.info("Circuit Breaker is closed")
  }

  def halfopen: Unit = {
    logger.info("Circuit Breaker is half-open, next message goes through")

  private lazy val breaker = CircuitBreaker(
    system.scheduler,
    maxFailures,
    callTimeout,
    resetTimeout
  ).onOpen(open).onClose(close).onHalfOpen(halfopen)

  def routes: Route = {
    logRequestResult("email-service_aggregator_email") {
      pathPrefix("v1") {
        path("sendmail") {
          post {
            entity(as[EmailMessage]) { entity =>
              complete {
                breaker.withCircuitBreaker(Future(sendMail(entity)))
              }
            }
          }
        }
      }
    }
  }

Моя проблема в том, что если я использую breaker.withCircuitBreaker(Future(sendMail(entity))), автоматический выключатель переходит в разомкнутое состояние, но остальная часть ответа возвращает There was an internal server error в качестве ответа.

Если вместо этого я использую breaker.withSyncCircuitBreaker(Future(sendMail(entity))), то автоматический выключатель никогда не переходит в разомкнутое состояние, но возвращает ожидаемое HttpResponse

Любые мысли о том, как я могу решить эту проблему, чтобы активировать автоматический выключатель, а также вернуть правильный ответ HTTP?


person Lucian Enache    schedule 01.03.2016    source источник
comment
Можете ли вы также опубликовать код, который создает breaker ?   -  person Pim Verkerk    schedule 01.03.2016
comment
Вам нужно использовать onComplete вместо complete. Директива complete ожидает, что ответ будет готов немедленно. В вашем случае withCircuitBreaker возвращает Future, поэтому complete не будет допустимым вариантом. Директива onComplete настроена для работы с Future, так что здесь она лучше подходит. Затем внутри обратного вызова onComplete вы можете использовать complete.   -  person cmbaxter    schedule 01.03.2016
comment
@cmbaxter, можете ли вы опубликовать пример, я не понимаю, как связать/следующий onComplete с полным   -  person Lucian Enache    schedule 01.03.2016


Ответы (2)


Я собираюсь предложить еще одно возможное решение, так как я считаю, что onComplete — это идиоматический способ работы с результатом Future при завершении маршрута:

entity(as[EmailMessage]) { entity =>
  val withBreaker = breaker.withCircuitBreaker(Future(sendMail(entity)))

  onComplete(withBreaker){
    case Success(trm) => 
      complete(trm)

    //Circuit breaker opened handling
    case Failure(ex:CircuitBreakerOpenException) => 
      complete(HttpResponse(TooManyRequests).withEntity("Server Busy"))

    //General exception handling
    case Failure(ex) =>
      complete(InternalServerError)
  }
}
person cmbaxter    schedule 01.03.2016

person    schedule
comment
Это не работает, я получаю то же самое. Было сообщение о внутренней ошибке сервера. Я попытался внести некоторые коррективы и метод sendMail(), чтобы возвращать непосредственно будущее, если я не оберну это в автоматический выключатель, он сработает, и будущее будет решено. - person Lucian Enache; 01.03.2016
comment
Отправка почты занимает не менее 5 секунд, в то время как время ожидания вашего звонка составляет 1 секунду. Не вызовет ли это исключение? - person Pim Verkerk; 01.03.2016
comment
Он должен войти в открытое состояние, что происходит, но ответ не возвращается - person Lucian Enache; 01.03.2016
comment
Я удалил Thread.sleep() из вызова функции, и ответ возвращается правильно. Теперь я хотел просто вернуть HttpResponse из функции onOpen - person Lucian Enache; 01.03.2016
comment
Вы правы, я создал небольшое приложение, чтобы проверить это, и мой первый запрос работает, даже если он занимает больше времени, чем время ожидания. Любой последующий запрос в течение следующих 30 секунд завершится ошибкой. - person Pim Verkerk; 01.03.2016
comment
Там мне было интересно, когда CB меняет состояние, есть ли способ вернуть HttpResponses? - person Lucian Enache; 01.03.2016
comment
это круто, это работает для второго запуска, где открыто, но для первого запуска он работает регулярно, только если я уберу Thread.sleep(), если в коде присутствует Thread.sleep(), он все равно выдаст "There was an internal server error." и не вернет фактический HttpResponse - person Lucian Enache; 01.03.2016
comment
Вы можете добавить case ex => ..., чтобы увидеть, есть ли другое исключение, и обработать его. - person Pim Verkerk; 01.03.2016
comment
в блоке withErrorHandling? - person Lucian Enache; 01.03.2016
comment
В части recover - person Pim Verkerk; 01.03.2016
comment
это было исключение TimeoutException, которое не было обработано, спасибо. - person Lucian Enache; 01.03.2016