Я пытаюсь создать Verticle, который запускает акцептор QuickFIX/J (TCP-сервер FIX). При старте акцептор запускается в отдельном потоке, о котором Vert.x не знает (цикл обработки событий при этом не блокируется). Тем не менее я могу обращаться к шине событий из потока акцептора и передавать сообщения другим verticle.
Вопрос в том, является ли это хорошей практикой?
package com.millenniumit.fixgateway.service.impl.quickfix;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.*;
import quickfix.fix42.NewOrderSingle;
/**
 * Verticle that owns a QuickFIX/J {@link ThreadedSocketAcceptor}.
 *
 * <p>The acceptor is created and started from {@link #start(Promise)} and stopped from
 * {@link #stop(Promise)}; both operations are wrapped in {@code executeBlocking} so the
 * calling thread is not held while QuickFIX/J does its work. Inbound application messages
 * are forwarded to the event bus address {@code in.message}, and the reply body is sent
 * back on the originating FIX session.
 */
public class FIXServerVerticle extends AbstractVerticle {

    private static final Logger LOGGER = LoggerFactory.getLogger(FIXServerVerticle.class);

    private DynamicSessionProviderConfigHelper dynamicSessionProviderConfig;

    /** Remains {@code null} until {@link #start(Promise)} has constructed the acceptor. */
    private ThreadedSocketAcceptor threadedSocketAcceptor;

    /**
     * Asynchronous start: builds and starts the FIX acceptor inside {@code executeBlocking}.
     *
     * <p>The verticle is not considered deployed when this method returns; deployment is
     * signalled through {@code startPromise}.
     *
     * @param startPromise completed once the acceptor has started, or failed with the
     *                     original exception if configuration or start-up fails
     */
    @Override
    public void start(Promise<Void> startPromise) {
        Application serverApplication = createServerApplication();

        getVertx().executeBlocking(future -> {
            MessageStoreFactory messageStoreFactory = new NoopStoreFactory();
            MessageFactory messageFactory = new DefaultMessageFactory();
            dynamicSessionProviderConfig = new DynamicSessionProviderConfigHelper();
            try {
                SessionSettings sessionSettings = new SessionSettings("acceptor-config");
                threadedSocketAcceptor = new ThreadedSocketAcceptor(
                        serverApplication, messageStoreFactory, sessionSettings, messageFactory);
                dynamicSessionProviderConfig.configure(
                        threadedSocketAcceptor, serverApplication, messageStoreFactory,
                        sessionSettings, messageFactory);
                threadedSocketAcceptor.start();
                future.complete();
            } catch (ConfigError | FieldConvertError configError) {
                LOGGER.error("Failed to start FIX acceptor", configError);
                // Fail with the exception itself so the cause and stack trace are preserved.
                future.fail(configError);
            }
        }, res -> {
            if (res.succeeded()) {
                startPromise.complete();
            } else {
                startPromise.fail(res.cause());
            }
        });
    }

    /**
     * Asynchronous stop: stops the FIX acceptor inside {@code executeBlocking}.
     *
     * @param stopPromise completed once the acceptor has been stopped (or if it was never
     *                    created), failed with the original exception otherwise
     */
    @Override
    public void stop(Promise<Void> stopPromise) {
        getVertx().executeBlocking(future -> {
            // start() may have failed before the acceptor was assigned.
            if (threadedSocketAcceptor != null) {
                threadedSocketAcceptor.stop();
            }
            future.complete();
        }, res -> {
            if (res.succeeded()) {
                stopPromise.complete();
            } else {
                stopPromise.fail(res.cause());
            }
        });
    }

    /** Builds the QuickFIX/J callback handler that bridges FIX sessions to the event bus. */
    private Application createServerApplication() {
        return new Application() {
            @Override
            public void onCreate(SessionID sessionID) {
                LOGGER.info("Session Created : {}", sessionID);
            }

            @Override
            public void onLogon(SessionID sessionID) {
            }

            @Override
            public void onLogout(SessionID sessionID) {
            }

            @Override
            public void toAdmin(Message message, SessionID sessionID) {
            }

            @Override
            public void fromAdmin(Message message, SessionID sessionID)
                    throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {
            }

            @Override
            public void toApp(Message message, SessionID sessionID) throws DoNotSend {
            }

            @Override
            public void fromApp(Message message, SessionID sessionID)
                    throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
                // NOTE(review): this callback is invoked by QuickFIX/J, presumably on one of its
                // own threads rather than a Vert.x thread - confirm which context the
                // executeBlocking result handler runs on in that case.
                LOGGER.info("Received application message: {}", message);
                getVertx().executeBlocking(future -> {
                    LOGGER.info("Processing in worker thread: {}", message);
                    // processing logic
                    future.complete(message);
                }, res -> {
                    if (res.failed()) {
                        LOGGER.error("Processing failed for session {}", sessionID, res.cause());
                        return;
                    }
                    getVertx().eventBus().request("in.message", res.result(), ar -> {
                        if (ar.failed()) {
                            LOGGER.error("No reply from in.message for session {}", sessionID, ar.cause());
                            return;
                        }
                        replyToSession(sessionID, (Message) ar.result().body());
                    });
                });
            }
        };
    }

    /** Sends {@code reply} on the given session, logging instead of throwing if it cannot be sent. */
    private void replyToSession(SessionID sessionID, Message reply) {
        // lookupSession yields null when the session is not (or no longer) registered.
        Session session = Session.lookupSession(sessionID);
        if (session == null) {
            LOGGER.warn("Session {} not found; dropping reply", sessionID);
            return;
        }
        if (!session.send(reply)) {
            LOGGER.warn("Reply was not sent on session {}", sessionID);
        }
    }
}