Асинхронный HTTP-клиент с Netty

Я новичок в netty и все еще пытаюсь найти свой путь. Я хочу создать http-клиент, который работает асинхронно. Netty-примеры http показывают только то, как ждать операции ввода-вывода, а не то, как использовать addListener, поэтому я пытался понять это последние несколько дней.

Я пытаюсь создать класс запроса, который будет обрабатывать все различные состояния запроса, от подключения, отправки данных, обработки ответа и затем закрытия соединения. Для этого мой класс расширяет SimpleChannelUpstreamHandler и реализует ChannelFutureListener. Я использую ChannelPipelineFactory, который добавляет (этот) экземпляр класса (как SimpleChannelUpstreamHandler) в конвейер в качестве обработчика.

Соединение создается так:

this.state = State.Connecting;
this.clientBootstrap.connect(this.address).addListener(this);

Затем метод operationComplete:

@Override
public void operationComplete(ChannelFuture future) throws Exception {
    State oldState = this.state;

    if (!future.isSuccess()) {
        this.status = Status.Failed;
        future.getChannel().disconnect().addListener(this);
    }
    else if (future.isCancelled()) {
        this.status = Status.Canceled;
        future.getChannel().disconnect().addListener(this);
    }
    else switch (this.state) {
        case Connecting:
            this.state = State.Sending;
            Channel channel = future.getChannel();
            channel.write(this.createRequest()).addListener(this);
            break;

        case Sending:
            this.state = State.Disconnecting;
            future.getChannel().disconnect().addListener(this);
            break;

        case Disconnecting:
            this.state = State.Closing;
            future.getChannel().close().addListener(this);
            break;

        case Closing:
            this.state = State.Finished;
            break;
    }
    System.out.println("request operationComplete start state: " + oldState + ", end state: " + this.state + ", status: " + this.status);
}

private HttpRequest createRequest() {
    String url = this.url.toString();

    HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
    request.setHeader(HttpHeaders.Names.HOST, this.url.getHost());
    request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
    request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);

    return request;
}

Класс также переопределяет метод messageReceived:

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    System.out.println("messageReceived");
    HttpResponse response = (HttpResponse) e.getMessage();

    ChannelBuffer content = response.getContent();
    if (content.readable()) {
        System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8));
    }
}

Проблема в том, что я получаю этот вывод:

request operationComplete start state: Connecting, end state: Sending, status: Unknown
request operationComplete start state: Sending, end state: Disconnecting, status: Unknown
request operationComplete start state: Closing, end state: Finished, status: Unknown
request operationComplete start state: Disconnecting, end state: Finished, status: Unknown

Как вы можете видеть, messageReceived не выполняется по какой-то причине, даже несмотря на то, что конвейерная фабрика добавляет экземпляр этого класса в конвейер.

Любые идеи, что мне здесь не хватает? Спасибо.


Редактировать

Мне удалось, наконец, заставить это работать благодаря помощи @JestanNirojan, если кому-то будет интересно решение:

public class ClientRequest extends SimpleChannelUpstreamHandler {

    ....

    public void connect() {
        this.state = State.Connecting;
        System.out.println(this.state);
        this.clientBootstrap.connect(this.address);
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.state = State.Sending;
        System.out.println(this.state);
        ctx.getChannel().write(this.createRequest());
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        HttpResponse response = (HttpResponse) e.getMessage();

        ChannelBuffer content = response.getContent();
        if (content.readable()) {
            System.out.println("CONTENT: " + content.toString(CharsetUtil.UTF_8));
        }

        this.state = State.Disconnecting;
        System.out.println(this.state);
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.state = State.Closing;
        System.out.println(this.state);
    }

    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.state = State.Finished;
        System.out.println(this.state);
    }

    private HttpRequest createRequest() {
        String url = this.url.toString();

        HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
        request.setHeader(HttpHeaders.Names.HOST, this.url.getHost());
        request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
        request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);

        return request;
    }
}

person Nitzan Tomer    schedule 16.03.2012    source источник
comment
является ли HttpResponse полным HttpResponse или может быть фрагментом? У меня возвращаются 1000 фрагментов, и мне нужно одно событие на фрагмент, иначе память взорвется, что приведет к нехватке памяти.   -  person Dean Hiller    schedule 01.11.2012
comment
HttpResponse — это полный ответ, насколько я знаю, вы не можете разбить его на части. Вы должны пойти ниже, возможно, с помощью HttpResponseDecoder.   -  person Nitzan Tomer    schedule 02.11.2012
comment
Если вас не интересует фрагментация, используйте легкий http-клиент здесь @ https://github.com/arungeorge81/netty-http-client   -  person Arun George    schedule 13.01.2015


Ответы (1)


Вы используете ChannelFutureListener для выполнения всех операций в канале (что плохо), и будущий прослушиватель будет выполняться сразу после вызова этих операций канала.

Проблема в том, что после отправки сообщения канал немедленно отключается, и обработчик не может получить ответное сообщение, которое приходит позже.

        ........
    case Sending:
        this.state = State.Disconnecting;
        future.getChannel().disconnect().addListener(this);
        break;
        ........

Вы не должны блокировать будущую ветку канала вообще. Лучший подход - расширить SimpleChannelUpstreamHandler.

    channelConnected(..) {} 
    messageReceived(..) {} 
    channelDisconnected(..) {} 

методы и реагировать на эти события. вы также можете сохранить состояние в этом обработчике.

person Jestan Nirojan    schedule 17.03.2012
comment
Ой. Это было просто. Большое спасибо за информацию, я бы хотел, чтобы у Нетти была лучшая документация по этому вопросу. - person Nitzan Tomer; 17.03.2012