Создание / запись в разделенную таблицу BigQuery через поток данных Google Cloud

Я хотел воспользоваться преимуществами новой функции BigQuery для таблиц с временным разделением, но не уверен, что в настоящее время это возможно в версии 1.6 Dataflow SDK.

Посмотрите на BigQuery JSON API, чтобы создать дневную секционированную таблицу нужно пройти в

"timePartitioning": { "type": "DAY" }

вариант, но интерфейс com.google.cloud.dataflow.sdk.io.BigQueryIO позволяет указывать только TableReference.

Я подумал, что, может быть, я смогу заранее создать таблицу и проникнуть в декоратор раздела с помощью лямбда-выражения BigQueryIO.Write.toTableReference ..? Кто-нибудь еще добился успеха с созданием / записью секционированных таблиц через поток данных?

Это похоже на проблему, аналогичную установке срока действия таблицы, которая в настоящее время также недоступна. .


person ptf    schedule 30.06.2016    source источник


Ответы (6)


Как говорит Паван, запись в таблицы разделов с помощью Dataflow определенно возможна. Вы используете DataflowPipelineRunner в потоковом или пакетном режиме?

Предложенное вами решение должно работать. В частности, если вы предварительно создаете таблицу с настроенным разделением по дате, вы можете использовать BigQueryIO.Write.toTableReference лямбда для записи в раздел по дате. Например:

/**
 * A Joda-time formatter that prints a date in format like {@code "20160101"}.
 * Threadsafe.
 */
private static final DateTimeFormatter FORMATTER =
    DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);

// This code generates a valid BigQuery partition name:
Instant instant = Instant.now(); // any Joda instant in a reasonable time range
String baseTableName = "project:dataset.table"; // a valid BigQuery table name
String partitionName =
    String.format("%s$%s", baseTableName, FORMATTER.print(instant));
person Dan Halperin    schedule 01.07.2016
comment
Этот метод очень хорош, но он позволяет управлять меткой даты только с параметрами вне конвейера. Что, если бы мы хотели использовать метки времени из самих данных, чтобы разделить их по датам, а затем записать в соответствующие таблицы? - person nembleton; 02.07.2016
comment
@nembleton: если у элементов есть временные метки, вы можете использовать окно, чтобы сопоставить их с дневными окнами. Измените этот код: PCollection<Integer> windowedItems = items.apply( Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(10))));. Затем TableSpecFun, который читает окна, будет отображать элементы в правильные дни. Код взят из Исправлено Windows javadoc - person Dan Halperin; 02.07.2016
comment
Спасибо @DanHalperin, это в значительной степени то, что я делаю, включая управление окнами, но использую .apply(Window.into(CalendarWindows.days(1))). Единственная проблема заключается в том, что данные могут быть в разных часовых поясах, и мы хотим, чтобы BigQuery возвращал данные в исходном часовом поясе, мы делаем некоторые забавы в более раннем PTransform с outputWithTimestamp вызовом - person ptf; 04.07.2016
comment
@ JulianV.Modesto прав, 1.6 SDK переключается на запись в BigQuery в потоковом режиме, если указана ссылка на таблицу .. что еще не позволяет использовать декораторы таблиц - person ptf; 06.07.2016
comment
Я считаю, что это правильно, если использовать API потоковой записи BigQuery. - person Dan Halperin; 06.07.2016

Подход, который я использовал (работает и в потоковом режиме):

  • Определите настраиваемое окно для входящей записи
  • Преобразуйте окно в название таблицы / раздела

    p.apply(PubsubIO.Read
                .subscription(subscription)
                .withCoder(TableRowJsonCoder.of())
            )
            .apply(Window.into(new TablePartitionWindowFn()) )
            .apply(BigQueryIO.Write
                           .to(new DayPartitionFunc(dataset, table))
                           .withSchema(schema)
                           .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            );
    

Установив окно на основе входящих данных, End Instant можно игнорировать, так как начальное значение используется для настройки раздела:

public class TablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {

private IntervalWindow assignWindow(AssignContext context) {
    TableRow source = (TableRow) context.element();
    String dttm_str = (String) source.get("DTTM");

    DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC();

    Instant start_point = Instant.parse(dttm_str,formatter);
    Instant end_point = start_point.withDurationAdded(1000, 1);

    return new IntervalWindow(start_point, end_point);
};

@Override
public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
}

@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
    return Arrays.asList(assignWindow(c));
}

@Override
public boolean isCompatible(WindowFn<?, ?> other) {
    return false;
}

@Override
public IntervalWindow getSideInputWindow(BoundedWindow window) {
    if (window instanceof GlobalWindow) {
        throw new IllegalArgumentException(
                "Attempted to get side input window for GlobalWindow from non-global WindowFn");
    }
    return null;
}

Динамическая установка раздела таблицы:

public class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> {

String destination = "";

public DayPartitionFunc(String dataset, String table) {
    this.destination = dataset + "." + table+ "$";
}

@Override
public String apply(BoundedWindow boundedWindow) {
    // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
    String dayString = DateTimeFormat.forPattern("yyyyMMdd")
                                     .withZone(DateTimeZone.UTC)
                                     .print(((IntervalWindow) boundedWindow).start());
    return destination + dayString;
}}

Есть ли лучший способ достичь того же результата?

person Evgeny Minkevich    schedule 29.11.2016
comment
какую версию библиотеки лучей Apache вы использовали для настройки вышеуказанного потока данных? - person Darshan Mehta; 31.10.2017

Я считаю, что можно использовать декоратор разделов, когда вы не используете потоковую передачу. Мы активно работаем над поддержкой декораторов разделов посредством потоковой передачи. Пожалуйста, дайте нам знать, если вы видите какие-либо ошибки сегодня в режиме без потоковой передачи.

person Pavan Edara    schedule 30.06.2016
comment
Привет, @Pavan, мы используем BlockingDataflowPipelineRunner и работаем в пакетном режиме, но шаг BigQueryIO.Write не работает с 400 Bad Request и "Table decorators cannot be used with streaming insert.". Есть ли способ не использовать потоковую запись в BigQuery? Я думал, что это действительно будет массовая загрузка. И есть ли временная шкала для поддержки потокового режима? - person ptf; 06.07.2016
comment
Ах, похоже, функция ссылки на таблицу заставляет ее перейти в потоковый режим :( - person ptf; 06.07.2016
comment
Привет @Pavan, есть ли временная шкала, когда декораторы таблиц будут поддерживаться во время потоковой передачи? - person manishpal; 09.07.2016
comment
Надеюсь, к концу этого месяца - person Pavan Edara; 10.07.2016

Apache Beam версии 2.0 поддерживает сегментирование выходных таблиц BigQuery прямо из коробки.

person Tobi    schedule 15.06.2017

Если вы передадите имя таблицы в формате table_name_YYYYMMDD, BigQuery будет рассматривать ее как сегментированную таблицу, которая может имитировать функции таблицы разделов. См. Документацию: https://cloud.google.com/bigquery/docs/partitioned-tables

person Rajesh Hegde    schedule 31.03.2018
comment
Неправильный! BigQuery будет рассматривать ее как обычную таблицу! единственное, что может заставить вас думать, что BigQuery каким-то образом специально обрабатывает такую ​​таблицу, - это то, что пользовательский интерфейс BigQuery объединяет такие таблицы под одной записью table_name (NN), но вместо этого пользователь должен знать значение такого именования, а таблица НЕ разделяется на основе имени - person Mikhail Berlyant; 31.03.2018
comment
@MikhailBerlyant, Да, это не будет таблица разделов, но она создаст сегментированную таблицу, которая может имитировать особенности таблицы разделов. Это последнее средство, пока луч не предоставит возможность передавать столбец раздела в качестве параметра. - person Rajesh Hegde; 31.03.2018
comment
Итак, по крайней мере, то, как вы обновили ответ, теперь не так уж плохо: o) - person Mikhail Berlyant; 31.03.2018
comment
Вам также необходимо использовать нотацию $ - person Gerard; 17.09.2019

Я записал данные в секционированные таблицы bigquery через поток данных. Эти записи являются динамическими, если данные в этом разделе уже существуют, я могу либо добавить к нему, либо перезаписать его.

Я написал код на Python. Это операция записи в bigquery в пакетном режиме.

client = bigquery.Client(project=projectName)
dataset_ref = client.dataset(datasetName)
table_ref = dataset_ref.table(bqTableName)       
job_config = bigquery.LoadJobConfig()
job_config.skip_leading_rows = skipLeadingRows
job_config.source_format = bigquery.SourceFormat.CSV
if tableExists(client, table_ref):            
    job_config.autodetect = autoDetect
    previous_rows = client.get_table(table_ref).num_rows
    #assert previous_rows > 0
    if allowJaggedRows is True:
        job_config.allowJaggedRows = True
    if allowFieldAddition is True:
        job_config._properties['load']['schemaUpdateOptions'] = ['ALLOW_FIELD_ADDITION']
    if isPartitioned is True:
        job_config._properties['load']['timePartitioning'] = {"type": "DAY"}
    if schemaList is not None:
        job_config.schema = schemaList            
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
else:            
    job_config.autodetect = autoDetect
    job_config._properties['createDisposition'] = 'CREATE_IF_NEEDED'
    job_config.schema = schemaList
    if isPartitioned is True:             
        job_config._properties['load']['timePartitioning'] = {"type": "DAY"}
    if schemaList is not None:
        table = bigquery.Table(table_ref, schema=schemaList)            
load_job = client.load_table_from_uri(gcsFileName, table_ref, job_config=job_config)        
assert load_job.job_type == 'load'
load_job.result()       
assert load_job.state == 'DONE'

Работает нормально.

person anshuman sharma    schedule 29.06.2018