Я использую RxJava.
У меня есть Observable<T>
. Как преобразовать его в List<T>
?
Вроде простая операция, но нигде в сети не нашел.
Я использую RxJava.
У меня есть Observable<T>
. Как преобразовать его в List<T>
?
Вроде простая операция, но нигде в сети не нашел.
Вы можете использовать toList() или toSortedList() . Например,
observable.toList(myObservable)
.subscribe({ myListOfSomething -> do something useful with the list });
subscribe
.
- person Adam Arold; 28.03.2016
Надеюсь это поможет.
List<T> myList = myObservable.toList().toBlocking().single();
спасибо
Ананд Раман
toBlocking
выдал это? (Вопрос не требует, чтобы он был асинхронным)
- person Nick Cardoso; 18.01.2018
RxJava 2+:
List<T> = theObservarale
.toList()
.blockingGet();
Вы также можете использовать оператор collect
:
ArrayList list = observable.collect(ArrayList::new, ArrayList::add)
.toBlocking()
.single();
С помощью collect
вы можете выбрать тип Collection
, который вы предпочитаете, и выполнить дополнительную операцию над элементом перед добавлением его в список.
collect(ArrayList::new, ArrayList::add)
- person Will; 26.01.2018
Вы не можете преобразовать observable в список каким-либо идиоматическим способом, потому что список на самом деле не тот тип, который подходит для Rx.
Если вы хотите заполнить список событиями из наблюдаемого потока, вам нужно в основном создать список и добавить элементы в метод Subscribe
, например так (C#):
IObservable<EventType> myObservable = ...;
var list = new List<EventType>();
myObservable.Subscribe(evt => list.Add(evt));
Операторы в стиле ToList
предоставляют список только после завершения потока (как IObservable<List<T>>
), поэтому это бесполезно в сценариях, где у вас есть долгоживущий поток или вы хотите увидеть значения до завершения потока.
Это работает.
public static void main(String[] args) {
Observable.just("this", "is", "how", "you", "do", "it")
.lift(customToList())
.subscribe(strings -> System.out.println(String.join(" ", strings)));
}
public static <T> ObservableOperator<List<T>, T> customToList() {
return observer -> new DisposableObserver<T>() {
ArrayList<T> arrayList = new ArrayList<>();
@Override
public void onNext(T t) {
arrayList.add(t);
}
@Override
public void onError(Throwable throwable) {
observer.onError(throwable);
}
@Override
public void onComplete() {
observer.onNext(arrayList);
observer.onComplete();
}
};
}`
Это может быть поздний ответ, надеюсь, что это поможет кому-то в будущем.
Есть оператор collectInto()
. Я бы посоветовал всем не использовать blocking()
(кроме тестового случая), так как вы полностью теряете цель асинхронных событий в Rxchains
. Старайтесь максимально связать свои операции
Completable setList(List<Integer> newIntegerList, Observable<Integer> observable){
return observable.collectInto(newIntegerList, List::add).ignoreElement();
}
// Can call this method
Observable<Integer> observable = Observable.just(1, 2, 3);
List<Integer> list = new ArrayList<>();
setList(list, observable);
В этом случае вы избавляете себя от необходимости использовать blocking()
.
Сам нашел
public static <T> List<T> toList(Observable<T> observable) {
final List<T> list = new ArrayList<T>();
observable.toBlocking().forEach(new Action1<T>() {
@Override
public void call(T t) {
list.add(t);
}
});
return list;
}
Observable.toList()
и убедитесь, что Observable
не бесконечно, иначе у вас возникнут проблемы.
- person tmn; 30.08.2015