внешний вызов API в потоке данных Apache Beam

У меня есть случай использования, когда я читаю элементы новой строки json, хранящиеся в облачном хранилище Google, и начинаю обрабатывать каждый json. При обработке каждого json я должен вызывать внешний API для дедупликации, был ли этот элемент json обнаружен ранее. Я делаю ParDo с DoFn для каждого json.

Я не видел ни одного онлайн-руководства, в котором говорилось бы, как вызвать внешнюю конечную точку API из apache beam DoFn Dataflow.

Я использую JAVA SDK Beam. В некоторых инструкциях, которые я изучал, объясняется, что использование startBundle и FinishBundle, но я не понимаю, как его использовать


person bigbounty    schedule 17.11.2019    source источник
comment
Это потоковый конвейер или пакетный конвейер?   -  person Pablo    schedule 18.11.2019
comment
Это пакетный конвейер   -  person bigbounty    schedule 19.11.2019


Ответы (2)


Если вам нужно проверять дубликаты во внешнем хранилище для каждой записи JSON, вы все равно можете использовать для этого DoFn. Есть несколько аннотаций, таких как @Setup, @StartBundle, @FinishBundle и т. Д., Которые можно использовать для аннотирования методов в вашем DoFn.

Например, если вам нужно создать экземпляр клиентского объекта для отправки запросов во внешнюю базу данных, вы можете сделать это в @Setup методе (например, конструкторе POJO), а затем использовать этот клиентский объект в своем @ProcessElement методе.

Рассмотрим простой пример:

static class MyDoFn extends DoFn<Record, Record> {

    static transient MyClient client;

    @Setup
    public void setup() {
        client = new MyClient("host");
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // process your records
        Record r = c.element();
        // check record ID for duplicates
        if (!client.isRecordExist(r.id()) {
            c.output(r);
        }
    }

    @Teardown
    public void teardown() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}

Кроме того, чтобы избежать удаленных вызовов для каждой записи, вы можете группировать записи в пакетном режиме во внутренний буфер (входные данные Beam разделяются на пакеты) и проверять дубликаты в пакетном режиме (если ваш клиент поддерживает это). Для этой цели вы можете использовать аннотированные методы @StartBundle и @FinishBundle, которые будут вызываться непосредственно до и после обработки пакета Beam соответственно.

Для более сложных примеров я бы рекомендовал взглянуть на реализации Sink в разных IO Beam, например KinesisIO ​​, например.

person Alexey Romanenko    schedule 21.11.2019

Пример пакетного вызова внешней системы с использованием DoFn с отслеживанием состояния приведен в следующем сообщении блога: https://beam.apache.org/blog/2017/08/28/timely-processing.html, может быть полезным.

person Paweł Kordek    schedule 20.11.2019