с concurrent.futures.ThreadPoolExecutor () в качестве исполнителя: не ждет

Я пытаюсь использовать ThreadPoolExecutor() в методе класса для создания пула потоков, которые будут выполнять другой метод в том же классе. У меня есть with concurrent.futures.ThreadPoolExecutor()..., но он не ждет, и выдается ошибка, говорящая, что в словаре, который я запрашиваю, не было ключа после оператора «with ...». Я понимаю, почему возникает ошибка, потому что словарь еще не обновлен, потому что потоки в пуле не завершили выполнение. Я знаю, что выполнение потоков не завершено, потому что у меня есть печать («готово») в методе, который вызывается в ThreadPoolExecutor, а «готово» не выводится на консоль.

Я новичок в обсуждениях, поэтому приветствую любые предложения о том, как это сделать лучше!

    def tokenizer(self):
        all_tokens = []
        self.token_q = Queue()
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            for num in range(5):
                executor.submit(self.get_tokens, num)
            executor.shutdown(wait=True)

        print("Hi")
        results = {}
        while not self.token_q.empty():
            temp_result = self.token_q.get()
            results[temp_result[1]] = temp_result[0]
            print(temp_result[1])
        for index in range(len(self.zettels)):
            for zettel in results[index]:
                all_tokens.append(zettel)
        return all_tokens

    def get_tokens(self, thread_index):
        print("!!!!!!!")
        switch = {
            0: self.zettels[:(len(self.zettels)/5)],
            1: self.zettels[(len(self.zettels)/5): (len(self.zettels)/5)*2],
            2: self.zettels[(len(self.zettels)/5)*2: (len(self.zettels)/5)*3],
            3: self.zettels[(len(self.zettels)/5)*3: (len(self.zettels)/5)*4],
            4: self.zettels[(len(self.zettels)/5)*4: (len(self.zettels)/5)*5],
        }
        new_tokens = []
        for zettel in switch.get(thread_index):
            tokens = re.split('\W+', str(zettel))
            tokens = list(filter(None, tokens))
            new_tokens.append(tokens)
        print("done")
        self.token_q.put([new_tokens, thread_index])

'''

Ожидается, что вы увидите все операторы print("!!!!!!") и print("done") перед оператором print ("Hi"). Фактически показывает !!!!!!!, затем Hi, затем KeyError для словаря результатов.


person master_coder__    schedule 04.07.2019    source источник


Ответы (2)


Вам нужно выполнить цикл concurrent.futures.as_completed (), как показано здесь. Он будет давать значения по мере завершения каждого потока.

person new-dev-123    schedule 04.07.2019
comment
Спасибо за совет! Это помогло показать ошибку, которая препятствовала завершению метода, однако похоже, что ThreadPool не ждал. Ошибка, появившаяся после добавления этого, показала, что срезы списка в операторе switch get_tokens () отображались как числа с плавающей запятой, потому что я разделил на 5, вместо этого мне нужно было добавить второй /, чтобы сделать результат деления int. Спасибо за руководство @ new-dev-123! - person master_coder__; 04.07.2019

Как вы уже выяснили, пул ожидает; print('done') никогда не выполняется, потому что, предположительно, TypeError возникает раньше.
Пул не ожидает напрямую завершения задач, он ожидает присоединения своих рабочих потоков, что неявно требует завершения выполнения задач в одну сторону (успех ) или другое (исключение).

Причина, по которой вы не видите, что возникает исключение, заключается в том, что задача заключена в _ 3_. А Future

[...] инкапсулирует асинхронное выполнение вызываемого объекта.

Future экземпляры возвращаются _6 _, и они позволяют запрашивать состояние выполнения и получать доступ к любому его результату.

Это подводит меня к некоторым замечаниям, которые я хотел сделать.

Queue в self.token_q кажется ненужным
Судя по коду, который вы поделили, вы используете эту очередь только для передачи результатов ваших задач обратно в tokenizer функцию. В этом нет необходимости, вы можете получить к нему доступ из Future, которое возвращает вызов submit:

def tokenizer(self):
    all_tokens = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(get_tokens, num) for num in range(5)]
        # executor.shutdown(wait=True) here is redundant, it is called when exiting the context:
        # https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/_base.py#L623

    print("Hi")
    results = {}
    for fut in futures:
        try:
            res = fut.result()
            results[res[1]] = res[0]
        except Exception:
            continue
    [...] 

def get_tokens(self, thread_index):
    [...]
    # instead of self.token_q.put([new_tokens, thread_index])
    return new_tokens, thread_index

Вполне вероятно, что ваша программа не выиграет от использования потоков
Из кода, который вы поделили, похоже, что операции в get_tokens связаны с ЦП, а не с вводом-выводом. Если вы запускаете свою программу в CPython (или в любом другом интерпретаторе, использующем глобальную блокировку интерпретатора), в этом случае не будет никакой пользы от использования потоков.

В CPython глобальная блокировка интерпретатора или GIL - это мьютекс, который защищает доступ к объектам Python, предотвращая одновременное выполнение байт-кода Python несколькими потоками.

Это означает, что для любого процесса Python в любой момент времени может выполняться только один поток. Это не такая уж большая проблема, если ваша задача связана с вводом-выводом, то есть часто приостанавливается для ожидания ввода-вывода (например, данных в сокете). Если ваши задачи должны постоянно выполнять байт-код в процессоре, нет никакого преимущества для приостановки одного потока, чтобы позволить другому выполнить некоторые инструкции. Фактически, результирующие переключения контекста могут даже оказаться вредными.
Возможно, вы захотите использовать параллелизм вместо параллелизма. Взгляните на ProcessPoolExecutor для этого.
Однако я рекомендую протестировать ваш код, выполняющийся последовательно, одновременно и параллельно. Создание процессов или потоков требует затрат и, в зависимости от выполняемой задачи, это может занять больше времени, чем просто выполнение одной задачи за другой последовательным образом.


Кстати, это выглядит немного подозрительно:

for index in range(len(self.zettels)):
    for zettel in results[index]:
        all_tokens.append(zettel)

results, кажется, всегда имеет пять элементов, потому что for num in range(5). Если длина self.zettels больше пяти, я бы ожидал, что здесь повысится KeyError.
Если self.zettels гарантированно будет иметь длину пять, тогда я бы увидел здесь потенциал для некоторой оптимизации кода.

person shmee    schedule 04.07.2019