Стриминговый сервер с модификацией 2 — фрагментированный

Я должен передавать конечную точку с сервера, который возвращает json с Transfer-Encoding: chunked.

У меня есть следующий код, но я не могу прочитать ответ. Я попробовал responseBody.streamBytes() и преобразовал входной поток в строку, но не могу сделать это в основном потоке. Как я мог прочитать ответ?

@Streaming
@GET("stream/status")
Observable<ResponseBody> streamStatus();

Observable<ResponseBody> observable = ApiClientHelper.getClient().streamStatus();
    observable
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<ResponseBody>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(final ResponseBody responseBody) {
                   //DON'T KNOW HOW TO READ DATA
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });

РЕДАКТИРОВАТЬ:

Ответ сервера с использованием CURL:

*   Trying 192.168.1.3...
* TCP_NODELAY set
* Connected to 192.168.1.3 (192.168.1.3) port 80 (#0)
> GET /stream/meter HTTP/1.1
> Host: 192.168.1.3
> User-Agent: curl/7.54.0
> Accept: */*
> Cookie:sessionId=bf2533346190e1c72b532b9d6ec6a405
> 
< HTTP/1.1 200 OK 
< Content-Type: text/event-stream
< Cache-Control: no-cache, no-store
< Date: Tue, 17 Oct 2017 12:02:03 GMT
< Transfer-Encoding: chunked
< Connection: Keep-Alive
< 
data: {"production":{"ph-a":{"p":-0.817,"q":0.0,"s":47.302,"v":225.697,"i":0.21,"pf":0.0,"f":50.0},"ph-b":{"p":-0.066,"q":-0.0,"s":0.643,"v":3.091,"i":0.206,"pf":0.0,"f":50.0},"ph-c":{"p":-0.195,"q":-0.0,"s":0.943,"v":7.577,"i":0.123,"pf":0.0,"f":50.0}},"net-consumption":{"ph-a":{"p":-0.598,"q":0.0,"s":51.931,"v":225.606,"i":0.231,"pf":0.0,"f":50.0},"ph-b":{"p":-0.088,"q":0.0,"s":0.875,"v":4.585,"i":0.19,"pf":0.0,"f":50.0},"ph-c":{"p":-0.043,"q":0.0,"s":0.16,"v":1.23,"i":0.13,"pf":-1.0,"f":50.0}},"total-consumption":{"ph-a":{"p":-1.415,"q":-0.0,"s":-4.599,"v":225.652,"i":-0.02,"pf":-1.0,"f":50.0},"ph-b":{"p":-0.154,"q":0.0,"s":0.06,"v":3.838,"i":0.016,"pf":-1.0,"f":50.0},"ph-c":{"p":-0.237,"q":0.0,"s":-0.033,"v":4.404,"i":-0.008,"pf":-1.0,"f":50.0}}}

person Yamila    schedule 23.10.2017    source источник
comment
Это предположение, я не эксперт в http, но я думаю, что Retrfoit сам по себе не является HTTP-клиентом. Таким образом, в качестве вашего ответа вы уже можете получить весь загруженный объект, а не отдельные фрагменты. Обработка чанков, вероятно, произойдет где-то на уровне OkHTTP.   -  person Lukasz    schedule 24.10.2017
comment
Не понимаю. Пожалуйста, посмотрите мой код, у меня есть тело ответа только в методе onNext.   -  person Yamila    schedule 24.10.2017


Ответы (1)


Добавление аннотации @Streaming приводит к тому, что модификация не перемещает весь файл в память, а сразу передает входящие байты. Это позволяет обрабатывать потоки данных, размер которых может превышать общую доступную память. Однако, если вы попытаетесь сделать это в основном потоке, вы получите android.os.NetworkOnMainThreadException, что, я полагаю, вы и получили. Так что проблема кроется в .observeOn(AndroidSchedulers.mainThread).

Изменить: справедливое предупреждение. Я не запускал это.

    observable
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(new Observer<ResponseBody>() {
                @Override
                public void onSubscribe(Disposable disposable) {

                }

                @Override
                public void onNext(ResponseBody responseBody) {
                    InputStream inputStream = responseBody.byteStream();
                    BufferedReader br = null;
                    StringBuilder sb = new StringBuilder();

                    String line;
                    try {

                        br = new BufferedReader(new InputStreamReader(inputStream));
                        while (br.ready()) {
                            line = br.readLine();
                            sb.append(line);
                        }

                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        if (br != null) {
                            try {
                                br.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }

                    Log.d("streamed string", sb.toString()); // replace log with whatever you want to do with it.
                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onComplete() {

                }
            });
person Lukasz    schedule 24.10.2017
comment
когда я пытаюсь его отладить, он застревает на: StringBuilder sb = new StringBuilder(); и когда я не отлаживаю его и просто нажимаю «Выполнить», он ничего не регистрирует - person Yamila; 25.10.2017
comment
@Yamila А ошибок нет? Насколько велик загружаемый вами контент? В строке StringBuilder sb = new StringBuilder(); нет ничего, что могло бы пойти не так. Может быть, он все еще загружается и остается в цикле while? Добавьте оператор журнала внутри цикла. - person Lukasz; 25.10.2017
comment
Да! Это остается в петле. Я редактирую свой вопрос с ответом сервера, используя curl. Он возвращает ответ каждые 5 секунд - person Yamila; 25.10.2017
comment
В порядке. Я изменил условие в цикле while, ссылаясь на это, теперь оно должно работать. - person Lukasz; 25.10.2017
comment
Давайте продолжим обсуждение в чате. - person Lukasz; 25.10.2017
comment
ну, каждая строка содержит нужные мне данные! А поскольку это бесконечный стрим, то нормально иметь бесконечное время. проблема в том, как остановить его и, наконец, вызвать, когда пользователь закрывает активность - person Yamila; 26.10.2017
comment
Переполнение стека @Yamila не подходит для дискуссионного форума. Если ответ удовлетворяет ваш первоначальный вопрос, вы должны принять его и задать новый вопрос относительно вашей новой проблемы. - person Lukasz; 26.10.2017