структурированная потоковая запись в несколько потоков

мой сценарий

  1. Gets data from a stream and call a UDF which return a json string. one of the attribute in JSON string is UniqueId, which UDF is generating as guid.newGuid() (C#).
    1. DataFrame output of UDF is written to multiple streams/sinks based on some fiter.

проблема:

  1. каждый приемник получает новое значение для уникального идентификатора, созданного UDF. Как я могу поддерживать один и тот же уникальный идентификатор для всех приемников.
  2. Если каждый приемник получает разные значения для UniqueId, означает ли это, что мой UDF вызывается несколько раз для каждого приемника?
  3. Если UDF вызывается дважды, как можно вызвать его один раз, а затем просто записать одни и те же данные в разные приемники?
inData = spark.readstream().format("eventhub")

udfdata = indata.select(from_json(myudf("column"), schema)).as("result").select(result.*)

filter1 =  udfdata.filter("column =='filter1'")
filter 2 = udfdata.filter("column =='filter2'") 

# write filter1 to two differnt sinks
filter1.writestream().format(delta).start(table1)
filter1.writestream().format(eventhub).start()

# write filter2 to two differnt sinks
filter2.writestream().format(delta).start(table2)
filter2.writestream().format(eventhub).start()

person user9297554    schedule 05.08.2019    source источник


Ответы (1)


Каждый раз, когда вы вызываете .writestream()....start(), вы создаете новый независимый потоковый запрос.

Это означает, что для каждого выходного приемника, который вы определяете, Spark будет снова считывать данные из источника ввода и обрабатывать кадр данных.

Если вы хотите прочитать и обработать только один раз, а затем вывести на несколько приемников, вы можете использовать приемник foreachBatch в качестве обходного пути:

inData = spark.readstream().format("eventhub")
udfdata = indata.select(from_json(myudf("column"), schema)).as("result").select(result.*)

udfdata.writeStream().foreachBatch(filter_and_output).start()
def filter_and_output(udfdata, batchId):
    # At this point udfdata is a batch dataframe, no more a streaming dataframe
    udfdata.cache()
    filter1 = udfdata.filter("column =='filter1'")
    filter2 = udfdata.filter("column =='filter2'") 

    # write filter1
    filter1.write().format(delta).save(table1)
    filter1.write().format(eventhub).save()

    # write filter2
    filter2.write().format(delta).save(table2)
    filter2.write().format(eventhub).save()

    udfdata.unpersist()

Вы можете узнать больше о foreachBatch в Документация по структурированной потоковой передаче Spark.

Чтобы ответить на ваши вопросы

  1. Если вы используете foreachBatch, ваши данные будут обработаны только один раз, и у вас будет один и тот же уникальный идентификатор для всех приемников.
  2. Да
  3. Использование foreachBatch решит проблему
person vinsce    schedule 06.08.2019
comment
Есть ли другой вариант вместо использования foreachbatch/foreach? Я использую .net для apache spark C# и похоже, что в настоящее время API foreachbatch еще не поддерживается. - person user9297554; 07.08.2019
comment
будет ли пересчитываться весь запрос на запись в искровом сеансе или будет вычисляться только запрос на запись из того же потока чтения? в моем случае, могу ли я записать результат моего UDF в поток (постановка дельта-озера), а затем прочитать из постановки дельта-озера, а затем отфильтровать, а затем записать в 2 отдельных приемника? - person user9297554; 07.08.2019
comment
Да, решение может состоять в том, чтобы временно записать результат UDF в дельта-озеро и прочитать его вторым запросом. Помните, что и в этом случае, если у вас есть несколько приемников (writeStream()), Spark будет считывать несколько раз из озера дельты. - person vinsce; 08.08.2019
comment
спасибо, моя главная забота - не выполнять UDF дважды. чтение несколько раз на поэтапном результате UDF в моем сценарии нормально - person user9297554; 08.08.2019