Akka/Scala: можете ли вы объяснить, что происходит в потоке Akka Streams?

Я режу зубы на потоках Akka и сделал пример издателя-подписчика фибоначчи следующим образом. Однако я пока не совсем понимаю, как изначально формируется спрос и какое отношение он имеет к стратегии запроса абонента. Может кто-нибудь объяснить?

ФибоначчиИздатель:

class FibonacciPublisher extends ActorPublisher[Long] with ActorLogging {
  private val queue = Queue[Long](0, 1)

  def receive = {
    case Request(_) => // _ is the demand
      log.debug("Received request; demand = {}.", totalDemand)
      publish
    case Cancel =>
      log.info("Stopping.")
      context.stop(self)
    case unknown => log.warning("Received unknown event: {}.", unknown)
  }

  final def publish = {
    while (isActive && totalDemand > 0) {
      val next = queue.head
      queue += (queue.dequeue + queue.head)

      log.debug("Producing fibonacci number: {}.", next)

      onNext(next)

      if (next > 5000) self ! Cancel
    }
  }
}

ФибоначчиПодписчик:

class FibonacciSubscriber extends ActorSubscriber with ActorLogging {
  val requestStrategy = WatermarkRequestStrategy(20)

  def receive = {
    case OnNext(fib: Long) =>
      log.debug("Received Fibonacci number: {}", fib)

      if (fib > 5000) self ! OnComplete
    case OnError(ex: Exception) =>
      log.error(ex, ex.getMessage)
      self ! OnComplete
    case OnComplete =>
      log.info("Fibonacci stream completed.")
      context.stop(self)
    case unknown => log.warning("Received unknown event: {}.", unknown)
  }
}

Приложение Фибоначчи:

val src = Source.actorPublisher(Props[FibonacciPublisher])
val flow = Flow[Long].map { _ * 2 }
val sink = Sink.actorSubscriber(Props[FibonacciSubscriber])

src.via(flow).runWith(sink)

Пример запуска: Вопрос. Откуда возник первоначальный спрос на 4?

2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 4.
2015-10-03 23:10:49.120 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 0.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 1.
2015-10-03 23:10:49.121 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 2.
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 0
2015-10-03 23:10:49.122 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Received request; demand = 2.
2015-10-03 23:10:49.123 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 2
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciSubscriber - Received Fibonacci number: 4
2015-10-03 23:10:49.124 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 3.
2015-10-03 23:10:49.125 [fibonacci-akka.actor.default-dispatcher-2] [DEBUG] n.a.s.f.FibonacciProducer - Producing fibonacci number: 5.

person Abhijit Sarkar    schedule 04.10.2015    source источник


Ответы (1)


Первоначальный спрос на ваш источник обеспечивается входным буфером ваших более поздних этапов. Это, в свою очередь, настраивается через экземпляр ActorMaterializerSettings, который вы передаете при инициализации вашего ActorMaterializer.

Если вы не передадите какую-либо конкретную настройку, akka будет использовать предоставленную конфигурацию для ее инициализации; в конфигурации по умолчанию вы может обнаружить, что akka.stream.materializer.initial-input-buffer-size установлено на 4. Изменение этого должно изменить ваш первоначальный спрос.

person Aldo Stracquadanio    schedule 05.10.2015
comment
Спасибо за Ваш ответ. Играя с моим примером Фибоначчи выше, вместо одного подписчика я прикрепил 2, и начальная потребность стала 16. Я вижу, что 16 — это значение по умолчанию для akka.stream.materializer.max-input-buffer-size. Я также прочитал Буферы и работающие со скоростью. Чего это не объясняет, и я надеюсь, что вы сможете это объяснить, так это того, что 1) когда спрос больше 4, но меньше 16, что происходит? И 2) что происходит, когда спрос больше 16? - person Abhijit Sarkar; 06.10.2015
comment
Я думаю, что на большинстве этапов буфер будет использоваться только тогда, когда источник быстрее, чем потребитель; в этом случае заполнение буфера (меньшее потребление) приведет к противодавлению на потребителя. Если вместо этого потребитель быстрее (больше спрос), буфер вообще не будет использоваться, и данные от производителя будут напрямую передаваться потребителю. - person Aldo Stracquadanio; 06.10.2015
comment
То, что вы сказали в своем комментарии, неверно. Согласно документу Буферы и работа со скоростью, по соображениям производительности Akka Streams вводит буфер для каждого этапа обработки. - person Abhijit Sarkar; 07.10.2015