Akka поток объектов по http

У меня есть фрагмент кода (см. ниже), который порождает сервер, который повторяет каждый поток ByteString, который он получает с порта 6001. В примере также определяется клиент, который подключается к серверу и отправляет поток ByteString, содержащий список символов из буква от «а» до «з».

На данный момент у меня возникает вопрос: предлагает ли akka способ отправки и получения объектов Stream вместо ByStreams через http? Например, объекты класса Client.

Если да, то как я могу отправлять и получать такой поток объектов? Не могли бы вы предоставить мне фрагмент, который показывает, как это выполнить?

Документация Akka неудобна для неигровых примеров...

Спасибо за вашу помощь

открытый класс TcpEcho {

/**
 * Use without parameters to start both client and server.
 *
 * Use parameters `server 0.0.0.0 6001` to start server listening on port
 * 6001.
 *
 * Use parameters `client 127.0.0.1 6001` to start client connecting to
 * server on 127.0.0.1:6001.
 *
 */
public static void main(String[] args) throws IOException {
    if (args.length == 0) {
        ActorSystem system = ActorSystem.create("ClientAndServer");
        InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 6000);
        server(system, serverAddress);
        client(system, serverAddress);
    } else {
        InetSocketAddress serverAddress;
        if (args.length == 3) {
            serverAddress = new InetSocketAddress(args[1], Integer.valueOf(args[2]));
        } else {
            serverAddress = new InetSocketAddress("127.0.0.1", 6000);
        }
        if (args[0].equals("server")) {
            ActorSystem system = ActorSystem.create("Server");
            server(system, serverAddress);
        } else if (args[0].equals("client")) {
            ActorSystem system = ActorSystem.create("Client");
            client(system, serverAddress);
        }
    }
}

public static void server(ActorSystem system, InetSocketAddress serverAddress) {
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    final Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> {
        System.out.println("Client connected from: " + conn.remoteAddress());
        conn.handleWith(Flow.<ByteString> create(), materializer);
    });

    final CompletionStage<ServerBinding> bindingFuture = Tcp.get(system)
            .bind(serverAddress.getHostString(), serverAddress.getPort()).to(handler).run(materializer);

    bindingFuture.whenComplete((binding, throwable) -> {
        System.out.println("Server started, listening on: " + binding.localAddress());
    });

    bindingFuture.exceptionally(e -> {
        System.err.println("Server could not bind to " + serverAddress + " : " + e.getMessage());
        system.terminate();
        return null;
    });

}

public static void client(ActorSystem system, InetSocketAddress serverAddress) {
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    final List<ByteString> testInput = new ArrayList<>();
    for (char c = 'a'; c <= 'z'; c++) {
        testInput.add(ByteString.fromString(String.valueOf(c)));
    }

    Source<ByteString, NotUsed> responseStream = Source.from(testInput)
            .via(Tcp.get(system).outgoingConnection(serverAddress.getHostString(), serverAddress.getPort()));

    CompletionStage<ByteString> result = responseStream.runFold(ByteString.empty(), (acc, in) -> acc.concat(in),
            materializer);

    result.whenComplete((success, failure) -> {

        if (failure != null) {
            System.err.println("Failure: " + failure.getMessage());
        } else {
            System.out.println("Result: " + success.utf8String());
        }
        System.out.println("Shutting down client");
        system.terminate();

    });
}

}


person broga    schedule 03.04.2016    source источник
comment
Вы видели этот пример о том, как создать поток биди, я думаю, он делает более или менее то, о чем вы просите?   -  person lpiepiora    schedule 03.04.2016
comment
Не смотрел, но посмотрю. В любом случае, у вас есть лучшая идея/предложение в виде фрагмента кода? Спасибо   -  person broga    schedule 04.04.2016


Ответы (1)


akka.stream.{javadsl,scaladsl}.Framing содержит утилиты, помогающие создавать согласованные сообщения. Например, вы можете отправлять свои сообщения через Framing.simpleFramingProtocolEncoder(maxLength), чтобы автоматически добавлять к ним информацию о длине. С другой стороны, Framing.simpleFramingProtocolDecoder(maxLength) позаботится о декодировании сообщения в соответствии с приложенной к нему информацией о длине.

Если вы хотите манипулировать простыми объектами, вам просто нужно сериализовать их в ByteString перед отправкой через кодировщик и десериализовать их из ByteString после получения их представления от декодера.

person Samuel Tardieu    schedule 04.04.2016