Буду признателен, если мне помогут с windowing
в apache beam 2.13.0
.
Я использую python 3.7.3
.
[ywatanabe@fedora-30-00 bdd100k-to-es]$ python3 -V
Python 3.7.3
Я хочу сделать именно то, что этот пример делает с ограниченным данные однако. То есть группировать события по каждому триггеру и передавать их следующему преобразованию.
8.4.1.2. Discarding mode
If our trigger is set to discarding mode, the trigger emits the following values on each firing:
First trigger firing: [5, 8, 3]
Second trigger firing: [15, 19, 23]
Third trigger firing: [9, 13, 10]
Ссылаясь на пример, я написал свой код, как показано ниже,
es = (gps | 'window:gps' >> WindowInto(
FixedWindows(1 * 60),
trigger=Repeatedly(
AfterAny(
AfterCount(1000000),
AfterProcessingTime(1 * 60)
)
),
accumulation_mode=AccumulationMode.DISCARDING
)
| 'bulk:gps' >> beam.ParDo(BulkToESFn(esHost), tag_gps))
Однако в приведенном выше коде триггер срабатывает почти каждую миллисекунду, а не каждую минуту или 1 000 000 событий.
2019-07-15 20:13:20,401 INFO Sending bulk request to elasticsearch. Doc counts: 11 Docs: {'track_id': '514df98862de83a07e7aff62dff77c3d', 'media_id': 'afe35b87-0a9acea6', 'ride_id': 'afe35b87d0b69e1928dd0a4fd75a1416', 'filename': '0a9acea6-62d6-4540-b048-41e34e2407c6.mov', 'timestamp': 1505287487.0, 'timezone': 'America/Los_Angeles', 'coordinates': {'lat': 37.786611081350365, 'lon': -122.3994713602353}, 'altitude': 16.06207275390625, 'vertical_accuracy': 4.0, 'horizantal_accuracy': 10.0, 'speed': 2.3399999141693115}
2019-07-15 20:13:20,403 INFO Sending bulk request to elasticsearch. Doc counts: 11 Docs: {'track_id': '514df98862de83a07e7aff62dff77c3d', 'media_id': 'afe35b87-0a9acea6', 'ride_id': 'afe35b87d0b69e1928dd0a4fd75a1416', 'filename': '0a9acea6-62d6-4540-b048-41e34e2407c6.mov', 'timestamp': 1505287488.0, 'timezone': 'America/Los_Angeles', 'coordinates': {'lat': 37.78659459994027, 'lon': -122.39945105706596}, 'altitude': 15.888671875, 'vertical_accuracy': 4.0, 'horizantal_accuracy': 10.0, 'speed': 2.3299999237060547}
2019-07-15 20:13:20,406 INFO Sending bulk request to elasticsearch. Doc counts: 11 Docs: {'track_id': '514df98862de83a07e7aff62dff77c3d', 'media_id': 'afe35b87-0a9acea6', 'ride_id': 'afe35b87d0b69e1928dd0a4fd75a1416', 'filename': '0a9acea6-62d6-4540-b048-41e34e2407c6.mov', 'timestamp': 1505287489.0, 'timezone': 'America/Los_Angeles', 'coordinates': {'lat': 37.78657796009011, 'lon': -122.39943055871701}, 'altitude': 15.741912841796875, 'vertical_accuracy': 4.0, 'horizantal_accuracy': 10.0, 'speed': 2.549999952316284}
Нужен ли мне какой-либо другой вариант окон для этого случая?