Как инициировать события по счету в оконном режиме для луча apache?

Буду признателен, если мне помогут с windowing в apache beam 2.13.0.

Я использую python 3.7.3 .

[ywatanabe@fedora-30-00 bdd100k-to-es]$ python3 -V
Python 3.7.3

Я хочу сделать именно то, что этот пример делает с ограниченным данные однако. То есть группировать события по каждому триггеру и передавать их следующему преобразованию.

8.4.1.2. Discarding mode
If our trigger is set to discarding mode, the trigger emits the following values on each firing:

  First trigger firing:  [5, 8, 3]
  Second trigger firing:           [15, 19, 23]
  Third trigger firing:                         [9, 13, 10]

Ссылаясь на пример, я написал свой код, как показано ниже,

    es = (gps | 'window:gps' >> WindowInto(
                   FixedWindows(1 * 60),
                   trigger=Repeatedly(
                       AfterAny(
                           AfterCount(1000000),
                           AfterProcessingTime(1 * 60)
                       )
                   ),
                   accumulation_mode=AccumulationMode.DISCARDING
                   )
              | 'bulk:gps' >> beam.ParDo(BulkToESFn(esHost), tag_gps))

Однако в приведенном выше коде триггер срабатывает почти каждую миллисекунду, а не каждую минуту или 1 000 000 событий.

2019-07-15 20:13:20,401 INFO Sending bulk request to elasticsearch. Doc counts: 11 Docs: {'track_id': '514df98862de83a07e7aff62dff77c3d', 'media_id': 'afe35b87-0a9acea6', 'ride_id': 'afe35b87d0b69e1928dd0a4fd75a1416', 'filename': '0a9acea6-62d6-4540-b048-41e34e2407c6.mov', 'timestamp': 1505287487.0, 'timezone': 'America/Los_Angeles', 'coordinates': {'lat': 37.786611081350365, 'lon': -122.3994713602353}, 'altitude': 16.06207275390625, 'vertical_accuracy': 4.0, 'horizantal_accuracy': 10.0, 'speed': 2.3399999141693115}
2019-07-15 20:13:20,403 INFO Sending bulk request to elasticsearch. Doc counts: 11 Docs: {'track_id': '514df98862de83a07e7aff62dff77c3d', 'media_id': 'afe35b87-0a9acea6', 'ride_id': 'afe35b87d0b69e1928dd0a4fd75a1416', 'filename': '0a9acea6-62d6-4540-b048-41e34e2407c6.mov', 'timestamp': 1505287488.0, 'timezone': 'America/Los_Angeles', 'coordinates': {'lat': 37.78659459994027, 'lon': -122.39945105706596}, 'altitude': 15.888671875, 'vertical_accuracy': 4.0, 'horizantal_accuracy': 10.0, 'speed': 2.3299999237060547}
2019-07-15 20:13:20,406 INFO Sending bulk request to elasticsearch. Doc counts: 11 Docs: {'track_id': '514df98862de83a07e7aff62dff77c3d', 'media_id': 'afe35b87-0a9acea6', 'ride_id': 'afe35b87d0b69e1928dd0a4fd75a1416', 'filename': '0a9acea6-62d6-4540-b048-41e34e2407c6.mov', 'timestamp': 1505287489.0, 'timezone': 'America/Los_Angeles', 'coordinates': {'lat': 37.78657796009011, 'lon': -122.39943055871701}, 'altitude': 15.741912841796875, 'vertical_accuracy': 4.0, 'horizantal_accuracy': 10.0, 'speed': 2.549999952316284}

Нужен ли мне какой-либо другой вариант окон для этого случая?


person Yu Watanabe    schedule 15.07.2019    source источник


Ответы (1)


Я думаю, что оконная стратегия и триггерная стратегия вступают в силу на этапе GBK. https://beam.apache.org/documentation/programming-guide/#windowing< /а>

В вашем случае, я думаю, вы можете реализовать DoFn (BulkToESFn) таким образом, чтобы он буферизировал данные и записывал в ES только тогда, когда счетчик превышает предопределенное значение.

class BulkToESFn(DoFn):
  def __init__(self,
               batch_size=1000000):
    self.batch_size = batch_size
    self.batch = []

  def finish_bundle(self):
    self._flush()

  def process(self, element, *args, **kwargs):
    self.batch.append(element)
    if len(self.batch) >= self.batch_size:
      self._flush()

  def _flush(self):
    writeToES(self.batch)
    self.batch = []
person Yichi Zhang    schedule 16.07.2019
comment
Это потрясающий ответ! Мне было неясно, что такое пакеты на самом деле и как они генерируются, поэтому я решил добавить сюда свое исследование. Оказывается, пакеты — это фрагменты ваших данных, которые отправляются рабочим сервером, и вы не контролируете, сколько элементов находится в пакете. Что хорошего в этом примере кода, так это то, что он будет сбрасываться при заполнении пакета И будет сбрасываться в конце этого пакета, чтобы обработать случай частично заполненного пакета. - person framebit; 23.01.2020
comment
Это не работает. В моем случае, используя приведенный выше код, всегда выполняйте _flush() только с finish_bundle. Поскольку размер пакета устанавливает луч apache, я никогда не достигаю batch_size. - person Mapad; 07.12.2020