Создайте конвейер Apache Beam для чтения из Google Pub / Sub

Я пытаюсь создать потоковый конвейер с использованием apache-beam, который читает предложения из google pub / sub и записывает слова в таблицу Bigquery.

Я использую 0.6.0 версию apache-beam.

Следуя примерам, я сделал это:

public class StreamingWordExtract {

/**
 * A DoFn that tokenizes lines of text into individual words.
 */
static class ExtractWords extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        String[] words = ((String) c.element()).split("[^a-zA-Z']+");
        for (String word : words) {
            if (!word.isEmpty()) {
                c.output(word);
            }
        }
    }
}

/**
 * A DoFn that uppercases a word.
 */
static class Uppercase extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        c.output(c.element().toUpperCase());
    }
}


/**
 * A DoFn that uppercases a word.
 */
static class StringToRowConverter extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        c.output(new TableRow().set("string_field", c.element()));
    }

    static TableSchema getSchema() {
        return new TableSchema().setFields(new ArrayList<TableFieldSchema>() {
            // Compose the list of TableFieldSchema from tableSchema.
            {
                add(new TableFieldSchema().setName("string_field").setType("STRING"));
            }
        });
    }

}

private interface StreamingWordExtractOptions extends ExampleBigQueryTableOptions, ExamplePubsubTopicOptions {
    @Description("Input file to inject to Pub/Sub topic")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();

    void setInputFile(String value);
}

public static void main(String[] args) {
    StreamingWordExtractOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(StreamingWordExtractOptions.class);

    options.setBigQuerySchema(StringToRowConverter.getSchema());

    Pipeline p = Pipeline.create(options);

    String tableSpec = new StringBuilder()
            .append(options.getProject()).append(":")
            .append(options.getBigQueryDataset()).append(".")
            .append(options.getBigQueryTable())
            .toString();

    p.apply(PubsubIO.read().topic(options.getPubsubTopic()))
            .apply(ParDo.of(new ExtractWords()))
            .apply(ParDo.of(new StringToRowConverter()))
            .apply(BigQueryIO.Write.to(tableSpec)
                    .withSchema(StringToRowConverter.getSchema())
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    PipelineResult result = p.run();


}

У меня ошибка рядом:

apply(ParDo.of(new ExtractWords()))

потому что предыдущий apply не возвращает String, а Object

Я полагаю, что проблема в типе, возвращенном PubsubIO.read().topic(options.getPubsubTopic()). Тип - PTransform<PBegin, PCollection<T>> вместо PTransform<PBegin, PCollection<String>>.

Как правильно читать из Google Pub / Sub с помощью apache-beam?


person theShadow89    schedule 20.03.2017    source источник


Ответы (1)


Вы столкнулись с недавним обратным несовместимым изменением в Beam - извините за это!

Начиная с Apache Beam версии 0.5.0, PubsubIO.Read и PubsubIO.Write необходимо создавать экземпляры с использованием PubsubIO.<T>read() и PubsubIO.<T>write() вместо статических фабричных методов, таких как PubsubIO.Read.topic(String).

Для Read требуется указание кодировщика через .withCoder(Coder) для типа вывода. Для Write требуется указание кодировщика для типа ввода или указание функции форматирования через .withAttributes(SimpleFunction<T, PubsubMessage>).

person Davor Bonaci    schedule 21.03.2017
comment
Мне сложно найти какую-либо документацию по написанию кодера, какие-либо руководства или документацию? - person Arqu; 20.04.2017
comment
Coder API: beam.apache.org/documentation/sdks/javadoc/0.6.0/org/apache/. Тем не менее, вам может не понадобиться писать кодировщик - для многих типов есть встроенный кодировщик. Наконец, вы также можете читать сообщения PubSub как байты и выполнять любое необходимое преобразование в последующем ParDo - это может быть (намного) проще. - person Davor Bonaci; 21.04.2017
comment
Спасибо! Да, действительно удалось все обойти, используя StringCoder и работая оттуда. - person Arqu; 21.04.2017