Оценка водяного знака для времени события в луче

Я пытаюсь использовать Beam для агрегирования по набору данных, используя время события из данных и Kafka в качестве источника данных. Это работает, если все мои разделы kafka заполнены данными. Однако, как только раздел еще не был записан, водяной знак не может быть оценен и расширен. Моя политика TimeStampPolicy следующая:

public class CustomTimeStampPolicy
    extends TimestampPolicy<String, titan.ccp.model.records.ActivePowerRecord> {
  protected Instant currentWatermark;

  public CustomTimeStampPolicy(final Optional<Instant> previousWatermark) {
    this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
  }


  @Override
  public Instant getTimestampForRecord(final PartitionContext ctx,
      final KafkaRecord<String, titan.ccp.model.records.ActivePowerRecord> record) {
    this.currentWatermark = new Instant(record.getKV().getValue().getTimestamp());
    return this.currentWatermark;
  }

  @Override
  public Instant getWatermark(final PartitionContext ctx) {
    System.out.println("Current Watermark: " + this.currentWatermark);
    return this.currentWatermark;
  }
}

С тремя разделами Kafka, из которых только один заполнен данными, мои журналы показывают мне эти водяные знаки:

Current Watermark: -290308-12-21T19:59:05.225Z
Current Watermark: 2020-12-09T10:42:29.909Z
Current Watermark: -290308-12-21T19:59:05.225Z

По умолчанию мои окна не срабатывают. Я предполагаю, что выходной водяной знак является минимальным по сравнению с водяными знаками разделов. И поэтому не будет продвигаться, пока некоторые из моих разделов пусты. Как я могу обрабатывать пустые разделы с обработкой времени события?


person Robert156    schedule 09.12.2020    source источник


Ответы (1)


Если в раздел Kafka не записаны данные, Beam не имеет возможности узнать, что после записи элемента у него не будет произвольной временной метки в прошлом, отсюда и очень старый водяной знак.

Вы можете попробовать обновить конструктор политики отметок времени до previousWatermark.orElse(wallTime - someMaximumSkew)

где someMaximumSkew - самая большая задержка, которую вы можете ожидать для данных, записанных в kafka. Вы также можете подумать о том, чтобы взять минимум предыдущего значения (если есть) и wallTime - someMaximumSkew для продвижения, когда никакие данные не записывались какое-то время.

person robertwb    schedule 09.12.2020