Вопросы по теме 'apache-beam-io'
Создание / запись в разделенную таблицу BigQuery через поток данных Google Cloud
Я хотел воспользоваться преимуществами новой функции BigQuery для таблиц с временным разделением, но не уверен, что в настоящее время это возможно в версии 1.6 Dataflow SDK.
Посмотрите на BigQuery JSON API , чтобы создать дневную секционированную...
5427 просмотров
schedule
29.04.2022
Как повысить производительность TextIO или AvroIO при чтении очень большого количества файлов?
TextIO.read() и AvroIO.read() (а также некоторые другие операции ввода-вывода Beam) по умолчанию не очень хорошо работают в текущих средствах выполнения Apache Beam при чтении шаблона файла, который расширяется до очень большого количества файлов...
1239 просмотров
schedule
09.07.2023
Управление смещением Apache Beam KafkaIO во внешние хранилища данных
Я пытаюсь читать от нескольких брокеров kafka, используя KafkaIO на луче apache. Параметр по умолчанию для управления смещением - это сам раздел kafka (больше не используется zookeper из kafka> 0.9). При такой настройке, когда я перезапускаю задание...
616 просмотров
schedule
02.04.2022
Использование defaultNaming для динамической оконной записи в Apache Beam
Я следую вместе с ответом на этот сообщение и документация для выполнения динамической оконной записи моих данных в конце конвейера. Вот что у меня есть до сих пор:
static void applyWindowedWrite(PCollection<String> stream) {...
1275 просмотров
schedule
22.03.2023
Потоковая передача MutationGroups в Spanner
Я пытаюсь передать MutationGroups в гаечный ключ с помощью SpannerIO. Цель состоит в том, чтобы писать новые MuationGroups каждые 10 секунд, так как мы будем использовать гаечный ключ для запроса ближайших KPI.
Когда я не использую никаких окон, я...
655 просмотров
schedule
02.02.2022
Разница между beam.ParDo и beam.Map в типе вывода?
Я использую Apache-Beam для выполнения некоторых преобразований данных, включая извлечение данных из txt, csv и различных источников данных. Я заметил одну вещь: разницу в результатах при использовании beam.Map и beam.ParDo
В следующем...
5286 просмотров
schedule
10.03.2022
Kafka: конфигурация семантики ровно один раз с использованием Apache Beam
Я пытаюсь настроить ровно один раз семантику в Kafka (Apache Beam). Вот изменения, которые я собираюсь представить:
Производитель:
enable.idenpotence = верно
transactional.id = uniqueTransactionalId
Потребитель:...
411 просмотров
schedule
28.06.2022
Beam применяет PTransform к значениям с сохранением ключа
Кажется, я борюсь с этим шаблоном в Beam. Это потоковый конвейер.
На высоком уровне:
сообщение приходит кролику
содержимое сообщения включает ID и пути к файлу N S3
Я хочу произвести некоторую агрегацию по всем перечисленным файлам S3, но...
161 просмотров
schedule
21.04.2022
Луч Apache с Redis - выбрать базу данных и прочитать из хэша?
Я начинаю с Apache Beam и хочу прочитать хэш, который я сохранил в Redis, и мне также нужно будет выбрать базу данных (номер). Я просмотрел исходный код RedisIO, но не похоже, что он включает в себя возможность делать любую из этих вещей. Я что-то...
152 просмотров
schedule
04.05.2024
внешний вызов API в потоке данных Apache Beam
У меня есть случай использования, когда я читаю элементы новой строки json, хранящиеся в облачном хранилище Google, и начинаю обрабатывать каждый json. При обработке каждого json я должен вызывать внешний API для дедупликации, был ли этот элемент...
3587 просмотров
schedule
31.03.2023
Чтение файла Parquet с помощью Apache Beam Java SDK без предоставления схемы
Кажется, что метод org.apache.beam.sdk.io.parquet.ParquetIO.readFiles требует передачи схемы.
Есть ли способ избежать необходимости передавать схему?
Разве схема не включена в файл Parquet?
Что делать, если я пытаюсь прочитать несколько...
390 просмотров
schedule
06.04.2023
облачный поток данных облачный sql-поток данных, выдающий исключение нулевого указателя
Я пытаюсь обработать значительное количество записей, используя облачный поток данных. Мой источник — облачное хранилище Google, а мой приемник — облачный SQL (MySQL). У меня есть следующий код для записи в приемник (Cloud SQL).
p.apply()
.......
70 просмотров
schedule
12.10.2023
Ожидаемое ETA для использования параметров ввода-вывода и времени выполнения конвейера в конвейере потока данных GCP apache beam с использованием python?
Просто хотел узнать, есть ли у нас больше параметров ввода-вывода конвейера и времени выполнения, доступных в новой версии (3.X) python. Если я прав, то в настоящее время луч apache обеспечивает только файловые операции ввода-вывода: textio, avroio,...
211 просмотров
schedule
12.09.2022
Как обеспечить скорость вставки 1 вставку в секунду при использовании ClickhouseIO
Я использую Apache Beam Java SDK для обработки событий и записи их в базу данных Clickhouse. К счастью, есть готовый к использованию ClickhouseIO .
ClickhouseIO накапливает элементы и вставляет их в пакетном режиме, но из-за параллельного...
420 просмотров
schedule
09.07.2022
Как выбрать набор полей из входных данных в виде массива повторяющихся полей в луче SQL
Постановка задачи:
У меня есть вход PCollection со следующими полями:
{
firstname_1,
lastname_1,
dob,
firstname_2,
lastname_2,
firstname_3,
lastname_3,
}
затем я выполняю операцию Beam SQL, так что вывод...
208 просмотров
schedule
10.09.2023
Apache Beam Python SDK ReadFromKafka не получает данные
Я пробую простой пример чтения данных из темы Kafka в Apache Beam. Вот соответствующий фрагмент:
with beam.Pipeline(options=pipeline_options) as pipeline:
_ = (
pipeline
| 'Read from Kafka' >> ReadFromKafka(...
653 просмотров
schedule
01.06.2022
Как переопределить метаданные по умолчанию.lastModifiedMillis () FileIO луча Apache с фактическим временем последнего изменения файла?
Вариант использования: мне нужно фильтровать файлы на основе lastModifiedTime с использованием луча Apache (Java)
Мой код:
PCollection<String> readfile = pipeline
.apply(FileIO.match().filepattern(path)...
32 просмотров
schedule
20.03.2023