Теперь у нас есть SQL с причудливым оконным интерфейсом во Flink, я пытаюсь указать на убывающую скользящую среднюю как «то, что будет возможно в будущих выпусках Flink как для Table API, так и для SQL». из их дорожной карты SQL / предварительной версии сообщения за 2017-03 гг.:
table
.window(Slide over 1.hour every 1.second as 'w)
.groupBy('productId, 'w)
.select(
'w.end,
'productId,
('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)
Вот моя попытка (также вдохновленная примером разложения кальцита) :
SELECT
lb_index one_key,
HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,
SUM(Y *
EXP(
proctime -
HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
))
FROM write_position
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
Время - это время обработки, которое мы получаем как proctime при создании write_position из таблицы AppendStream как:
tEnv.registerTable(
"write_position",
tEnv.fromDataStream(appendStream, "lb_index, Y, proctime.proctime"))
Я получаю такую ошибку:
Cannot apply '-' to arguments of type '<TIME ATTRIBUTE(PROCTIME)> - <TIME ATTRIBUTE(PROCTIME)>'.
Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>'
Я пробовал применить proctime ко всем другим известным мне типам (в попытке достичь земли обетованной NUMERIC), и я просто не могу найти, как заставить его работать.
Я что-то упускаю? Является ли proctime каким-то особым видом «числа изменения системы», которое вы не можете преобразовать? Если это так, все равно должен быть способ сравнить его со значением HOP_START (proctime, ...).