Вопросы по теме 'apache-beam-io'

Создание / запись в разделенную таблицу BigQuery через поток данных Google Cloud
Я хотел воспользоваться преимуществами новой функции BigQuery для таблиц с временным разделением, но не уверен, что в настоящее время это возможно в версии 1.6 Dataflow SDK. Посмотрите на BigQuery JSON API , чтобы создать дневную секционированную...
5427 просмотров

Как повысить производительность TextIO или AvroIO при чтении очень большого количества файлов?
TextIO.read() и AvroIO.read() (а также некоторые другие операции ввода-вывода Beam) по умолчанию не очень хорошо работают в текущих средствах выполнения Apache Beam при чтении шаблона файла, который расширяется до очень большого количества файлов...
1239 просмотров

Управление смещением Apache Beam KafkaIO во внешние хранилища данных
Я пытаюсь читать от нескольких брокеров kafka, используя KafkaIO на луче apache. Параметр по умолчанию для управления смещением - это сам раздел kafka (больше не используется zookeper из kafka> 0.9). При такой настройке, когда я перезапускаю задание...
616 просмотров

Использование defaultNaming для динамической оконной записи в Apache Beam
Я следую вместе с ответом на этот сообщение и документация для выполнения динамической оконной записи моих данных в конце конвейера. Вот что у меня есть до сих пор: static void applyWindowedWrite(PCollection<String> stream) {...
1275 просмотров

Потоковая передача MutationGroups в Spanner
Я пытаюсь передать MutationGroups в гаечный ключ с помощью SpannerIO. Цель состоит в том, чтобы писать новые MuationGroups каждые 10 секунд, так как мы будем использовать гаечный ключ для запроса ближайших KPI. Когда я не использую никаких окон, я...
655 просмотров

Разница между beam.ParDo и beam.Map в типе вывода?
Я использую Apache-Beam для выполнения некоторых преобразований данных, включая извлечение данных из txt, csv и различных источников данных. Я заметил одну вещь: разницу в результатах при использовании beam.Map и beam.ParDo В следующем...
5286 просмотров
schedule 10.03.2022

Kafka: конфигурация семантики ровно один раз с использованием Apache Beam
Я пытаюсь настроить ровно один раз семантику в Kafka (Apache Beam). Вот изменения, которые я собираюсь представить: Производитель: enable.idenpotence = верно transactional.id = uniqueTransactionalId Потребитель:...
411 просмотров

Beam применяет PTransform к значениям с сохранением ключа
Кажется, я борюсь с этим шаблоном в Beam. Это потоковый конвейер. На высоком уровне: сообщение приходит кролику содержимое сообщения включает ID и пути к файлу N S3 Я хочу произвести некоторую агрегацию по всем перечисленным файлам S3, но...
161 просмотров

Луч Apache с Redis - выбрать базу данных и прочитать из хэша?
Я начинаю с Apache Beam и хочу прочитать хэш, который я сохранил в Redis, и мне также нужно будет выбрать базу данных (номер). Я просмотрел исходный код RedisIO, но не похоже, что он включает в себя возможность делать любую из этих вещей. Я что-то...
152 просмотров
schedule 04.05.2024

внешний вызов API в потоке данных Apache Beam
У меня есть случай использования, когда я читаю элементы новой строки json, хранящиеся в облачном хранилище Google, и начинаю обрабатывать каждый json. При обработке каждого json я должен вызывать внешний API для дедупликации, был ли этот элемент...
3587 просмотров

Чтение файла Parquet с помощью Apache Beam Java SDK без предоставления схемы
Кажется, что метод org.apache.beam.sdk.io.parquet.ParquetIO.readFiles требует передачи схемы. Есть ли способ избежать необходимости передавать схему? Разве схема не включена в файл Parquet? Что делать, если я пытаюсь прочитать несколько...
390 просмотров
schedule 06.04.2023

облачный поток данных облачный sql-поток данных, выдающий исключение нулевого указателя
Я пытаюсь обработать значительное количество записей, используя облачный поток данных. Мой источник — облачное хранилище Google, а мой приемник — облачный SQL (MySQL). У меня есть следующий код для записи в приемник (Cloud SQL). p.apply() .......
70 просмотров
schedule 12.10.2023

Ожидаемое ETA для использования параметров ввода-вывода и времени выполнения конвейера в конвейере потока данных GCP apache beam с использованием python?
Просто хотел узнать, есть ли у нас больше параметров ввода-вывода конвейера и времени выполнения, доступных в новой версии (3.X) python. Если я прав, то в настоящее время луч apache обеспечивает только файловые операции ввода-вывода: textio, avroio,...
211 просмотров

Как обеспечить скорость вставки 1 вставку в секунду при использовании ClickhouseIO
Я использую Apache Beam Java SDK для обработки событий и записи их в базу данных Clickhouse. К счастью, есть готовый к использованию ClickhouseIO . ClickhouseIO накапливает элементы и вставляет их в пакетном режиме, но из-за параллельного...
420 просмотров

Как выбрать набор полей из входных данных в виде массива повторяющихся полей в луче SQL
Постановка задачи: У меня есть вход PCollection со следующими полями: { firstname_1, lastname_1, dob, firstname_2, lastname_2, firstname_3, lastname_3, } затем я выполняю операцию Beam SQL, так что вывод...
208 просмотров

Apache Beam Python SDK ReadFromKafka не получает данные
Я пробую простой пример чтения данных из темы Kafka в Apache Beam. Вот соответствующий фрагмент: with beam.Pipeline(options=pipeline_options) as pipeline: _ = ( pipeline | 'Read from Kafka' >> ReadFromKafka(...
653 просмотров

Как переопределить метаданные по умолчанию.lastModifiedMillis () FileIO луча Apache с фактическим временем последнего изменения файла?
Вариант использования: мне нужно фильтровать файлы на основе lastModifiedTime с использованием луча Apache (Java) Мой код: PCollection<String> readfile = pipeline .apply(FileIO.match().filepattern(path)...
32 просмотров