Я пытаюсь реализовать базовое окно для входного потока в сиддхи. Это запрос окна
executionPlan = "" +
"define stream inputStream (height int); " +
"" +
"@info(name = 'query1') " +
"from inputStream #window.length(5) " +
"select avg(height) as avgHt " +
"insert into outputStream ;";
И вот как я передаю данные во входной поток.
Object[] obj1 = {10};
Object[] obj2 = {5};
for (int i=0;i<10;i++) {
try {
inputHandler.send(obj1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
for (int i=0;i<20;i++) {
try {
inputHandler.send(obj2);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
Я ошибаюсь, полагая, что запрос должен возвращать обратный вызов после каждого ввода в inputHandler
. Таким образом, для этого примера начальный вывод должен быть 10, а затем он должен постепенно уменьшаться и стать 5. В точке, где я отправил все 10 и 2 5, я должен получить обратный вызов со средним значением как (10 + 10 + 10 + 5 +5)/5= 8. Но сейчас этого не происходит. Для этой реализации я получаю два обратных вызова со средним значением 10 и 5 соответственно. Почему нет постепенного снижения с 10 до 5?
Вот как я добавляю обратный вызов
executionPlanRuntime.addCallback("query1", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
// printing inEvents
EventPrinter.print(inEvents);
});
Что мне здесь не хватает?