Подграф Dask compute с фьючерсами

Я хочу отправить задачу dask, которая будет делать следующее:

  1. Создайте ленивый граф dask с помощью dask.bag (def fakejob)
  2. Вычислите график из 1. и сохраните его на паркете (эта часть не учитывается, просто мотивация).

Мне нужно сделать это для нескольких входов, поэтому я пытался использовать фьючерсную функцию dask.distributed вот так.

from dask.distributed import Client

client = Client(processes=True)

def fakejob(path):
    return (
        dask.bag
        .read_text(path)
        .to_dataframe()
    )

futures = client.map(fakejob, [input_path1, input_path2])

Проблема в том, что я получаю: AssertionError: daemonic processes are not allowed to have children

Я пробовал перейти по этой ссылке и получил вторую версию (отличается на 1 строку от первой), но фьючерсы остаются «ожидающими» навсегда.

from dask.distributed import Client

client = Client(processes=True)

def fakejob(path):
    with dask.set_options(get=client.get):
        return (
            dask.bag
            .read_text(path)
            .to_dataframe()
        )

futures = client.map(fakejob, [input_path1, input_path2])

Есть какие-нибудь подсказки, как это сделать?

Ваше здоровье.


person Daniel Severo    schedule 26.07.2017    source источник


Ответы (1)


Странное и немного многочисленное сообщение об ошибке возникает из-за попытки построить граф dask (которым является мешок) в рабочем процессе, где все заканчивается, если вызывается с помощью client.map. Вторая попытка будет работать с локальным клиентом, если вы можете поместить в функцию весь рабочий процесс, включая запись на паркет, и не пытались передать пакет обратно вызывающему.

Решение проще.

bags = [dask.bag.read_text(path)
        .to_dataframe() for path in [input_path1, input_path2])
futures = client.compute(bags)   # run in background on the cluster
client.gather(futures)   # wait and get results

Здесь bags - это список dask-пакетов, то есть рабочих задач, определенных, но еще не запущенных. Вы можете заменить последние две строки на dask.compute(*bags), чтобы получить результат, не беспокоясь о фьючерсах.

person mdurant    schedule 26.07.2017
comment
Привет! Спасибо за ответ :) Решение, которое вы опубликовали, действительно проще, но я думаю, что оно не решает мою проблему. Понимание списка в bags = ... будет строить каждый граф последовательно, верно? На самом деле я хотел, чтобы построение графика тоже было параллельным. Другими словами, я хочу, чтобы понимание этого списка происходило параллельно. Имеет смысл, или я что-то не так понял? - person Daniel Severo; 27.07.2017
comment
Верно, построение списка идет последовательно в локальном потоке - но для вас это медленно? Я думаю, что единственный вариант - использовать dask.delayed и db.from_delayed, но это то, что bag делает внутренне с блоками байтов; единственное, что требует времени, - это определение размера каждого файла. - person mdurant; 27.07.2017
comment
Понятно. На самом деле это требует времени. Я экспериментирую с dask здесь, в офисе, в команде специалистов по анализу данных. Я использую dask.bag для анализа примерно 100 ГБ данных json в день и преобразования их в паркет для очень конкретного аналитического проекта. Он работает хорошо, но на построение ленивого объекта графа уходит около 10 минут. мы планируем развивать его дальше, и эти накладные расходы могут стать проблемой в будущем. На данный момент ваше предложение идеально. Спасибо! - person Daniel Severo; 29.07.2017