Как лучше всего дождаться Google Dataproc SparkJob в Java?

В настоящее время я запускаю Spark Job через службу Spring REST с использованием клиентского API Dataproc Java. Основы искровой работы:

  1. Инициализировать искру
  2. Данные обработки
  3. Сохранение результатов в файле .json корзины GS.

Причина, по которой я сохраняю данные, заключается в том, что когда мое задание Spark выполнено и результаты сохранены в файле JSON, я могу прочитать сохраненные результаты из службы REST. Однако Java Client API Dataproc просто запускает задание и не ждет его завершения. Таким образом, как лучше всего дождаться завершения работы искры? Я не хочу использовать Object.wait(int time), потому что разные задания искры будут иметь разное время выполнения.


person pashupati    schedule 29.02.2016    source источник


Ответы (1)


Через Dataproc REST API вызов GET для задания вернет информацию о статусе задания. В общем, вы можете просто иметь цикл опроса:

public static final ImmutableSet<String> TERMINAL_JOB_STATES =
    ImmutableSet.of("CANCELLED", "DONE", "ERROR");

// Initialize this as normal with credentials, setAppName, HttpTransport, etc.
private Dataproc dataproc;

public void waitJob(String projectId, String jobId) throws IOException, InterruptedException {
  Job job = dataproc.projects().regions().jobs().get(projectId, "global", jobId).execute();
  while (!TERMINAL_JOB_STATES.contains(job.getStatus().getState())) {
    System.out.println("Job not done yet; current state: " + job.getStatus().getState());
    Thread.sleep(5000);
    job = dataproc.projects().regions().jobs().get(projectId, "global", jobId).execute();
  }
  System.out.println("Job terminated in state: " + job.getStatus().getState());
}

Вы также можете обернуть вызовы .execute() внутри операторов try/catch, перехватывающих IOException, на случай, если ошибка является какой-то временной ошибкой сетевого подключения (любые ошибки 500 HTTP code следует просто повторить). Вам также может понадобиться максимальное время ожидания на случай, если что-то заблокирует выполнение задания или вы непреднамеренно повторите попытку из-за ошибки 404 not found.

Вы также должны уметь обнаруживать 404 not found ошибки из любого выброшенного IOException; это может произойти, если вы случайно вошли и удалили задание до завершения опроса, или если ошибка заставила вас ввести вызов waitJob, несмотря на неудачный вызов SubmitJob. Вы должны иметь возможность поэкспериментировать с попыткой GET несуществующего задания и посмотреть, как выглядит ошибка, чтобы в таком случае избежать бесконечного цикла.

person Dennis Huo    schedule 29.02.2016