Имя динамической таблицы при записи в BQ из конвейеров потока данных

В качестве дополнительного вопроса к следующему вопросу и ответу:

https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey < / а>

Я хотел бы подтвердить с командой разработчиков потока данных Google (@jkff), возможен ли третий вариант, предложенный Евгением. с потоком данных Google:

«иметь ParDo, который принимает эти ключи и создает таблицы BigQuery, и другой ParDo, который принимает данные и записывает потоки в таблицы»

Насколько я понимаю, ParDo / DoFn будет обрабатывать каждый элемент, как мы можем указать имя таблицы (функцию ключей, переданных из боковых входов) при записи из processElement ParDo / DoFn?

Спасибо.

Обновлено с помощью DoFn, который явно не работает, поскольку c.element (). value не является pcollection.

PCollection<KV<String, Iterable<String>>> output = ...;

public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> {

private final PCollectionView<List<String>> keysAsSideinputs;
public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs) {
        this.keysAsSideinputs = keysAsSideinputs;
    }

@Override
    public void processElement(ProcessContext c) {
        List<String> keys = c.sideInput(keysAsSideinputs);
        String key = c.element().getKey();

        //the below is not working!!! How could we write the value out to a sink, be it gcs file or bq table???
        c.element().getValue().apply(Pardo.of(new FormatLineFn()))
                .apply(TextIO.Write.to(key));

        c.output(1);
    }    
}    

person Alan    schedule 14.03.2016    source источник
comment
Теперь это доступно в готовой последней версии Beam stackoverflow.com/questions/43505534/   -  person jkff    schedule 19.04.2017


Ответы (1)


Преобразование BigQueryIO.Write не поддерживает это. Самое близкое, что вы можете сделать, - это использовать таблицы для каждого окна и кодировать любую информацию, необходимую для выбора таблицы в объектах окна, с помощью настраиваемого WindowFn.

Если вы не хотите этого делать, вы можете выполнять вызовы API BigQuery прямо из DoFn. При этом вы можете установить любое имя таблицы, которое хотите, в соответствии с вычислением вашего кода. Это может быть получено из бокового ввода или вычислено непосредственно из элемента, который в настоящее время обрабатывается DoFn. Чтобы избежать слишком большого количества мелких вызовов BigQuery, вы можете группировать запросы с помощью finishBundle ();

Вы можете увидеть, как средство выполнения потока данных выполняет потоковый импорт здесь: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java

person danielm    schedule 14.03.2016
comment
Спасибо за ответ. Однако я все еще не мог понять синтаксис DoFn для вызова BigQueryIO.Write в processElement. Мне нужна помощь по двум вопросам: 1. Не могли бы вы показать мне быстрый пример DoFn для упомянутого использования? 2. Вызов BigQueryIO.Write в processElement вызовет дополнительную проблему с производительностью, поскольку он будет вызываться при обработке каждого элемента? Спасибо. - person Alan; 15.03.2016
comment
Также добавлен DoFn в OP, пожалуйста, поделитесь, как записывать значения для ключей в PCollection ‹KV‹ T, U ››. Спасибо. - person Alan; 15.03.2016
comment
Спасибо, Даниэль, за обновление ответа. Очень надеюсь, что в Text и BQ IO Writer можно будет добавить новую функцию, позволяющую динамически именовать файлы или таблицы. Не говорю, что это будет легкая задача, но просто действительно полезные функции. Я принял ответ. - person Alan; 18.03.2016