Dask - это библиотека параллельных вычислений для Python. Я думаю об этом как о MPI без необходимости писать код MPI, что я очень ценю!
Dask изначально масштабирует Python
Dask обеспечивает расширенный параллелизм для аналитики, обеспечивая масштабируемую производительность для ваших любимых инструментов
Одним из наиболее интересных аспектов Dask является возможность масштабирования между компьютерами / серверами / узлами / модулями / контейнером и т. Д. Вот почему я говорю, что это как MPI.
Сегодня мы поговорим о следующем:
- Преимущества использования Kubernetes
- Недостатки использования Kubernetes
- Установите диаграмму Dask Helm
- Увеличивайте / уменьшайте масштаб ваших Dask Workers с
kubectl scale
- Измените диаграмму Dask Helm, чтобы добавить дополнительные пакеты
- Автоматическое масштабирование ваших Dask Workers с помощью горизонтальных AutoScaler Pod
Преимущества Dask на Kubernetes
Давайте поговорим о некоторых (многих!) Преимуществах использования Kubernetes!
Настраиваемая конфигурация
Еще один очень важный аспект Dask, по крайней мере для меня, заключается в том, что я могу настроить его так, чтобы инфраструктура была полностью прозрачна для пользователя, и им никогда не приходилось к ней прикасаться. Это говорит мне, потому что традиционно я был на стороне DevOps и инфраструктуры решения технических проблем и работал с специалистами по данным, которые занимались разработкой приложений или моделей.
Мне нравится уровень гибкости, когда я могу получить хорошую настройку по умолчанию на стороне инфраструктуры, а затем пользователи, которые заботятся о настройке, могут при необходимости настроить на стороне приложения.
Возьмем пример из реального мира. Я работаю в команде с специалистом по данным, который создает веб-приложение Dash, которое выполняет визуализацию некоторого набора данных в реальном времени.
Идея состоит в том, что пользователь нажимает кнопку, и на заднем плане числа обрабатываются, параметры модели оцениваются, и наука движется вперед!
Мы не хотим обрабатывать их в браузере или даже на одном сервере, потому что вы видели наборы данных биоинформатики? Наборы данных постоянно становятся больше и выше в разрешении. (Хорошо, пока треп.)
В идеале мы хотим обрабатывать числа в распределенной вычислительной среде с автоматическим масштабированием, которая полностью независима от нашего веб-приложения. Вот тут-то и пригодится Даск!
Возможность интеграции
Вы редко используете только один инструмент или систему. Чаще всего я вижу что-то вроде:
- Данные хранятся в сетевой файловой системе или S3.
- Исследовательский анализ выполняется на JupyterHub, RStudio или вычислительном кластере HPC с использованием только SSH.
- Окончательный анализ помещается в инструмент конвейера, такой как Apache Airflow, при этом задачи выполняются в Kubernetes или в системе очередей заданий, такой как AWS Batch.
Каждый из них можно рассматривать как отдельный компонент, и, используя что-то вроде Terraform, вы можете легко развернуть свои файловые системы, приложения и кластеры.
Затем идет интеграция с самим пакетом Dask. Dask может использоваться как капля замены для многих функций SKLearn, использующих dask_ml
библиотеку.
Возможность автоматического масштабирования
Тогда, конечно, мы не хотим платить за то, чтобы куча серверов просто валялась без дела и ничего не делала. Вот почему мы в облаке! Если вы используете Dask в Kubernetes, вы можете либо масштабировать вручную, либо масштабировать по мере необходимости, создав правила автоматического масштабирования в своей конфигурации Kubernetes.
Даск обратит внимание на то, когда работы идут вверх или вниз, и воспользуется всем этим скрытым образом, и вам не придется ничего делать, кроме как направить это в планировщик!
Недостатки Dask в Kubernetes
Вам нужно изучить Kubernetes и иметь хотя бы базовое представление о том, как работают контейнеры Docker. Если у вас нет ни одного из них, вам предстоит довольно крутая кривая обучения.
Используемые технологии
Для этого урока вам необходимо установить kubectl и helm.
Вам понадобится какой-нибудь кластер Kubernetes или MiniKube. Kubernetes не зависит от платформы, поэтому не имеет значения, выполняете ли вы развертывание на AWS, GCP или внутри компании.
Если вам интересно, как я развертываю свои кластеры AWS EKS, вы можете прочитать это сообщение в блоге, которое я написал.
Оттуда мы будем использовать Helm V3, который является своего рода диспетчером пакетов для приложений в Kubernetes, для развертывания нашего Dask Cluster.
Установить Dask
На этом этапе вы должны запустить и запустить свой кластер Kubernetes или MiniKube.
Единственное отличие, которое нас волнует в нашей настройке Kubernetes / MiniKube, - это предоставление сервисов. Если вы используете облачного провайдера, например AWS, вам нужно выполнить развертывание с помощью LoadBalancer serviceType. Если вы используете на сайте или MiniKube, вам нужно будет развернуть с помощью NodePort, а затем использовать kubectl для перенаправления адреса. Если вы используете Minikube, это лучший исчерпывающий ресурс, который я нашел для демонстрации услуг.
Для этого первого прохода мы просто собираемся установить Dask и осмотреться. Затем мы сделаем несколько крутых вещей с автомасштабированием!
Мы собираемся использовать официальную Dask Helm Chart.
helm repo add dask https://helm.dask.org/
helm repo update
# Kubernetes cluster on a cloud provider (AWS, GCP, Azure)
# The default serviceType is already set as LoadBalancer
# Make sure to remember your release names!
export RELEASE="dask"
helm install ${RELEASE} dask/dask
# Kubernetes cluster on an in house server/minikube
# helm install dask dask/dask --set serviceType=NodePort
Вы заметите, что при установке диаграммы с помощью Helm появятся несколько очень удобных инструкций. Если вам когда-нибудь понадобится получить к ним доступ снова, и вы не хотите тратить остаток своей жизни на прокрутку, запустите команду helm status $RELEASE
с именем версии, которое вы указали ранее, в данном случае dask
.
Это может занять несколько минут, но давайте выполним несколько команд.
kubectl get pods |grep dask
kubectl get svc |grep dask
Вы должны увидеть один или несколько модулей в таком состоянии, как Init
или Running
. Чтобы приступить к работе, нужно несколько минут, так что иди выпей чаю!
Если вы пользуетесь услугами облачного провайдера, вы увидите службу LoadBalancer для записной книжки Jupyter и Dask Scheduler. Если вы используете MiniKube, вам нужно будет запустить команду MiniKube, чтобы получить URL-адрес службы.
Изучите нашу среду JupyterHub
Теперь у нас есть Jupyterhub, который очень удобен, потому что дает нам консоль Python и терминал.
Перейдите по URL-адресу, отображаемому в dask-jupyter
svc.
kubectl get svc dask-jupyter
# Or if you can't remember the name of all the services just grep for your release name
# kubectl get svc |grep dask
Пароль по умолчанию - dask
. Если вы хотите изменить его, прочтите dask helm chart docs.
Слева вы увидите File Browser
. Перейдите к examples/01-custom-delayed.ipynb
. Начните пробегать по ячейкам кода, пока не дойдете до клиента dask.
Настроить клиент Dask
Это одна из тех вещей, которые действительно сбивают с толку людей. Под людьми я также подразумеваю себя, когда изучал Docker. ;-)
Вы можете (как правило) получить доступ к службам одним из двух способов. Если вы работаете внутри кластера, как мы на Jupyterhub Notebook, вы можете использовать имя службы, в данном случае dask-scheduler
, потому что сетевая магия. У нас также есть внешний IP-адрес, поэтому мы собираемся его использовать. Вы можете использовать любой из них, и он отлично работает. Вероятно, лучше использовать имя службы, потому что IP-адрес может измениться, но я хотел продемонстрировать, что здесь он также работает с внешним IP.
Не забывайте tcp
перед IP-адресом. Этот меня все время достает!
Нажмите кнопку запуска, и вы должны увидеть некоторую информацию о кластере Dask!
Просмотрите остальные ячейки, чтобы у нас было что-то интересное, на что можно посмотреть на панели мониторинга состояния Dask.
Изучите Dask Dashboard
Вернитесь к своим svcs.
kubectl get svc dask-scheduler
И получите Внешний IP. Откройте этот веб-адрес с портом 80 или без него (порт по умолчанию для любого сайта - порт 80, поэтому вводить его не нужно) и проверьте Dask Dashboard.
from dask.distributed import Client, progress
import dask
import distributed
dask.config.set({
"distributed.comm.timeouts.tcp": "50s",
"distributed.scheduler.allowed-failures": 999
})
c = Client("tcp://dask-scheduler:8786")
# c = Client("tcp://EXTERNAL_IP:8786")
c
# TRUST BUT VERIFY
dask.config.get('distributed.comm.timeouts')
dask.config.get('distributed.scheduler.allowed-failures')
Статус Dask
Это часть панели мониторинга по умолчанию. Если вы пробежались по остальным ячейкам, вы должны кое-что увидеть здесь.
Если вы хотите немного поиграть, проверьте функции, которые фактически выполняются с Dask, и измените их параметры.
%%time
zs = []
# Change the range value to see how it affects tasks
for i in range(10000):
x = inc(i)
y = dec(x)
z = add(x, y)
zs.append(z)
zs = dask.persist(*zs)
total = dask.delayed(sum)(zs)
Масштабируйте рабочих Dask
Теперь, когда мы выполнили базовые проверки, чтобы убедиться, что все работает, давайте изменим количество рабочих задач. Я сделал это по двум причинам. Я хотел удостовериться, что это не заставит Dask Scheduler сойти с ума. Затем я хотел заложить основы для автомасштабирования или масштабирования в зависимости от спроса!
Для этого воспользуемся командой kubectl scale
. То, что мы масштабируем, основано на типе Kubernetes, который обычно равен Deployment
, StatefulSet
или SVC
. Для получения дополнительной информации ознакомьтесь с шпаргалкой по kubectl и масштабированием развертывания.
Наш Dask Worker - это Deployment, поэтому мы будем использовать стратегию масштабирования Deployment. Если вы не знаете, что масштабируете, перейдите к диаграмме штурвала, а затем перейдите к шаблону того, что вы хотите масштабировать.
Вот шаблон dask-worker.
Масштабируем нашего рабочего до 4 реплик!
kubectl scale deployment.v1.apps/dask-worker --replicas=4
Вы должны увидеть сообщение об успешном завершении:
deployment.apps/dask-worker scaled
Получите модули, и вы увидите, что модули dask-worker
либо активированы, либо инициализируются!
(base) root@6e0a2f79446d:~# kubectl get pods |grep dask
dask-jupyter-6596ccb45f-2cl64 1/1 Running 0 6d23h
dask-scheduler-689c44ccf7-c6j58 1/1 Running 0 6d23h
dask-worker-7df97cb9d-bvkhm 1/1 Running 0 6d23h
dask-worker-7df97cb9d-dwq45 1/1 Running 0 7m55s
dask-worker-7df97cb9d-k7xdb 1/1 Running 0 43s
dask-worker-7df97cb9d-qvkl6 1/1 Running 0 7m55s
Вы можете перейти на панель управления Dask - ›Рабочие, и вы увидите 4 рабочих!
Чтобы подготовиться к следующему разделу, давайте уменьшим число наших рабочих до одной копии.
kubectl scale deployment.v1.apps/dask-worker --replicas=1
Автоматическое масштабирование рабочих Dask
Следующий раздел не является обязательным. В нем описывается, как автоматически масштабировать ваших рабочих Dask для динамического увеличения или уменьшения в зависимости от спроса. Это сложная тема, и точные цифры во многом основаны на вашем собственном кластере и сценарии Kubernetes.
Во-первых, вам необходимо установить сервер метрик helm.
helm repo add stable https://kubernetes-charts.storage.googleapis.com
helm repo update
helm install metrics stable/metrics-server
Предупреждение, при этом невероятно легко разрушить вашу систему. Не играйте с автомасштабированием в производственной системе!
Если вы нарушили установку, чаще всего в форме получения одного из модулей Evicted
, проще всего просто удалить выпуск с помощью helm delete dask
, а затем переустановить с помощью helm install dask dask/dask
.
Ранее мы рассматривали масштабирование. Масштабирование - это круто, но его недостаточно для того, что мы хотим. Что мы действительно хотим, так это иметь автомасштабирование, что означает, что мы масштабируемся в зависимости от спроса!
kubectl scale deployment.v1.apps/dask-worker --replicas=1
kubectl autoscale deployment.v1.apps/dask-worker --min=1 --max=3 --cpu-percent=1
Это добавит автоматическое масштабирование горизонтальных модулей.
kubectl get hpa
# To delete it run
# kubectl delete hpa dask-worker
Обновите наше развертывание Dask с помощью Dask-ML
Теперь, чтобы сделать этот пример более интересным, давайте обновим наши развертывания, добавив dask-ml
библиотеки.
Перед тем, как это сделать, убедитесь, что вы экспортировали все интересующие вас работы! Это воссоздает все контейнеры!
wget https://raw.githubusercontent.com/dask/helm-chart/master/dask/values.yaml
Откройте values.yaml
и найдите рабочий ›ключ env. Измените переменную среды EXTRA_CONDA_PACKAGES, чтобы включить dask-ml
и matplotlib
как в определение worker, так и в jupyterhub.
worker:
name: worker
image:
repository: "daskdev/dask"
tag: 2.14.0
pullPolicy: IfNotPresent
# dask_worker: "dask-cuda-worker"
dask_worker: "dask-worker"
pullSecrets:
# - name: regcred
replicas: 3
default_resources: # overwritten by resource limits if they exist
cpu: 1
memory: "4GiB"
env:
# - name: EXTRA_APT_PACKAGES
# value: build-essential openssl
# - name: EXTRA_CONDA_PACKAGES
# value: numba xarray -c conda-forge
# - name: EXTRA_PIP_PACKAGES
# value: s3fs dask-ml --upgrade
To:
worker:
name: worker
image:
repository: "daskdev/dask"
tag: 2.14.0
pullPolicy: IfNotPresent
# dask_worker: "dask-cuda-worker"
dask_worker: "dask-worker"
pullSecrets:
# - name: regcred
replicas: 3
default_resources: # overwritten by resource limits if they exist
cpu: 1
memory: "4GiB"
env:
# - name: EXTRA_APT_PACKAGES
# value: build-essential openssl
############## ADD IN EXTRA PACKAGES HERE!
- name: EXTRA_CONDA_PACKAGES
value: numba xarray dask-ml matplotlib -c conda-forge
# - name: EXTRA_PIP_PACKAGES
# value: s3fs dask-ml --upgrade
Сделайте то же самое с переменными jupyterhub!
Теперь обновите установку:
helm upgrade --install dask dask/dask --values values.yaml
Пойдите и посмотрите, как поживают ваши стручки:
kubectl get pods |grep dask
Вы можете увидеть что-то вроде «Создание контейнера» или «Инициировать что-то», потому что по умолчанию Kubernetes выполняет непрерывное обновление, при котором он не обновляет ваши поды до тех пор, пока не будет готов следующий.
Обновите страницу Jupyterhub и снова войдите в систему с паролем dask
.
Откройте блокнот examples/ml/incremental.ipynb
.
Обязательно измените конфигурацию клиента!
from dask.distributed import Client, progress
import dask
import distributed
dask.config.set({
"distributed.comm.timeouts.tcp": "50s",
"distributed.scheduler.allowed-failures": 999
})
c = Client("tcp://dask-scheduler:8786")
Пройдите по ячейкам, пока не дойдете до «Создать случайный набор данных».
Вы увидите код, который выглядит примерно так, только с разными номерами для n_samples
и n_features
.
Что вы хотите сделать, так это изменить их на очень маленькие числа, просто чтобы протестировать и не рисковать разрушить всю вашу систему.
Я начал с n_samples=100
и n_features=2
. Просто поиграйте с этим!
n_samples = 100
n_features = 2
# These were my final numbers to get anything interesting with the autoscaling
# n_samples = 10000000
# n_features = 500
chunks = n_samples // 50
X, y = dask_ml.datasets.make_classification(
n_samples=n_samples, n_features=n_features,
chunks=chunks,
random_state=0
)
Теперь вы можете прогонять код, пока не дойдете до Create a Scikit-Learn Model, SGDClassifier и блока% time.
%time _ = inc.fit(X_train, y_train, classes=[0, 1])
Здесь и происходит волшебство dask! Просмотрите весь код своими маленькими числами. Затем постепенно увеличивайте, чтобы увидеть, как это повлияет на автомасштабирование.
Вы можете проверить автомасштабирование несколькими способами. Один из них - перейти в раздел Workers на панели инструментов Dask. Другой - просто запустить watch kubectl get pods
.
Когда именно ваши поды будут масштабироваться, зависит от количества ЦП / памяти, доступного вашему кластеру Kubernetes и отдельным контейнерам. Я попытался установить его на очень низком уровне, чтобы каждый мог видеть автоматическое масштабирование в действии, но вам придется немного поэкспериментировать с n_samples
и n_features
, чтобы занять достаточно ЦП и памяти для масштабирования ваших модулей.
Вот несколько модулей в состоянии ожидания:
И вот они регистрируются в Dask Scheduler!
Устранение неполадок выселенных
Если ваши поды продолжают появляться как «выселенные», это означает, что вы пытаетесь запланировать действия, для которых в вашем кластере нет ресурсов. Вы можете перейти к values.yaml
и поиграть с различными resources
клавишами, чтобы запросить контейнеры меньшего размера.
Это действительно одна из тех вещей, с которыми вам нужно поиграть, пока вы не получите точные настройки.
Исправление проблем
# Get your dask-worker pod names
kubectl get pods |grep dask-worker
# Get various metrics and logs
kubectl logs pod-name
kubectl describe pod pod-name
kubectl top pod pod-name
Заворачивать
Вот и все! Надеюсь, этот пост оказался для вас полезным! Если у вас есть какие-либо вопросы или вы хотите запросить учебное пособие, не стесняйтесь обращаться ко мне по адресу [email protected] или загляните на мой сайт.
Документы и ресурсы
Модули горизонтального масштабирования Kubernetes
Таблица управления сервером метрик
Таблица Dask Helm - Пароль Jupyter
Диаграмма Dask Helm - настройка среды Python
Первоначально опубликовано на https://www.dabbleofdevops.com.