Приложение зависает, когда я подключаюсь к PipelinesRDD и RDD из DStream

Я использую Spark 1.6.0 со Spark Streaming и имею одну проблему с широкими операциями.

Пример кода: существует RDD под названием «a», который имеет тип: class 'pyspark.rdd.PipelinedRDD'.

"а" было получено как:

# Load a text file and convert each line to a Row.
    lines = sc.textFile(filename)
    parts = lines.map(lambda l: l.split(","))
    clients = parts.map(lambda p: Row(client_id=int(p[0]), clientname=p[1] ...))

    # Infer the schema, and register the DataFrame as a table.
    schemaPeople = sqlContext.createDataFrame(clients)
    schemaPeople.registerTempTable("clients")

    client_list = sqlContext.sql("SELECT * FROM clients")

и после:

a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry)))

Есть вторая часть «b» с классом типа pyspark.streaming.dstream.TransformedDStream. Я получаю "b" от Flume:

DStreamB = flumeStream.map(lambda tup: function_for_map(tup[1].encode('ascii','ignore')))

и

b = DStreamB.map(lambda event: (int(event[2]), value_from_event(event)))

Проблема: когда я пытаюсь присоединиться как:

mult = b.transform(lambda rdd: rdd.join(a))

мое приложение зависает на этом этапе (теперь я показываю экран после b.pprint () и до этапа .join ())

введите описание изображения здесь

Но когда я добавляю:

  1. Объявить RDD "test":

    test = sc.parallelize(range(1, 100000)).map(lambda k: (k, 'value'))
    

    и делать:

    mult0 = a.join(test)
    mult = b.transform(lambda rdd: rdd.join(mult0))`
    

    Тогда работает (!!):

    экран 2

  2. Также я могу:

    mult0 = b.transform(lambda rdd: rdd.join(test))
    

Таким образом:

Имею СДР «а» и «тест». DStream "b". И я могу размножить:

  • а * тест * б
  • б * тест

Но я не умею делать «б * а».

Любая помощь приветствуется! Спасибо!


person Anna Ivanova    schedule 17.02.2017    source источник


Ответы (1)


По совету пользователя 6910411 сделал кеширование "а" как

a = client_list.map(lambda entry: (int(entry[1]), value_from_entry(entry))).cache() 

и проблема была решена.

person Anna Ivanova    schedule 21.02.2017