После отмены подписки моя подписка продолжает действовать

Я пытаюсь изучить RxScala. Я использую Observable.interval(1 second).buffer(10 seconds) и делаю подписку для печати значений, но не могу правильно отказаться от подписки. После того, как я отменил подписку, она продолжает работать, но с пустым результатом.

Мой код:

import scala.language.postfixOps
import scala.concurrent.duration.DurationLong
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.Duration
import rx.lang.scala._

val o = Observable.interval(1 second).buffer(10 seconds)
val s = o.subscribe(
          { n =>  println("buffer --> " + n) }, 
          e => { e.printStackTrace() },
          () => { println("Completed!") })

Он хорошо печатает результаты:

buffer --> Buffer(0, 1, 2, 3, 4, 5, 6, 7, 8)
buffer --> Buffer(9, 10, 11, 12, 13, 14, 15, 16, 17, 18)
buffer --> Buffer(19, 20, 21, 22, 23, 24, 25, 26, 27, 28)
buffer --> Buffer(29, 30, 31, 32, 33, 34, 35, 36, 37, 38)

Через некоторое время отписываюсь:

s.unsubscribe

И я все еще получаю отпечатки, но с пустыми буферами:

buffer --> Buffer()

Я думаю, это означает, что моя подписка все еще активна, но получаю пустые буферы. Как я могу остановить свою подписку?


person Filipe Pais Lenfers    schedule 08.12.2013    source источник
comment
Может быть, форум Coursera может быть лучшим местом для этого вопроса?   -  person Mark Lister    schedule 09.12.2013
comment
@MarkLister Я задал этот вопрос на форуме Coursera (class.coursera.org/ reactive-001/forum/thread?thread_id=1858), как вы предложили, и получил ответ. Похоже, это ошибка: github.com/Netflix/RxJava/issues/559 . спасибо   -  person Filipe Pais Lenfers    schedule 09.12.2013


Ответы (1)


Я задал этот вопрос на форуме курса (class.coursera.org/reactive-001/forum/thread?thread_id=1858), как предложил @MarkLister, и получил ответ.

Кажется, это ошибка в версии 0.15.1: github.com/Netflix/RxJava/issues/559.

person Filipe Pais Lenfers    schedule 09.12.2013