Экспоненциально убывающая скользящая средняя в окне скачкообразного изменения в Flink SQL: время кастинга

Теперь у нас есть SQL с причудливым оконным интерфейсом во Flink, я пытаюсь указать на убывающую скользящую среднюю как «то, что будет возможно в будущих выпусках Flink как для Table API, так и для SQL». из их дорожной карты SQL / предварительной версии сообщения за 2017-03 гг.:

table
  .window(Slide over 1.hour every 1.second as 'w)
  .groupBy('productId, 'w)
  .select(
    'w.end,
    'productId,
    ('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)

Вот моя попытка (также вдохновленная примером разложения кальцита) :

SELECT                                                                              
  lb_index one_key,                                                           
  HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,  
  SUM(Y * 
      EXP(
        proctime - 
        HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
      ))                                                             
FROM write_position                                                                
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)

Время - это время обработки, которое мы получаем как proctime при создании write_position из таблицы AppendStream как:

tEnv.registerTable(
    "write_position", 
    tEnv.fromDataStream(appendStream, "lb_index, Y, proctime.proctime"))

Я получаю такую ​​ошибку:

Cannot apply '-' to arguments of type '<TIME ATTRIBUTE(PROCTIME)> - <TIME ATTRIBUTE(PROCTIME)>'. 
Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>'

Я пробовал применить proctime ко всем другим известным мне типам (в попытке достичь земли обетованной NUMERIC), и я просто не могу найти, как заставить его работать.

Я что-то упускаю? Является ли proctime каким-то особым видом «числа изменения системы», которое вы не можете преобразовать? Если это так, все равно должен быть способ сравнить его со значением HOP_START (proctime, ...).


person BenoitParis    schedule 12.02.2019    source источник


Ответы (1)


Вы можете использовать timestampDiff для вычитания двух временных точек (см. документы). Вы используете это так

TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)

где единицей измерения времени может быть СЕКУНДА, МИНУТА, ЧАС, ДЕНЬ, МЕСЯЦ или ГОД.

Я не пробовал это со временем обработки, но он работает с полями времени события, так что, надеюсь, так и будет.

person David Anderson    schedule 12.02.2019
comment
Отлично работает, большое спасибо! Я застрял на другом: заголовок stackoverflow.com/questions/54668892/ Однако не хочу вас слишком беспокоить. Начинаю думать, что мне может понадобиться платное обучение Flink, ха-ха;) - person BenoitParis; 13.02.2019