Сервер QuickFIX/J с Vert.x

Я пытаюсь создать Verticle, который запускает принимающий сервер QuickFIX/J (сервер TCP FIX). При запуске поток-акцептор запускается в отдельном потоке, и Vert.x не знает об этом (не блокирует цикл обработки событий). Однако я мог получить доступ к шине событий из принимающего потока и передавать сообщения другим вершинам.

Вопрос в том, является ли это хорошей практикой?

package com.millenniumit.fixgateway.service.impl.quickfix;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.*;
import quickfix.fix42.NewOrderSingle;

public class FIXServerVerticle extends AbstractVerticle {

    private DynamicSessionProviderConfigHelper dynamicSessionProviderConfig;
    private ThreadedSocketAcceptor threadedSocketAcceptor;
    private static final Logger LOGGER = LoggerFactory.getLogger(FIXServerVerticle.class);

    /**
     * You can’t block waiting for the tcp server to bind in the start method as that would break the Golden Rule.
     * To prevent this, implement the asynchronous start method. This version of the method takes a Future as a parameter.
     * When the method returns the verticle will not be considered deployed.
     * @param startPromise
     */
    @Override
    public void start(Promise<Void> startPromise) {
        Application serverApplication = new Application() {
            @Override
            public void onCreate(SessionID sessionID) {
                LOGGER.info("Session Created : " + sessionID);
            }

            @Override
            public void onLogon(SessionID sessionID) {

            }

            @Override
            public void onLogout(SessionID sessionID) {

            }

            @Override
            public void toAdmin(Message message, SessionID sessionID) {

            }

            @Override
            public void fromAdmin(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {

            }

            @Override
            public void toApp(Message message, SessionID sessionID) throws DoNotSend {

            }

            @Override
            public void fromApp(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
                LOGGER.info("Processing in worker thread: " + message);
                //Offload processing logic from event loop
                getVertx().executeBlocking(future -> {
                    //blocking code, run on the worker thread
                    LOGGER.info("Processing in worker thread: " + message);
                    //processing logic
                    future.complete(message);
                }, res -> {
                    //non blocking code running on the event loop thread
                    getVertx().eventBus().request("in.message", res.result(), ar -> {
                        if (ar.succeeded()) {
                            Session.lookupSession(sessionID).send((Message) ar.result().body());
                        }
                    });
                });
            }
        };
        //Offload acceptor initialization from event loop
        getVertx().executeBlocking(future -> {
            //blocking code, run on the worker thread
            MessageStoreFactory messageStoreFactory = new NoopStoreFactory();
            MessageFactory messageFactory = new DefaultMessageFactory();
            dynamicSessionProviderConfig = new DynamicSessionProviderConfigHelper();
            try {
                SessionSettings sessionSettings = new SessionSettings("acceptor-config");
                threadedSocketAcceptor = new ThreadedSocketAcceptor(serverApplication, messageStoreFactory, sessionSettings, messageFactory);
                dynamicSessionProviderConfig.configure(threadedSocketAcceptor, serverApplication, messageStoreFactory, sessionSettings, messageFactory);
                threadedSocketAcceptor.start();
                future.complete();
            } catch (ConfigError | FieldConvertError configError) {
                configError.printStackTrace();
                future.fail(configError.getMessage());
            }
        }, res -> {
            //non blocking code running on the event loop thread
            if(res.succeeded()){
                startPromise.complete();
            }else{
                startPromise.fail(res.cause().getMessage());
            }
        });
    }

    public void stop(Promise<Void> stopPromise) {
        //Offload acceptor stop method from event loop
        getVertx().executeBlocking(future -> {
            //blocking code, run on the worker thread
            threadedSocketAcceptor.stop();
            future.complete();
        }, res -> {
            //non blocking code running on the event loop thread
            if(res.succeeded()){
                stopPromise.complete();
            }else{
                stopPromise.fail(res.cause().getMessage());
            }
        });
    }

}

person Jayamal Kulathunge    schedule 16.03.2021    source источник


Ответы (2)


Я не знаком с vert.x, однако обычно в приложениях QuickFIX/J принято разгружать входящие сообщения в отдельный поток/очередь, если требуется более высокая пропускная способность.

person Christoph John    schedule 16.03.2021

Этот подход мне кажется вполне приемлемым, я не вижу противоречий с тем, как работает vert.x.

Кстати, вы можете немного упростить свой код.

Этот:

getVertx().executeBlocking(
  future -> {},
  res -> {
            if(res.succeeded()){
                startPromise.complete();
            }else{
                startPromise.fail(res.cause().getMessage());
            }
        }
)

можно заменить на:

getVertx().executeBlocking(
  future -> {},
  startPromise
)
person injecteer    schedule 18.03.2021