Понимание типа запроса Window в Siddhi

Я пытаюсь реализовать базовое окно для входного потока в сиддхи. Это запрос окна

executionPlan = "" +
                "define stream inputStream (height int); " +
                "" +
                "@info(name = 'query1') " +
                "from inputStream #window.length(5) " + 
                "select avg(height) as avgHt " + 
                "insert into outputStream ;";

И вот как я передаю данные во входной поток.

    Object[] obj1 = {10};
    Object[] obj2 = {5};
    for (int i=0;i<10;i++) {
        try {
            inputHandler.send(obj1);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    for (int i=0;i<20;i++) {
        try {
            inputHandler.send(obj2);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

Я ошибаюсь, полагая, что запрос должен возвращать обратный вызов после каждого ввода в inputHandler. Таким образом, для этого примера начальный вывод должен быть 10, а затем он должен постепенно уменьшаться и стать 5. В точке, где я отправил все 10 и 2 5, я должен получить обратный вызов со средним значением как (10 + 10 + 10 + 5 +5)/5= 8. Но сейчас этого не происходит. Для этой реализации я получаю два обратных вызова со средним значением 10 и 5 соответственно. Почему нет постепенного снижения с 10 до 5?

Вот как я добавляю обратный вызов

executionPlanRuntime.addCallback("query1", new QueryCallback() {
        @Override
        public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
            // printing inEvents
            EventPrinter.print(inEvents);

    });

Что мне здесь не хватает?


person shshnk    schedule 14.12.2015    source источник


Ответы (1)


Поскольку вы отправляете события в пакетном режиме, это группирует события внутри. Но если вы добавите Thread.Sleep(100) между отправляемыми вами событиями, он будет выводиться так, как вы ожидали.

person suho    schedule 14.12.2015
comment
Спасибо за ответы. Он работает, как и ожидалось, после установки Thread.Sleep(). Но есть ли способ избежать этой пакетной обработки событий. Я в порядке, если скорость вывода медленнее, чем скорость ввода. - person shshnk; 15.12.2015
comment
Есть ли способ избежать этой пакетной обработки? Ничего не упоминается об этой пакетной обработке в документации. docs.wso2.com/display/CEP300/Windows - person shshnk; 13.01.2016
comment
Как вы можете видеть в github.com/wso2/siddhi/blob/master/modules/siddhi-core/src/main/ мы группируем выходные данные разрушителя, и когда мы это делаем агрегации в окнах. Это делается в нескольких местах в siddhi, чтобы обеспечить высокую пропускную способность в реальном времени без ущерба для задержки. Этот процесс также уменьшает количество выводимых событий, что также помогает повысить производительность. В настоящее время в Siddhi 3.0.4 у нас нет способа изменить это поведение. Существуют ли допустимые сценарии, которые требовали исправления? - person suho; 15.01.2016