Отправляйте 35000 jms-сообщений в минуту

У нас есть приложение весенней загрузки для выполнения нагрузочного теста на другом компоненте. Нам нужно отправлять максимум 35000 сообщений JMS в минуту, и по этой причине я использую планировщик для запуска задачи каждую минуту.

Проблема в том, что когда я держу интенсивность низкой, ему удается отправлять сообщения в течение указанного интервала времени (одна минута). Но когда интенсивность высока, для отправки блока сообщений требуется более 1 минуты. Любые предложения по реализации ниже?

Класс планировщика

@Component
public class MessageScheduler {

private final Logger log = LoggerFactory.getLogger(getClass());
private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(16);
private final static int TIME_PERIOD = ConfigFactory.getConfig().getInt("messages.period").orElse(60000);

@Autowired
JmsSender sender;

    public void startScheduler() {
       Runnable runnableTask = sender::sendMessagesChunk;
       executorService.scheduleAtFixedRate(runnableTask, 0, TIME_PERIOD, 
       TimeUnit.MILLISECONDS);
    }
}

Класс для отправки сообщений

@Component
public class JmsSender {

@Autowired
TrackingManager manager;

private final Logger log = LoggerFactory.getLogger(getClass());
private final static int TOTAL_MESSAGES = ConfigFactory.getConfig().getInt("total.tracking.messages").orElse(10);
private final static int TIME_PERIOD = ConfigFactory.getConfig().getInt("messages.period").orElse(60000);
private static int failedPerPeriod=0;
private static int totalFailed=0;
private static int totalMessageCounter=0;

public void sendMessagesChunk() {
    log.info("Started  at: {}", Instant.now());
    log.info("Sending messages with intensity {} messages/minute", TOTAL_MESSAGES);
    for (int i=0; i<TOTAL_MESSAGES; i++) {
        try {
            long start = System.currentTimeMillis();
            MessageDTO msg = manager.createMessage();
            send(msg);
            long stop = System.currentTimeMillis();
            if (timeOfDelay(stop-start)>=0L) {
                Thread.sleep(timeOfDelay(stop-start));
            }
        } catch (Exception e) {
            log.info("Error :  " + e.getMessage());
            failedPerPeriod++;
        }
    }
    totalMessageCounter += TOTAL_MESSAGES;
    totalFailed += failedPerPeriod;
    log.info("Finished  at: {}", Instant.now());
    log.info("Success rate(of last minute): {} %, Succeeded: {}, Failed: {}, Success rate(in total): {} %, Succeeded: {}, Failed: {}"
            ,getSuccessRatePerPeriod(), getSuccededPerPeriod(), failedPerPeriod,
            getTotalSuccessRate(), getTotalSucceded(), totalFailed);
    failedPerPeriod =0;
}

private long timeOfDelay(Long elapsedTime){
    return (TIME_PERIOD / TOTAL_MESSAGES) - elapsedTime;
}
private int getSuccededPerPeriod(){
    return TOTAL_MESSAGES - failedPerPeriod;
}

private int getTotalSucceded(){
    return totalMessageCounter - totalFailed;
}

private double getSuccessRatePerPeriod(){
    return getSuccededPerPeriod()*100D / TOTAL_MESSAGES;
}

private double getTotalSuccessRate(){
    return getTotalSucceded()*100D / totalMessageCounter;
}

private void send(MessageDTO messageDTO) throws Exception {
    requestContextInitializator();
    JmsClient client = JmsClientBuilder.newClient(UriScheme.JmsType.AMQ);
    client.target(new URI("activemq:queue:" + messageDTO.getDestination()))
            .msgTypeVersion(messageDTO.getMsgType(), messageDTO.getVersion())
            .header(Header.MSG_VERSION, messageDTO.getVersion())
            .header(Header.MSG_TYPE, messageDTO.getMsgType())
            .header(Header.TRACKING_ID, UUID.randomUUID().toString())
            .header(Header.CLIENT_ID, "TrackingJmsClient")
            .post(messageDTO.getPayload());
}

person Dimitris Baltas    schedule 10.06.2019    source источник
comment
IDK что-нибудь о Spring, но похоже, что постоянное создание JmsClient в методе отправки может быть проблемой производительности?   -  person OldProgrammer    schedule 10.06.2019
comment
Спасибо за ваш комментарий! Определенно мне нужно создать экземпляр jms-клиента только один раз...   -  person Dimitris Baltas    schedule 11.06.2019


Ответы (1)


Вы должны решить две задачи:

  1. общее время операции отправки должно быть меньше максимального времени.
  2. сообщения должны отправляться не максимально быстро, а равномерно в течение всего доступного времени.

Очевидно, что если ваш метод send слишком медленный, максимальное время будет превышено.

Более быстрый способ отправки сообщений — использовать какую-либо массовую операцию. Неважно, если ваш MQ API не поддерживает массовые операции, вы не сможете его использовать! из-за второго ограничения («равномерно»).

Вы можете отправлять сообщения асинхронно, но если ваш MQ API создает для этого потоки вместо «неблокирующего» асинхронного режима, у вас могут возникнуть проблемы с памятью.

Используя javax.jms.MessageProducer.send, вы можете отправлять сообщения асинхронно, но для каждого будет создан новый поток (будет создано много памяти и серверных потоков).

Другим ускорением может быть создание только одного JMS-клиента (ваш метод send).

Чтобы выполнить второе требование, вы должны исправить свою функцию timeOfDelay, она неверна. Действительно, вы должны принять во внимание распределение вероятностей функции send, чтобы оценить правильное значение, но вы можете просто сделать:

    long accTime = 0L;
    for (int i=0; i<TOTAL_MESSAGES; i++) {
        try {
            long start = System.currentTimeMillis();
            MessageDTO msg = manager.createMessage();
            send(msg);
            long stop = System.currentTimeMillis();
            accTime += stop - start;
            if(accTime < TIME_PERIOD)
                Thread.sleep((TIME_PERIOD - accTime) / (TOTAL_MESSAGES - i));
        } catch (Exception e) {
            log.info("Error :  " + e.getMessage());
            failedPerPeriod++;
        }
    }
person josejuan    schedule 10.06.2019
comment
Спасибо за ваш ответ! К сожалению, мой MQ API не поддерживает массовые операции. Теперь я вижу свою ошибку в методе timeOfDelay. Я попытаюсь реализовать ваши предложения и проверить еще раз. У меня есть один вопрос. Я предполагаю, что если accTime›TIME_PERIOD я должен прекратить отправлять больше сообщений в течение этого периода времени... Верно? - person Dimitris Baltas; 11.06.2019
comment
@DimitrisBaltas Массовая операция не имеет значения. Остановить отправку зависит от того, что важнее, отправить все сообщения или не использовать больше времени. - person josejuan; 11.06.2019