Akka Flow зависает при отправке http-запросов через пул соединений

Я использую Akka 2.4.4 и пытаюсь перейти с Apache HttpAsyncClient (безуспешно).

Ниже приведена упрощенная версия кода, которую я использую в своем проекте.

Проблема в том, что он зависает, если я отправляю в поток более 1-3 запросов. Пока что после 6 часов отладки я даже не смог найти проблему. Я не вижу исключений, журналов ошибок, событий в Decider. НИЧЕГО :)

Я попытался уменьшить настройку connection-timeout до 1 с, думая, что, возможно, он ждет ответа от сервера, но это не помогло.

Что я делаю не так ?

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory

import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try

object Main {

  implicit val system = ActorSystem("root")
  implicit val executor = system.dispatcher
  val config = ConfigFactory.load()

  private val baseDomain = "www.google.com"
  private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))

  private val decider: Decider = {
    case ex =>
      ex.printStackTrace()
      Supervision.Stop
  }

  private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] =

    Source.fromIterator(() => items.toIterator)
      .via(poolClientFlow)
      .log("Logger")(log = myAdapter)
      .recoverWith {
        case ex =>
          println(ex)
          null
      }
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq)
      .map { v =>
        println(s"Got ${v.length} responses in Flow")
        v.asInstanceOf[Seq[(Try[HttpResponse], T)]]
      }

  def main(args: Array[String]) {

    val headers = imSeq(Referer("https://www.google.com/"))
    val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID"
    val requests = List.fill(10)(reqPair)
    val qwe = sendMultipleRequests(requests).map { case responses =>
      println(s"Got ${responses.length} responses")

      system.terminate()
    }

    Await.ready(system.whenTerminated, Duration.Inf)
  }
}

Также что случилось с поддержка прокси ? У меня тоже не работает.


person expert    schedule 05.05.2016    source источник


Ответы (1)


Вам необходимо полностью использовать тело ответа, чтобы соединение стало доступным для последующих запросов. Если вы вообще не заботитесь об объекте ответа, вы можете просто слить его в Sink.ignore, что-то вроде этого:

resp.entity.dataBytes.runWith(Sink.ignore)

В конфигурации по умолчанию при использовании пула соединений хоста максимальное количество соединений установлено на 4. Каждый пул имеет свою собственную очередь, в которой запросы ждут, пока одно из открытых соединений не станет доступным. Если эта очередь когда-либо превысит 32 (конфигурация по умолчанию, может быть изменена, должна быть степенью 2), тогда вы начнете видеть сбои. В вашем случае вы делаете только 10 запросов, поэтому вы не достигаете этого предела. Но, не потребляя объект ответа, вы не освобождаете соединение, а все остальное просто стоит в очереди, ожидая освобождения соединений.

person cmbaxter    schedule 06.05.2016
comment
Я на самом деле пробовал это, и это не помогло. Может быть, я разместил его в неправильном месте. Не могли бы вы взглянуть на автономный проект, который я создал? github.com/cppexpert/akka_flow_freezing - person expert; 06.05.2016
comment
Да, это проблема. Вы пытаетесь упорядочить результаты 10 фьючерсов, а затем читаете тело. Проблема в том, что для вызова карты на sequence все 10 фьючерсов должны быть завершены, и только первые 4 будут, и эти первые 4 блокируют остальные 6. Продвиньте код чтения ответа дальше, и это решит вашу проблему. . - person cmbaxter; 06.05.2016
comment
Не могли бы вы продемонстрировать, как вы переместите код чтения ответов вверх? Я пробовал несколько вещей, и он все еще ждет, когда фьючерсы будут готовы в большом количестве. Даже в моем примере следует ли parseResponse вызываться асинхронно перед каждым ответом ДО того, как он будет передан Future.sequence ? Возможно, я мог бы перейти на toMat очереди, но тогда я не смогу использовать ее для разбора разных ответов. Паковать лямбу вместе с (Any, Promise[..]) для каждого запроса мне кажется слишком некрасиво. - person expert; 08.05.2016
comment
Вот некрасиво - github.com/akka/akka/issues/20460#issuecomment- 217727754 - person expert; 08.05.2016
comment
Я не смогу сделать это сегодня, но я разветвлю ваш репозиторий и отправлю запрос на вытягивание завтра рано утром. - person cmbaxter; 08.05.2016