Синхронизация потоков для DoFn в Apache Beam

Я пишу DoFn, в котором его переменная экземпляра elements (то есть общий ресурс) может быть изменена в методе @ProcessElement:

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;

public class DemoDoFn extends DoFn<String, Void> {
  private final int batchSize;

  private transient List<String> elements;

  public DemoDoFn(int batchSize) {
    this.batchSize = batchSize;
  }

  @StartBundle
  public void startBundle() {
    elements = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(@Element String element, ProcessContext context) {
    elements.add(element); // <-------- mutated

    if (elements.size() >= batchSize) {
      flushBatch();
    }
  }

  @FinishBundle
  public void finishBundle() {
    flushBatch();
  }

  private void flushBatch() {
    // Flush all elements, e.g., send all elements in a single API call to a server

    // Initialize a new array list for next batch
    elements = new ArrayList<>(); // <-------- mutated
  }
}

Вопрос 1. Нужно ли мне добавлять ключевое слово synchronized в метод @ProcessElement, чтобы избежать состояния гонки?

Согласно совместимости потоков: "Каждый экземпляр объекта вашей функции (DoFn) одновременно осуществляется одним потоком на рабочем экземпляре, если вы явно не создаете свои собственные потоки. Однако обратите внимание, что пакеты SDK Beam не являются потокобезопасными. Если вы создаете свои собственные потоки в ваш код пользователя, вы должны обеспечить собственную синхронизацию. "

Вопрос 2: Означает ли «Каждый экземпляр вашего функционального объекта одновременно обращается к одному потоку на рабочем экземпляре», что Beam будет синхронизировать @ProcessElement или весь DoFn за кулисами?

Это IBM указывает на это, и я цитирую

  1. "В-третьих, руководство по программированию Beam гарантирует, что каждый экземпляр определяемой пользователем функции будет выполняться только одним потоком за раз. Это означает, что исполнитель должен синхронизировать весь вызов функции, что может привести к к значительным узким местам в производительности ".
  2. "Beam обещает приложениям, что только один поток будет выполнять свои пользовательские функции одновременно. Поэтому, если механизм подчеркивания порождает несколько потоков, исполнитель должен синхронизировать весь вызов DoFn или GroupByKey. . "
  3. «Поскольку Beam запрещает нескольким потокам входить в один и тот же экземпляр PTransform, движки теряют возможность использовать параллелизм операторов».

В документе, кажется, указывается, что весь вызов DoFn синхронизирован.


person Kuan Lin    schedule 21.07.2019    source источник


Ответы (1)


Я знаю, что это старый вопрос, но поскольку я исследовал то же самое - нет, вам не нужна синхронизация для вашего processElement, потому что, как вы процитировали: каждый экземпляр вашего объекта функции (DoFn) доступен для одного потока за раз на рабочий экземпляр

Вот пример официального класса луча, который изменяет переменную экземпляра https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/sdks/java/io/elasticsearch/src/main/java/org/apache/луч/sdk/io/elasticsearch/ElasticsearchIO.java#L1369

person Kazuki    schedule 28.11.2020