Flink - скользящее окно во время события с пропущенными данными в окне из-за временных интервалов

Предположим, у меня есть поток торговых событий на фондовом рынке, например:

technical1, ALXN, 1/1/2016
technical1, CELG, 1/1/2016
technical2, ALXN, 1/2/2016
technical2, CELG, 1/2/2016
. . . 
technicalN, ALXN, 4/1/2018
technicalN, CELG, 4/1/2018

таким образом, TechnicalN (где N - некоторое число) представляет N-ю техническую торговую запись [Open (float), High (float), Low (float), Close (float), Volume (int)] дневных запасов на конец дня. рыночные торговые данные для данной компании. (т.е. технический1 для тикера GOOG отличается от технического1 для тикера MSFT.) Например:

12.52, 19.25, 09.11, 17.54, 120532, GOOG, 1/1/2017
14.37, 29.52, 01.53, 12.96, 627156, MSFT, 1/1/2017

(Обратите внимание, что эти торговые цены / объемы полностью вымышлены.)

Допустим, я хочу создать окно размером 2 с интервалом в 1 день, чтобы наши данные выглядели примерно так:

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; technical5, GOOG, 12/30/2017]
[technical4, MSFT, 12/29/2017; technical5, MSFT, 12/30/2017]
[technical5, GOOG, 12/30/2017; technical6, GOOG, 12/31/2017]
[technical5, MSFT, 12/30/2017; technical6, MSFT, 12/31/2017]
[technical6, GOOG, 12/31/2017; technical7, GOOG, 01/01/2018]
[technical6, MSFT, 12/31/2017; technical7, MSFT, 01/01/2018]
[technical7, GOOG, 01/01/2018; technical8, GOOG, 01/02/2018]
[technical7, MSFT, 01/01/2018; technical8, MSFT, 01/02/2018]
[technical8, GOOG, 01/02/2018; technical9, GOOG, 01/03/2018]
[technical8, MSFT, 01/02/2018; technical9, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]
. . .

Это было бы хорошо, но это проблематично, потому что даты торгов на фондовом рынке не являются непрерывными ... Другими словами, если я правильно понимаю механику Flink (и я могу ошибаться), проблема с использованием скользящего окна во время события, например это:

DataStream<T> input = ...;

// sliding event-time windows
input
.keyBy((TechnicalDataEntry technical) -> technical.ticker)
.window(SlidingEventTimeWindows.of(Time.day(2), Time.day(1))) // Window size of 2 days, sliding interval of 1 day
.<windowed transformation>(<window function>);

в случае с такими данными значения дат не являются непрерывными (это означает, что они следуют дискретному ряду, содержащему разрывы в один или несколько пропущенных дней) потому что нет данных фондового рынка для дат, в которые фондовый рынок закрыт, например, в праздничные или выходные дни. Итак, с учетом этого, наш поток на самом деле будет выглядеть примерно так (потому что торговля закрыта 30.12.2017, 31.12.2017 и 01.01.2018):

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; NULL]
[technical4, MSFT, 12/29/2017; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; NULL]
[NULL; technical8, GOOG, 01/02/2018]
[NULL; technical8, MSFT, 01/02/2018]
[technical8, GOOG, 01/02/2018; technical9, GOOG, 01/03/2018]
[technical8, MSFT, 01/02/2018; technical9, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]

Как мне заставить мой поток Flink игнорировать отсутствующие даты (и вместо этого окно или объединение или сопоставление последовательных непустых дат), чтобы мой поток вместо этого выглядел так:

[technical1, GOOG, 12/26/2017; technical2, GOOG, 12/27/2017]
[technical1, MSFT, 12/26/2017; technical2, MSFT, 12/27/2017]
[technical2, GOOG, 12/27/2017; technical3, GOOG, 12/28/2017]
[technical2, MSFT, 12/27/2017; technical3, MSFT, 12/28/2017]
[technical3, GOOG, 12/28/2017; technical4, GOOG, 12/29/2017]
[technical3, MSFT, 12/28/2017; technical4, MSFT, 12/29/2017]
[technical4, GOOG, 12/29/2017; technical5, GOOG, 01/02/2018]
[technical4, MSFT, 12/29/2017; technical5, MSFT, 01/02/2018]
[technical5, GOOG, 01/02/2018; technical6, GOOG, 01/03/2018]
[technical5, MSFT, 01/02/2018; technical6, MSFT, 01/03/2018]
[. . .]
[technicalN, GOOG, 04/01/2018; technicalN+1, GOOG, 04/02/2018]
[technicalN, MSFT, 04/01/2018; technicalN+1, MSFT, 04/02/2018]

?

(Примечание: пожалуйста, не обращайте внимания на то, как я увеличиваю число строкой "технический" (например, технический1, технический2 и т. Д.), Потому что, как я уже упоминал, это значение было просто для описательных целей в этом сообщении и не на самом деле существуют в данных. Единственный способ определить, являются ли две торговые записи последовательными, - это сгруппировать их по тикеру и упорядочить по дате торговли. Предположим, что повторяющихся событий не существует.)


person devinbost    schedule 09.10.2018    source источник


Ответы (1)


Если я правильно понимаю, ваша проблема в том, что, поскольку есть определенные периоды, когда вы не получаете события, окна не будут вести себя должным образом, поскольку они не знают о течении времени.

Один из вариантов, который у вас есть, - это перидиокальное создание водяного знака, например:

streamEnvironment.addSource(new SourceFunction<Object>() {
        @Override
        public void run(final SourceContext<Object> ctx) {
            (...)

            ctx.emitWatermark(new Watermark(timestamp));
        }

        @Override
        public void cancel() {

        }
    })

Имейте в виду, что если вы получаете события до водяного знака, они будут проигнорированы, поэтому периодичность вашего водяного знака является компромиссом между «точностью окна» (срабатывание при первой возможности) и терпимостью к поздним событиям.

person gcandal    schedule 12.10.2018