Обработчики параллельного завершения Java NIO2

Я пишу сервер NIO2, и мне нужно выполнить асинхронные операции чтения в AsynchronousSocketChannel, каждая из этих операций состоит из чтения целого числа и дальнейшего чтения из того же канала количества байтов, равного этому целому числу. Проблема в том, что когда я помещаю два или более CompletionHandler на канал подряд (потому что есть запросы на несколько операций чтения), и первый из этих обработчиков срабатывает, мой дальнейший код чтения в методе complete() первого обработчика не может работать должным образом, потому что второй обработчик срабатывает мгновенно, когда на канале появляется информация. Как я могу заблокировать канал, пока дальнейшее чтение complete() не завершится без Future? Я не могу использовать Future, потому что мне нужно поместить обработчик в сокет, а затем перейти к другим задачам.

for (final Map.Entry<String, AsynchronousSocketChannel> entry : ipSocketTable.entrySet()) {
        try {
            final AsynchronousSocketChannel outSocket = entry.getValue();
            synchronized (outSocket) {
                final ByteBuffer buf = ByteBuffer.allocateDirect(9);
                outSocket.read(buf, null, new DataServerResponseHandler(buf, outSocket, resultTable, server, entry.getKey()));
            }

        } catch (Exception e) {

        }
    }

Вот класс DataServerResponseHandler:

class DataServerResponseHandler implements CompletionHandler<Integer, Void> {

    private ConcurrentHashMap<String, Boolean> resultTable = null;
    private AsynchronousSocketChannel channel = null;
    private TcpServer server;
    private String ip;
    private ByteBuffer msg;

    public DataServerResponseHandler(ByteBuffer msg, AsynchronousSocketChannel channel,
            ConcurrentHashMap<String, Boolean> resultTable, TcpServer server, String ip) {
        this.msg = msg;
        this.channel = channel;
        this.resultTable = resultTable;
        this.server = server;
        this.ip = ip;
    }

    @Override
    public void completed(Integer result, Void attachment) {
            try {
                msg.rewind();
                int resultCode = msg.get() & 0xff;
                int ipOne = msg.get() & 0xff;
                int ipTwo = msg.get() & 0xff;
                int ipThree = msg.get() & 0xff;
                int ipFour = msg.get() & 0xff;
                int length = msg.getInt();
                msg.rewind();
                ByteBuffer buf = ByteBuffer.allocateDirect(length);
                channel.read(buf).get();
                buf.rewind();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

    @Override
    public void failed(Throwable exc, Void attachment) {
        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
    }

}

person Z1kkurat    schedule 29.03.2015    source источник


Ответы (1)


У этого кода есть несколько проблем.
Первое чтение не гарантирует, что будут прочитаны все оставшиеся байты, но вызывает обработчик завершения, как только он прочитает хотя бы один байт. Поэтому вам нужно проверить положение буфера и повторно вызвать чтение, пока у вас не будет 9 байтов для заголовка или длины для полезной нагрузки.

if (msg.position() < 9) {
    channel.read(msg, null, this);
    return;
}

Для второй части, чтобы продолжить асинхронный подход, снова запустите обработчик чтения с завершением. Вы можете создать новый, который будет специально обрабатывать полезную нагрузку, или повторно использовать существующий, и вам нужно будет запомнить состояние:

switch (state) {
case READ_HEADER:
    if (msg.remaining() > 0) {
        channel.read(msg, null, this);
        return;
    }
    // do the parsing the IP and length
    state = READ_PAYLOAD;
    channel.read(payloadBuf, null, this);
    break;

case READ_PAYLOAD:
    if (payloadBuf.remaining() > 0) {
        channel.read(payloadBuf, null, this);
        return;
    }
    payloadBuf.flip();
    // get content from payloadBuf
    break;
}
person Zbynek Vyskovsky - kvr000    schedule 09.10.2015