Изящно остановить запрос структурированной потоковой передачи

Я использую Spark 2.1 и пытаюсь аккуратно остановить потоковый запрос.

StreamingQuery.stop() изящная остановка, потому что я не видел подробной информации об этом методе в документация:

void stop() Останавливает выполнение этого запроса, если он выполняется. Этот метод блокируется до тех пор, пока выполняющиеся потоки не остановятся. Начиная с: 2.0.0

В то время как в прошлом мире потоковой передачи (DStreams) существует опция остановки выполнения потоков с возможностью обеспечения обработки всех полученных данных:

def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit Остановить выполнение потоков с возможностью убедиться, что все полученные данные были обработаны.

stopSparkContext, если true, останавливает связанный SparkContext. Базовый SparkContext будет остановлен независимо от того, был ли запущен этот StreamingContext.

stopGracefully если истина, плавно останавливается, ожидая завершения обработки всех полученных данных.

Итак, вопрос в том, как изящно остановить структурированный потоковый запрос?


person shiv455    schedule 16.08.2017    source источник


Ответы (5)


Если под «изящно» вы имеете в виду, что потоковый запрос должен завершить обработку данных, void stop() этого не сделает. Он просто будет ждать, пока потоки, выполняющие выполнение, не остановятся (как указано в документации). Это не означает, что обработка завершится.

Для этого нам нужно заставить запрос ждать, пока не будет выполнен текущий триггер запроса. Что мы можем проверить через StreamingQueryStatus, вот так:

while (query.status.isTriggerActive) {// ничего не делаем}

Он будет ждать, пока запрос не завершит обработку. И тогда мы можем вызвать query.stop().

Я надеюсь, что это помогает!

person himanshuIIITian    schedule 21.01.2018

такой код может помочь остановить поток микропакетов, если для потребления больше нет записей

def stopStreamQuery(query: StreamingQuery, awaitTerminationTimeMs: Long) {
   while (query.isActive) {
      val msg = query.status.message
      if (!query.status.isDataAvailable
          && !query.status.isTriggerActive
             && !msg.equals("Initializing sources")) {
      query.stop()
    }
    query.awaitTermination(awaitTerminationTimeMs)
  }
}
person ASe    schedule 24.10.2019

Смотря что значит "изящно" :)

StreamingQuery останавливает только определенный запрос. Он ждет, пока поток MicroBatch не остановится и не будет готов к отключению источников. Это «ожидание» означает, что данные будут обработаны, а затем поток остановится.

person T. Gawęda    schedule 16.08.2017
comment
не могли бы вы сослаться на источник, в котором это упоминается? Я запутался, потому что есть метод с именем processAllavaialble (), который делает то же самое, но в документации говорится, что он предназначен только для тестирования. spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/ - person shiv455; 16.08.2017

Для пользователей PySpark это Python-порт ответа @ASe.

# Helper method to stop a streaming query
def stop_stream_query(query, wait_time):
    """Stop a running streaming query"""
    while query.isActive:
        msg = query.status['message']
        data_avail = query.status['isDataAvailable']
        trigger_active = query.status['isTriggerActive']
        if not data_avail and not trigger_active and msg != "Initializing sources":
            print('Stopping query...')
            query.stop()
        time.sleep(0.5)

    # Okay wait for the stop to happen
    print('Awaiting termination...')
    query.awaitTermination(wait_time)
person Briford Wylie    schedule 13.01.2021

StreamingQuery.stop не останавливает запрос корректно, он вызывает sparkContext.cancelJobGroup(all jobs generated by streaming query).

Чтобы избежать этого и дождаться завершения текущего пакета, я использую https://gist.github.com/GrigorievNick/bf920e32f70cb1cf8308cd601e415d12 обратите внимание, он работает только с MicroBatchExectuion

person Grigoriev Nick    schedule 23.07.2021