Как загрузить файл в каждом исполнителе один раз?

Я определяю следующий код для загрузки предварительно обученной модели встраивания:

import gensim

from gensim.models.fasttext import FastText as FT_gensim
import numpy as np

class Loader(object):
    cache = {}
    emb_dic = {}
    count = 0
    def __init__(self, filename):
        print("|-------------------------------------|")
        print ("Welcome to Loader class in python")
        print("|-------------------------------------|")
        self.fn = filename

    @property
    def fasttext(self):
        if Loader.count == 1:
                print("already loaded")
        if self.fn not in Loader.cache:
            Loader.cache[self.fn] =  FT_gensim.load_fasttext_format(self.fn)
            Loader.count = Loader.count + 1
        return Loader.cache[self.fn]


    def map(self, word):
        if word not in self.fasttext:
            Loader.emb_dic[word] = np.random.uniform(low = 0.0, high = 1.0, size = 300)
            return Loader.emb_dic[word]
        return self.fasttext[word]

я называю этот класс как:

inputRaw = sc.textFile(inputFile, 3).map(lambda line: (line.split("\t")[0], line.split("\t")[1])).map(Loader(modelpath).map)
  1. Меня смущает, сколько раз будет загружен файл пути к модели? Я хочу, чтобы каждый исполнитель загружался один раз и использовался всеми его ядрами. Мой ответ на этот вопрос: путь к модели будет загружен 3 раза (= количество разделов). Если мой ответ правильный, недостаток такого моделирования связан с размером файла modelpath. Предположим, этот файл имеет размер 10 ГБ и предположим, что у меня 200 разделов. Таким образом, в этом случае нам понадобится 10 * 200 ГБ = 2000, что очень много (это решение может работать только с небольшим количеством разделов).

Предположим, у меня есть rdd =(id, sentence) =[(id1, u'patina californian'), (id2, u'virgil american'), (id3', u'frensh'), (id4, u'american')]

и я хочу суммировать векторы встраивания слов для каждого предложения:

def test(document):
    print("document is = {}".format(document))
    documentWords = document.split(" ")
    features = np.zeros(300)
    for word in documentWords:
        features = np.add(features, Loader(modelpath).fasttext[word])
    return features

def calltest(inputRawSource):

    my_rdd = inputRawSource.map(lambda line: (line[0], test(line[1]))).cache()
    return my_rdd

В этом случае сколько раз будет загружен файл modelpath? Обратите внимание, что я установил spark.executor.instances" to 3


person bib    schedule 05.02.2019    source источник
comment
Поиск широковещательных переменных, внешнее кэширование с помощью Spark и т. д.   -  person skjagini    schedule 06.02.2019
comment
трансляция очень большого файла 10гб не работает   -  person bib    schedule 06.02.2019
comment
Если файл небольшой, вы можете выполнить широковещательную рассылку, если файл большой, вы разделите его и выполните соединение. В памяти все это не удержишь, на мой взгляд, волшебной палочки нет.   -  person skjagini    schedule 06.02.2019
comment
Можете ли вы попробовать пометить поле как «Transient Lazy», которое не будет сериализовано, и каждый исполнитель должен создавать его только один раз при первой попытке доступа. nathankleyn.com /2017/12/29/   -  person skjagini    schedule 06.02.2019
comment
@skjagini, как сделать Tranient ленивым в pyspark?   -  person bib    schedule 08.02.2019
comment
@skjagini Я думаю, что то, как я определил класс Loader, совпадает с Transient Lazy, не так ли?   -  person bib    schedule 08.02.2019
comment
@jaceklaskowski, не могли бы вы дать мне совет, как решить эту проблему?   -  person bib    schedule 13.02.2019
comment
@howie, не могли бы вы помочь в решении этой проблемы   -  person bib    schedule 26.02.2019
comment
Я не думаю, что это будет работать и не эффективно. когда вы вызываете Loader(modelpath).map, что означает отправку всех данных из spark в python. Я попытаюсь использовать модель искрового фрейма данных + UDF. Вы можете записать свой загрузчик в формате UTF. И позвольте искре управлять вашим распределением данных.   -  person howie    schedule 27.02.2019
comment
@howie, не могли бы вы быть более конкретными? если хотите, мы можем перенести обсуждение в чат   -  person bib    schedule 27.02.2019
comment
@bib вы можете увидеть это первым changhsinlee.com/pyspark-udf   -  person howie    schedule 27.02.2019
comment
@howie Я думаю, вы имеете в виду то же самое, что и forums.databricks.com/questions/11374/, но похоже, что она не работает. Я застрял с одного месяца на этом вопросе   -  person bib    schedule 27.02.2019
comment
ok ~ Теперь я понимаю, что ваша проблема в том, что вы используете модель Python, а модель слишком велика. Я блуждаю, можете ли вы использовать файл модели в искровом фрейме данных, чтобы без загрузки загрузить в gensim.models.KeyedVectors.load_word2vec_format   -  person howie    schedule 27.02.2019
comment
Это похоже на ваш вопрос? towardsdatascience.com/   -  person howie    schedule 27.02.2019
comment
@howie я точно не знаю. Я хочу вычислить вектор документа bu, суммируя вектор каждого слова в этом документе (см., пожалуйста, тест функции). Вектор слова взят из модели встраивания (wiki.en.bin). Моя проблема заключается в том, как загрузить wiki.en.bin в каждом исполнителе, я пробовал все, что я отправил, и я получаю исключения.   -  person bib    schedule 27.02.2019
comment
Я знаю вашу проблему, и я не знаю, как решить ее по-вашему, например, загрузить полные данные модели в память (Loader.cache[self.fn] = FT_gensim.load_fasttext_format(self.fn)). Я предлагаю вам загрузить данные модели в фрейм данных и попытаться сделать все, что вы хотите, таким образом.   -  person howie    schedule 27.02.2019


Ответы (1)


По умолчанию количество разделов равно общему количеству ядер на всех узлах-исполнителях в кластере Spark. Предположим, вы обрабатываете 10 ГБ в кластере Spark (или суперкомпьютерном исполнителе), который содержит в общей сложности 200 ядер ЦП, это означает, что Spark по умолчанию может использовать 200 разделов для обработки ваших данных.

Кроме того, чтобы все ядра вашего процессора работали на каждого исполнителя, это можно решить в python (используя 100% всех ядер с модулем многопроцессорности).

person Ahmad    schedule 09.02.2019