Распараллеливание fastText.get_sentence_vector с dask дает ошибку травления

Я пытался получить вложения предложений fastText для 80 миллионов английских твитов, используя механизм распараллеливания с использованием dask, как описано в этом ответе: Как вы распараллеливаете apply() на кадрах данных Pandas, используя все ядра на одной машине?

Вот мой полный код:

import dask.dataframe as dd
from dask.multiprocessing import get
import fasttext
import fasttext.util
import pandas as pd

print('starting langage: ' + 'en')
lang_output = pd.DataFrame()
lang_input = full_input.loc[full_input.name == 'en'] # 80 Million English tweets
ddata = dd.from_pandas(lang_input, npartitions = 96)
print('number of lines to compute: ' + str(len(lang_input)))
fasttext.util.download_model('en', if_exists='ignore')  # English
ft = fasttext.load_model('cc.'+'en'+'.300.bin')
fasttext.util.reduce_model(ft, 20)
lang_output['sentence_embedding'] = ddata.map_partitions(lambda lang_input: lang_input.apply((lambda x: get_fasttext_sentence_embedding(x.tweet_text, ft)), axis = 1)).compute(scheduler='processes')
print('finished en')

Это функция get_fasttext_sentence_embedding:

def get_fasttext_sentence_embedding(row, ft):
    if pd.isna(row):
        return np.zeros(20)
    return ft.get_sentence_vector(row)

Но я получаю ошибку травления в этой строке:

lang_output['sentence_embedding'] = ddata.map_partitions(lambda lang_input: lang_input.apply((lambda x: get_fasttext_sentence_embedding(x.tweet_text, ft)), axis = 1)).compute(scheduler='processes')

Это ошибка, которую я получаю:

TypeError: can't pickle fasttext_pybind.fasttext objects

Есть ли способ распараллелить модель get_sentence_vector fastText с помощью dask (или чего-либо еще)? Мне нужно распараллелить, потому что встраивание предложений для 80 миллионов твитов занимает два много времени, а одна строка моего фрейма данных полностью независима от другой.


person Sumit Sidana    schedule 25.04.2020    source источник


Ответы (1)


Проблема здесь в том, что объекты fasttext, по-видимому, не могут быть обработаны, а Dask не знает, как сериализовать и десериализовать эту структуру данных без обработки.

Самый простой способ использовать Dask здесь (но, вероятно, не самый эффективный) — это определить в каждом процессе саму модель ft, что позволит избежать необходимости ее передачи (и, таким образом, избежать попытки травления). Что-то вроде следующего будет работать. Обратите внимание, что ft определяется внутри функции, отображаемой на разделы.

Сначала несколько примеров данных.

import dask.dataframe as dd
import fasttext
import pandas as pd
import dask
import numpy as np

df = pd.DataFrame({"text":['this is a test sentence', None, 'this is another one.', 'one more']})
ddf = dd.from_pandas(df, npartitions=2)
ddf

Dask DataFrame Structure:
text
npartitions=2   
0   object
2   ...
3   ...
Dask Name: from_pandas, 2 tasks

Затем мы можем настроить ваши функции, чтобы определить ft в каждом процессе. Это дублирует усилия, но позволяет избежать необходимости переноса модели. При этом мы можем без проблем запустить его через map_partitions.

def get_embeddings(sent, model):
    return model.get_sentence_vector(sent) if not pd.isna(sent) else np.zeros(10)

def func(df):
    ft = fasttext.load_model("amazon_review_polarity.bin") # arbitrary model
    res = df['text'].apply(lambda x: get_embeddings(x, model=ft))
    return res

ddf['sentence_vector'] = ddf.map_partitions(func)
ddf.compute(scheduler='processes')

text    sentence_vector
0   this is a test sentence [-0.01934033, 0.03729743, -0.04679677, -0.0603...
1   None    [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
2   this is another one.    [-0.0025579212, 0.0353713, -0.027139299, -0.05...
3   one more    [-0.014522496, 0.10396308, -0.13107553, -0.198...

Обратите внимание, что эта вложенная структура данных (список в столбце), вероятно, не является оптимальным способом обработки этих векторов, но это будет зависеть от вашего варианта использования. Кроме того, вероятно, есть способ выполнять это вычисление партиями, используя fastext, а не по одной строке за раз (в Python), но я не очень хорошо разбираюсь в нюансах fastext.

person Nick Becker    schedule 26.04.2020