Как разделить переменную счетчика между потоками с помощью threadpool.executor и увеличить ее?

Ниже приведен исполнитель пула потоков, который я реализовал в python 3.x.

  with ThreadPoolExecutor(max_workers=15) as ex:
        f = open(filename, 'r', encoding='UTF-8')
        results = {ex.submit(callreadline, files ): files for files in f.readlines() }

Переменная results содержит значения в следующем формате:

слова и соответствующие им 200-мерные вложения

Вы можете видеть, что значения являются кортежами. Первое значение — это слово, а второе значение — 200-мерный массив. Всего значений 400000. Итак, есть 400000 кортежей.

Теперь я хочу создать еще один исполнитель пула потоков, который выполняет следующую задачу.

  1. Создайте упорядоченный словарь первых значений в списке кортежей. Это означает, что слова say из первых четырех значений кортежа — это the, is ,are, say. Тогда упорядоченный словарь будет содержать:

{это: 0, это: 1, это: 2, сказал: 3, ...…………… .привет: 399999}

  1. Создайте массив numpy nd, который содержит 200-мерный массив соответствующих слов в упорядоченном словаре (под соответствующим словом я подразумеваю, что первая запись будет состоять из 200-мерного массива слов the, затем 200-мерного массива < strong>является... и этот список можно продолжить). Таким образом, массив numpy nd будет иметь размерность 400000 * 200.

Я использовал цикл for со следующим кодом

    count = 0
    word_to_idx = OrderedDict()
    vectors = []
    for future in results.result:
            b = future.result()
            word_to_idx[count] = b[0]
            if(count == 0):
                vectors =  np.array([b[1]])
            else:    
                vectors = np.append(vectors,np.array([b[1]]),axis=0)
            count = count +1

В конце вышеупомянутой функции я вернул word_to_idx и векторы, которые выполнили свою работу. Однако зацикливание 400 000 кортежей и присвоение одной переменной переменной заняло очень много времени (около 10 часов).

Поэтому я подумал, есть ли способ распараллелить эту функциональность с помощью исполнителя пула потоков.

Я думал создать потоки, а затем совместно использовать переменную счетчика, чтобы каждый поток получал доступ к общей переменной по одному. Затем поток увеличит эту переменную, а затем другой поток получит доступ к увеличенному счетчику. Может ли кто-нибудь указать мне правильное направление?

Редактировать:

Вот вызов функции readline, которая работает очень быстро, так как вызывается с 15 воркерами:

def callreadline(line):
        # word_to_idx = OrderedDict() 
        word_to_idx = OrderedDict()
        vectors = []
        vocabulary = None
        word_to_idx = read_w2v_word(line.split(' ')[0])
        try:
            vectors = np.append(vectors, [np.array(line.split(' ')[1:])], axis=0)
        except:
            vectors = np.array(line.split(' ')[1:],dtype=float)
        if vocabulary is not None:
            word_to_idx, vectors = filter_words(word_to_idx, vectors, vocabulary)
        return word_to_idx,vectors

person Pratik.S    schedule 25.06.2019    source источник
comment
Комментарии не для расширенного обсуждения; этот разговор был перешел в чат.   -  person Samuel Liew♦    schedule 26.06.2019


Ответы (1)


У меня есть ощущение, что функция callreadline даже близко не работает так быстро, как могла бы быть, но это не было частью вопроса, поэтому позвольте мне попытаться исправить остальное для вас:

with ThreadPoolExecutor(max_workers=15) as ex:
        f = open(filename, 'r', encoding='UTF-8')
        results = [ex.submit(callreadline, files) for files in f.readlines()]

word_to_idx = dict()
vectors = []
for count, future in enumerate(results):
    b = future.result()
    word_to_idx[b[0]] = count
    vectors.append(b[1])

vectors = np.array(vectors)

person Finomnis    schedule 25.06.2019
comment
Я думаю, что это заняло так много времени, потому что изначально я применил цикл if else. Теперь с вашим кодом требуется минимум. Спасибо - person Pratik.S; 26.06.2019
comment
Я мог бы сказать вам, что OrderedDict медленный, что ваш ThreadPoolExecutor ничего не делает, но я думаю, что ДЕЙСТВИТЕЛЬНО причина, по которой он длился вечно, - это эта строка: vectors = np.append(vectors,np.array([b[1]]),axis=0), потому что библиотека указывает: Values are appended to a copy of this array.. Это означает, что каждый раз, когда вы добавляете одну строку, вы копируете ВЕСЬ массив. Просто соберите его в питоне, а затем слейте в конце, это должно быть быстрее. - person Finomnis; 26.06.2019