Dask - это библиотека параллельных вычислений для Python. Я думаю об этом как о MPI без необходимости писать код MPI, что я очень ценю!

Dask изначально масштабирует Python

Dask обеспечивает расширенный параллелизм для аналитики, обеспечивая масштабируемую производительность для ваших любимых инструментов

https://dask.org

Одним из наиболее интересных аспектов Dask является возможность масштабирования между компьютерами / серверами / узлами / модулями / контейнером и т. Д. Вот почему я говорю, что это как MPI.

Сегодня мы поговорим о следующем:

  • Преимущества использования Kubernetes
  • Недостатки использования Kubernetes
  • Установите диаграмму Dask Helm
  • Увеличивайте / уменьшайте масштаб ваших Dask Workers с kubectl scale
  • Измените диаграмму Dask Helm, чтобы добавить дополнительные пакеты
  • Автоматическое масштабирование ваших Dask Workers с помощью горизонтальных AutoScaler Pod

Преимущества Dask на Kubernetes

Давайте поговорим о некоторых (многих!) Преимуществах использования Kubernetes!

Настраиваемая конфигурация

Еще один очень важный аспект Dask, по крайней мере для меня, заключается в том, что я могу настроить его так, чтобы инфраструктура была полностью прозрачна для пользователя, и им никогда не приходилось к ней прикасаться. Это говорит мне, потому что традиционно я был на стороне DevOps и инфраструктуры решения технических проблем и работал с специалистами по данным, которые занимались разработкой приложений или моделей.

Мне нравится уровень гибкости, когда я могу получить хорошую настройку по умолчанию на стороне инфраструктуры, а затем пользователи, которые заботятся о настройке, могут при необходимости настроить на стороне приложения.

Возьмем пример из реального мира. Я работаю в команде с специалистом по данным, который создает веб-приложение Dash, которое выполняет визуализацию некоторого набора данных в реальном времени.

Идея состоит в том, что пользователь нажимает кнопку, и на заднем плане числа обрабатываются, параметры модели оцениваются, и наука движется вперед!

Мы не хотим обрабатывать их в браузере или даже на одном сервере, потому что вы видели наборы данных биоинформатики? Наборы данных постоянно становятся больше и выше в разрешении. (Хорошо, пока треп.)

В идеале мы хотим обрабатывать числа в распределенной вычислительной среде с автоматическим масштабированием, которая полностью независима от нашего веб-приложения. Вот тут-то и пригодится Даск!

Возможность интеграции

Вы редко используете только один инструмент или систему. Чаще всего я вижу что-то вроде:

  • Данные хранятся в сетевой файловой системе или S3.
  • Исследовательский анализ выполняется на JupyterHub, RStudio или вычислительном кластере HPC с использованием только SSH.
  • Окончательный анализ помещается в инструмент конвейера, такой как Apache Airflow, при этом задачи выполняются в Kubernetes или в системе очередей заданий, такой как AWS Batch.

Каждый из них можно рассматривать как отдельный компонент, и, используя что-то вроде Terraform, вы можете легко развернуть свои файловые системы, приложения и кластеры.

Затем идет интеграция с самим пакетом Dask. Dask может использоваться как капля замены для многих функций SKLearn, использующих dask_ml библиотеку.

Возможность автоматического масштабирования

Тогда, конечно, мы не хотим платить за то, чтобы куча серверов просто валялась без дела и ничего не делала. Вот почему мы в облаке! Если вы используете Dask в Kubernetes, вы можете либо масштабировать вручную, либо масштабировать по мере необходимости, создав правила автоматического масштабирования в своей конфигурации Kubernetes.

Даск обратит внимание на то, когда работы идут вверх или вниз, и воспользуется всем этим скрытым образом, и вам не придется ничего делать, кроме как направить это в планировщик!

Недостатки Dask в Kubernetes

Вам нужно изучить Kubernetes и иметь хотя бы базовое представление о том, как работают контейнеры Docker. Если у вас нет ни одного из них, вам предстоит довольно крутая кривая обучения.

Используемые технологии

Для этого урока вам необходимо установить kubectl и helm.

Вам понадобится какой-нибудь кластер Kubernetes или MiniKube. Kubernetes не зависит от платформы, поэтому не имеет значения, выполняете ли вы развертывание на AWS, GCP или внутри компании.

Если вам интересно, как я развертываю свои кластеры AWS EKS, вы можете прочитать это сообщение в блоге, которое я написал.

Оттуда мы будем использовать Helm V3, который является своего рода диспетчером пакетов для приложений в Kubernetes, для развертывания нашего Dask Cluster.

Установить Dask

На этом этапе вы должны запустить и запустить свой кластер Kubernetes или MiniKube.

Единственное отличие, которое нас волнует в нашей настройке Kubernetes / MiniKube, - это предоставление сервисов. Если вы используете облачного провайдера, например AWS, вам нужно выполнить развертывание с помощью LoadBalancer serviceType. Если вы используете на сайте или MiniKube, вам нужно будет развернуть с помощью NodePort, а затем использовать kubectl для перенаправления адреса. Если вы используете Minikube, это лучший исчерпывающий ресурс, который я нашел для демонстрации услуг.

Для этого первого прохода мы просто собираемся установить Dask и осмотреться. Затем мы сделаем несколько крутых вещей с автомасштабированием!

Мы собираемся использовать официальную Dask Helm Chart.

helm repo add dask https://helm.dask.org/
helm repo update
# Kubernetes cluster on a cloud provider (AWS, GCP, Azure)
# The default serviceType is already set as LoadBalancer
# Make sure to remember your release names!
export RELEASE="dask"
helm install ${RELEASE} dask/dask
# Kubernetes cluster on an in house server/minikube
# helm install dask dask/dask --set serviceType=NodePort

Вы заметите, что при установке диаграммы с помощью Helm появятся несколько очень удобных инструкций. Если вам когда-нибудь понадобится получить к ним доступ снова, и вы не хотите тратить остаток своей жизни на прокрутку, запустите команду helm status $RELEASE с именем версии, которое вы указали ранее, в данном случае dask.

Это может занять несколько минут, но давайте выполним несколько команд.

kubectl get pods |grep dask 
kubectl get svc |grep dask

Вы должны увидеть один или несколько модулей в таком состоянии, как Init или Running. Чтобы приступить к работе, нужно несколько минут, так что иди выпей чаю!

Если вы пользуетесь услугами облачного провайдера, вы увидите службу LoadBalancer для записной книжки Jupyter и Dask Scheduler. Если вы используете MiniKube, вам нужно будет запустить команду MiniKube, чтобы получить URL-адрес службы.

Изучите нашу среду JupyterHub

Теперь у нас есть Jupyterhub, который очень удобен, потому что дает нам консоль Python и терминал.

Перейдите по URL-адресу, отображаемому в dask-jupyter svc.

kubectl get svc dask-jupyter 
# Or if you can't remember the name of all the services just grep for your release name 
# kubectl get svc |grep dask

Пароль по умолчанию - dask. Если вы хотите изменить его, прочтите dask helm chart docs.

Слева вы увидите File Browser. Перейдите к examples/01-custom-delayed.ipynb. Начните пробегать по ячейкам кода, пока не дойдете до клиента dask.

Настроить клиент Dask

Это одна из тех вещей, которые действительно сбивают с толку людей. Под людьми я также подразумеваю себя, когда изучал Docker. ;-)

Вы можете (как правило) получить доступ к службам одним из двух способов. Если вы работаете внутри кластера, как мы на Jupyterhub Notebook, вы можете использовать имя службы, в данном случае dask-scheduler, потому что сетевая магия. У нас также есть внешний IP-адрес, поэтому мы собираемся его использовать. Вы можете использовать любой из них, и он отлично работает. Вероятно, лучше использовать имя службы, потому что IP-адрес может измениться, но я хотел продемонстрировать, что здесь он также работает с внешним IP.

Не забывайте tcp перед IP-адресом. Этот меня все время достает!

Нажмите кнопку запуска, и вы должны увидеть некоторую информацию о кластере Dask!

Просмотрите остальные ячейки, чтобы у нас было что-то интересное, на что можно посмотреть на панели мониторинга состояния Dask.

Изучите Dask Dashboard

Вернитесь к своим svcs.

kubectl get svc dask-scheduler

И получите Внешний IP. Откройте этот веб-адрес с портом 80 или без него (порт по умолчанию для любого сайта - порт 80, поэтому вводить его не нужно) и проверьте Dask Dashboard.

from dask.distributed import Client, progress
import dask
import distributed
dask.config.set({
"distributed.comm.timeouts.tcp": "50s",
"distributed.scheduler.allowed-failures": 999
})
c = Client("tcp://dask-scheduler:8786")
# c = Client("tcp://EXTERNAL_IP:8786")
c

# TRUST BUT VERIFY
dask.config.get('distributed.comm.timeouts')
dask.config.get('distributed.scheduler.allowed-failures')

Статус Dask

Это часть панели мониторинга по умолчанию. Если вы пробежались по остальным ячейкам, вы должны кое-что увидеть здесь.

Если вы хотите немного поиграть, проверьте функции, которые фактически выполняются с Dask, и измените их параметры.

%%time
zs = []
# Change the range value to see how it affects tasks
for i in range(10000):
    x = inc(i)
    y = dec(x)
    z = add(x, y)
    zs.append(z)

zs = dask.persist(*zs)
total = dask.delayed(sum)(zs)

Масштабируйте рабочих Dask

Теперь, когда мы выполнили базовые проверки, чтобы убедиться, что все работает, давайте изменим количество рабочих задач. Я сделал это по двум причинам. Я хотел удостовериться, что это не заставит Dask Scheduler сойти с ума. Затем я хотел заложить основы для автомасштабирования или масштабирования в зависимости от спроса!

Для этого воспользуемся командой kubectl scale. То, что мы масштабируем, основано на типе Kubernetes, который обычно равен Deployment, StatefulSet или SVC. Для получения дополнительной информации ознакомьтесь с шпаргалкой по kubectl и масштабированием развертывания.

Наш Dask Worker - это Deployment, поэтому мы будем использовать стратегию масштабирования Deployment. Если вы не знаете, что масштабируете, перейдите к диаграмме штурвала, а затем перейдите к шаблону того, что вы хотите масштабировать.

Вот шаблон dask-worker.

Масштабируем нашего рабочего до 4 реплик!

kubectl scale deployment.v1.apps/dask-worker --replicas=4

Вы должны увидеть сообщение об успешном завершении:

deployment.apps/dask-worker scaled

Получите модули, и вы увидите, что модули dask-worker либо активированы, либо инициализируются!

(base) root@6e0a2f79446d:~# kubectl get pods |grep dask
dask-jupyter-6596ccb45f-2cl64     1/1     Running   0          6d23h
dask-scheduler-689c44ccf7-c6j58   1/1     Running   0          6d23h
dask-worker-7df97cb9d-bvkhm       1/1     Running   0          6d23h
dask-worker-7df97cb9d-dwq45       1/1     Running   0          7m55s
dask-worker-7df97cb9d-k7xdb       1/1     Running   0          43s
dask-worker-7df97cb9d-qvkl6       1/1     Running   0          7m55s

Вы можете перейти на панель управления Dask - ›Рабочие, и вы увидите 4 рабочих!

Чтобы подготовиться к следующему разделу, давайте уменьшим число наших рабочих до одной копии.

kubectl scale deployment.v1.apps/dask-worker --replicas=1

Автоматическое масштабирование рабочих Dask

Следующий раздел не является обязательным. В нем описывается, как автоматически масштабировать ваших рабочих Dask для динамического увеличения или уменьшения в зависимости от спроса. Это сложная тема, и точные цифры во многом основаны на вашем собственном кластере и сценарии Kubernetes.

Во-первых, вам необходимо установить сервер метрик helm.

helm repo add stable https://kubernetes-charts.storage.googleapis.com
helm repo update
helm install metrics stable/metrics-server

Предупреждение, при этом невероятно легко разрушить вашу систему. Не играйте с автомасштабированием в производственной системе!

Если вы нарушили установку, чаще всего в форме получения одного из модулей Evicted, проще всего просто удалить выпуск с помощью helm delete dask, а затем переустановить с помощью helm install dask dask/dask.

Ранее мы рассматривали масштабирование. Масштабирование - это круто, но его недостаточно для того, что мы хотим. Что мы действительно хотим, так это иметь автомасштабирование, что означает, что мы масштабируемся в зависимости от спроса!

kubectl scale deployment.v1.apps/dask-worker --replicas=1 
kubectl autoscale deployment.v1.apps/dask-worker --min=1 --max=3 --cpu-percent=1

Это добавит автоматическое масштабирование горизонтальных модулей.

kubectl get hpa 
# To delete it run 
# kubectl delete hpa dask-worker

Обновите наше развертывание Dask с помощью Dask-ML

Теперь, чтобы сделать этот пример более интересным, давайте обновим наши развертывания, добавив dask-ml библиотеки.

Перед тем, как это сделать, убедитесь, что вы экспортировали все интересующие вас работы! Это воссоздает все контейнеры!

wget https://raw.githubusercontent.com/dask/helm-chart/master/dask/values.yaml

Откройте values.yaml и найдите рабочий ›ключ env. Измените переменную среды EXTRA_CONDA_PACKAGES, чтобы включить dask-ml и matplotlib как в определение worker, так и в jupyterhub.

worker:
  name: worker
  image:
    repository: "daskdev/dask"
    tag: 2.14.0
    pullPolicy: IfNotPresent
    # dask_worker: "dask-cuda-worker"
    dask_worker: "dask-worker"
    pullSecrets:
    #  - name: regcred
  replicas: 3
  default_resources:  # overwritten by resource limits if they exist
    cpu: 1
    memory: "4GiB"
  env:
  #  - name: EXTRA_APT_PACKAGES
  #    value: build-essential openssl
  #  - name: EXTRA_CONDA_PACKAGES
  #    value: numba xarray -c conda-forge
  #  - name: EXTRA_PIP_PACKAGES
  #    value: s3fs dask-ml --upgrade

To:

worker:
  name: worker
  image:
    repository: "daskdev/dask"
    tag: 2.14.0
    pullPolicy: IfNotPresent
    # dask_worker: "dask-cuda-worker"
    dask_worker: "dask-worker"
    pullSecrets:
    #  - name: regcred
  replicas: 3
  default_resources:  # overwritten by resource limits if they exist
    cpu: 1
    memory: "4GiB"
  env:
  #  - name: EXTRA_APT_PACKAGES
  #    value: build-essential openssl
  ############## ADD IN EXTRA PACKAGES HERE!
    - name: EXTRA_CONDA_PACKAGES
      value: numba xarray dask-ml matplotlib -c conda-forge
  #  - name: EXTRA_PIP_PACKAGES
  #    value: s3fs dask-ml --upgrade

Сделайте то же самое с переменными jupyterhub!

Теперь обновите установку:

helm upgrade --install dask dask/dask --values values.yaml

Пойдите и посмотрите, как поживают ваши стручки:

kubectl get pods |grep dask

Вы можете увидеть что-то вроде «Создание контейнера» или «Инициировать что-то», потому что по умолчанию Kubernetes выполняет непрерывное обновление, при котором он не обновляет ваши поды до тех пор, пока не будет готов следующий.

Обновите страницу Jupyterhub и снова войдите в систему с паролем dask.

Откройте блокнот examples/ml/incremental.ipynb.

Обязательно измените конфигурацию клиента!

from dask.distributed import Client, progress
import dask
import distributed
dask.config.set({
"distributed.comm.timeouts.tcp": "50s",
"distributed.scheduler.allowed-failures": 999
})
c = Client("tcp://dask-scheduler:8786")

Пройдите по ячейкам, пока не дойдете до «Создать случайный набор данных».

Вы увидите код, который выглядит примерно так, только с разными номерами для n_samples и n_features.

Что вы хотите сделать, так это изменить их на очень маленькие числа, просто чтобы протестировать и не рисковать разрушить всю вашу систему.

Я начал с n_samples=100 и n_features=2. Просто поиграйте с этим!

n_samples = 100
n_features = 2

# These were my final numbers to get anything interesting with the autoscaling
# n_samples = 10000000
# n_features = 500
chunks = n_samples // 50

X, y = dask_ml.datasets.make_classification(
        n_samples=n_samples, n_features=n_features,
        chunks=chunks,
        random_state=0
)

Теперь вы можете прогонять код, пока не дойдете до Create a Scikit-Learn Model, SGDClassifier и блока% time.

%time _ = inc.fit(X_train, y_train, classes=[0, 1])

Здесь и происходит волшебство dask! Просмотрите весь код своими маленькими числами. Затем постепенно увеличивайте, чтобы увидеть, как это повлияет на автомасштабирование.

Вы можете проверить автомасштабирование несколькими способами. Один из них - перейти в раздел Workers на панели инструментов Dask. Другой - просто запустить watch kubectl get pods.

Когда именно ваши поды будут масштабироваться, зависит от количества ЦП / памяти, доступного вашему кластеру Kubernetes и отдельным контейнерам. Я попытался установить его на очень низком уровне, чтобы каждый мог видеть автоматическое масштабирование в действии, но вам придется немного поэкспериментировать с n_samples и n_features, чтобы занять достаточно ЦП и памяти для масштабирования ваших модулей.

Вот несколько модулей в состоянии ожидания:

И вот они регистрируются в Dask Scheduler!

Устранение неполадок выселенных

Если ваши поды продолжают появляться как «выселенные», это означает, что вы пытаетесь запланировать действия, для которых в вашем кластере нет ресурсов. Вы можете перейти к values.yaml и поиграть с различными resources клавишами, чтобы запросить контейнеры меньшего размера.

Это действительно одна из тех вещей, с которыми вам нужно поиграть, пока вы не получите точные настройки.

Исправление проблем

# Get your dask-worker pod names 
kubectl get pods |grep dask-worker 
# Get various metrics and logs 
kubectl logs pod-name 
kubectl describe pod pod-name 
kubectl top pod pod-name

Заворачивать

Вот и все! Надеюсь, этот пост оказался для вас полезным! Если у вас есть какие-либо вопросы или вы хотите запросить учебное пособие, не стесняйтесь обращаться ко мне по адресу [email protected] или загляните на мой сайт.

Документы и ресурсы

Модули горизонтального масштабирования Kubernetes

Таблица управления сервером метрик

Таблица Даск-Шлем

Таблица Dask Helm - Пароль Jupyter

Диаграмма Dask Helm - настройка среды Python

Первоначально опубликовано на https://www.dabbleofdevops.com.