Используйте Locust для распределения веса трафика по моделям

Производство моделей машинного обучения — сложная практика. Существует много итераций вокруг различных параметров модели, конфигураций оборудования, шаблонов трафика, которые вам придется протестировать, чтобы попытаться завершить развертывание производственного уровня. Нагрузочное тестирование является важной практикой разработки программного обеспечения, но также имеет решающее значение для применения в пространстве MLOps, чтобы увидеть, насколько производительна ваша модель в реальных условиях.

Как мы можем загрузить тест? Простым, но очень эффективным фреймворком является пакет Python: Locust. Locust можно использовать как в базовом, так и в распределенном режиме для имитации до тысяч транзакций в секунду (TPS). В сегодняшнем блоге мы предполагаем базовое понимание этого пакета и кратко рассмотрим основы, но для более общего ознакомления, пожалуйста, обратитесь к этой статье.

Какую модель/конечную точку мы будем тестировать? SageMaker Real-Time Inference — один из лучших вариантов для обслуживания ваших моделей машинного обучения на конечных точках REST, предназначенных для рабочих нагрузок с низкой задержкой и высокой пропускной способностью. В этом блоге мы специально рассмотрим расширенный вариант хостинга, известный как Мультимодельные конечные точки SageMaker. Здесь мы можем разместить тысячи моделей за единственной конечной точкой REST и указать целевую модель, которую мы хотим вызывать для каждого вызова API. Нагрузочное тестирование здесь становится сложной задачей, потому что мы имеем дело с несколькими точками вызова, а не с одной моделью/конечной точкой. Хотя можно случайным образом генерировать трафик для всех моделей, иногда пользователям нужно контролировать, какие модели получают больше трафика. В этом примере мы рассмотрим, как вы можете распределить вес трафика по конкретным моделям, чтобы максимально точно имитировать реальный вариант использования.

ПРИМЕЧАНИЕ. В этой статье предполагается базовое знание AWS и SageMaker, а также свободное владение Python при написании кода, а также базовое понимание пакета Locust. Чтобы понять нагрузочное тестирование конечных точек одной модели SageMaker с Locust в качестве основы, обратитесь к этой статье.

Цитирование набора данных

В этом примере мы будем использовать набор данных Abalone для задачи регрессии, этот набор данных получен из репозитория UCI ML (CC BY 4.0), и вы можете найти официальную цитату здесь.

Создание мультимодельной конечной точки SageMaker

Прежде чем мы сможем приступить к нагрузочному тестированию, нам нужно создать мультимодельную конечную точку SageMaker. Вся разработка для создания конечной точки будет происходить в экземпляре SageMaker Notebook на ядре conda_python3.

В этом примере мы будем использовать набор данных Abalone и запустим на нем алгоритм SageMaker XGBoost для регрессионной модели. Вы можете загрузить набор данных из общедоступных наборов данных Amazon. Мы будем использовать этот набор данных для запуска обучения и создания копий артефактов нашей модели для создания конечной точки мультимодели.

#retreive data
aws s3 cp s3://sagemaker-sample-files/datasets/tabular/uci_abalone/train_csv/abalone_dataset1_train.csv .

Мы можем сначала запустить тренировочное задание, используя встроенный алгоритм SageMaker XGBoost, полное руководство по этому процессу см. в этой статье.

model_path = f's3://{default_bucket}/{s3_prefix}/xgb_model'

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)

xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    sagemaker_session=sagemaker_session,
    role=role
)

xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0,
)

xgb_train.fit({'train': train_input})

После того, как мы завершим эту обучающую работу, мы возьмем сгенерированные артефакты модели (формат model.tar.gz в SageMaker) и создадим еще одну копию этого артефакта, чтобы иметь две модели позади нашей мультимодельной конечной точки. Очевидно, что в реальных условиях эти модели могут обучаться на разных наборах данных или масштабироваться до тысяч моделей за конечной точкой.

model_artifacts = xgb_train.model_data
model_artifacts # model.tar.gz artifact 
%%sh

s3_bucket='sagemaker-us-east-1-474422712127'

for i in {0..1}
do
  aws s3 cp model.tar.gz s3://$s3_bucket/mme-xgboost/xgboost-$i.tar.gz 
done

После того, как мы сделали эти две копии, мы можем указать путь S3 с нашими артефактами модели для обеих моделей в нашем API-вызове Boto3 create_model.

from time import gmtime, strftime
model_name = 'mme-source' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

print('Model name: ' + model_name)
print('Model data Url: ' + model_url)

create_model_response = client.create_model(
    ModelName=model_name,
    Containers=[
        {
            "Image": image_uri,
            "Mode": "MultiModel",
            "ModelDataUrl": model_url
        }
    ],
    ExecutionRoleArn=sagemaker.get_execution_role(),
)
print("Model Arn: " + create_model_response["ModelArn"])

Мы можем определить тип нашего экземпляра и количество за конечной точкой в ​​объекте конфигурация конечной точки, который мы затем передаем нашему API-вызову create_endpoint.

#Step 2: EPC Creation
xgboost_epc_name = "mme-source" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
endpoint_config_response = client.create_endpoint_config(
    EndpointConfigName=xgboost_epc_name,
    ProductionVariants=[
        {
            "VariantName": "xgboostvariant",
            "ModelName": model_name,
            "InstanceType": "ml.m5.xlarge",
            "InitialInstanceCount": 1,
            #"Environment": {} 
        },
    ],
)
print("Endpoint Configuration Arn: " + endpoint_config_response["EndpointConfigArn"])
#Step 3: EP Creation
endpoint_name = "mme-source" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
create_endpoint_response = client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=xgboost_epc_name,
)
print("Endpoint Arn: " + create_endpoint_response["EndpointArn"])

Мы можем проверить, работает ли наша конечная точка с вызовом выборки точки данных из набора данных Abalone. Обратите внимание, что мы указываем целевую модель для мультимодельной конечной точки, здесь мы указываем model.tar.gz или артефакт модели, который мы хотим вызвать.

import boto3

resp = runtime.invoke_endpoint(EndpointName=endpoint_name, Body=b'.345,0.224414,.131102,0.042329,.279923,-0.110329,-0.099358,0.0', 
                           ContentType='text/csv', TargetModel = "xgboost-1.tar.gz")

print(resp['Body'].read())

Этот вызов API invoke_endpoint очень важен, так как это точка контакта, которую мы оцениваем в наших нагрузочных тестах. Теперь у нас есть работающая мультимодельная конечная точка, давайте приступим к тестированию!

Нагрузочное тестирование с Locust

Прежде чем углубляться в настройку нашего скрипта, давайте кратко рассмотрим Locust. Locust — это среда Python, которая позволяет вам определять поведение пользователя с помощью кода Python. Locust определяет выполнение как задачу. Задача в Locust — это, по сути, API или, в нашем случае, вызов invoke_endpoint, который мы хотим протестировать. Каждый пользователь будет выполнять задачи, которые мы определяем для них в скрипте Python, который мы создаем.

У Locust есть ванильный режим, который использует один процесс для запуска ваших тестов, но когда вы хотите увеличить масштаб, он также имеет распределенную функцию генерации нагрузки, которая, по сути, позволяет вам работать с несколькими процессами и даже с несколькими клиентскими машинами.

В этом случае мы хотим бомбардировать нашу мультимодельную конечную точку более чем 1000 TPS, поэтому нам нужна мощная клиентская машина, которая может справиться с нагрузкой, которую мы пытаемся создать. Мы можем развернуть экземпляр EC2, в этом случае мы используем ml.c5d.18xlarge, и мы проведем нагрузочное тестирование в этой среде, чтобы убедиться, что мы не запускаем из сока на стороне клиента. Чтобы понять, как настроить инстанс EC2, пожалуйста, прочтите следующую документацию. Для нашего AMI мы используем Deep Learning AMI GPU TensorFlow 2.9.1 (Ubuntu 20.04). Эти AMI для глубокого обучения поставляются с множеством предустановленных платформ машинного обучения, поэтому я считаю их удобными в этих случаях использования. Обратите внимание, что пока мы используем EC2 для тестирования и вызова нашей конечной точки, вы также можете использовать другой клиентский источник, если он имеет достаточную вычислительную мощность для обработки TPS Locust.

После того, как вы подключитесь к своему экземпляру EC2 по SSHd, мы можем приступить к определению нашего сценария саранчи. Сначала мы определяем клиент boto3, который будет выполнять измеряемый нами вызов invoke_endpoint. Мы параметризовали некоторые из них с помощью сценария распределенной оболочки, который мы рассмотрим позже.

class BotoClient:
    def __init__(self, host):


        #Consider removing retry logic to get accurate picture of failure in locust
        config = Config(
            retries={
                'max_attempts': 100,
                'mode': 'standard'
            }
        )

        self.sagemaker_client = boto3.client('sagemaker-runtime',config=config)
        self.endpoint_name = host.split('/')[-1]
        self.region = region
        self.content_type = content_type
        self.payload = b'.345,0.224414,.131102,0.042329,.279923,-0.110329,-0.099358,0.0'

Теперь, когда мы конкретизируем многомодельные конечные точки. Мы определяем два метода, каждый метод будет воздействовать на одну из наших двух целевых моделей.

#model that receives more traffic
    def sendPopular(self):

        request_meta = {
            "request_type": "InvokeEndpoint",
            "name": "SageMaker",
            "start_time": time.time(),
            "response_length": 0,
            "response": None,
            "context": {},
            "exception": None,
        }
        start_perf_counter = time.perf_counter()
        try:
            response = self.sagemaker_client.invoke_endpoint(
                EndpointName=self.endpoint_name,
                Body=self.payload,
                ContentType=self.content_type,
                TargetModel = 'xgboost-0.tar.gz'
            )
            response_body = response["Body"].read()
        except Exception as e:
            request_meta['exception'] = e

        request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000

        events.request.fire(**request_meta)
#model that receives rest of traffic
    def sendRest(self):

        request_meta = {
            "request_type": "InvokeEndpoint",
            "name": "SageMaker",
            "start_time": time.time(),
            "response_length": 0,
            "response": None,
            "context": {},
            "exception": None,
        }
        start_perf_counter = time.perf_counter()
   
        try:
            response = self.sagemaker_client.invoke_endpoint(
                EndpointName=self.endpoint_name,
                Body=self.payload,
                ContentType=self.content_type,
                TargetModel = 'xgboost-1.tar.gz'
            )
            response_body = response["Body"].read()
        except Exception as e:
            request_meta['exception'] = e

        request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000

        events.request.fire(**request_meta)

Теперь в будущем, если у вас будет 200 моделей, вам нужен метод для каждой? Не обязательно, вы можете указать строку целевой модели, чтобы она соответствовала нужным вам моделям. Например, если у вас есть 200 моделей и вы хотите, чтобы 5 моделей вызывались для определенного метода, мы можем установить для параметра TargetModel что-то вроде следующего фрагмента.

f'xgboost-{random.randint(0,4)}.tar.gz' #specifies 5 models to receive traffic in method

Чем более конкретно вы хотите получить, тем больше методов вам, возможно, придется определить, но если у вас есть общее представление о том, что определенное количество моделей будет получать большую часть трафика, то некоторых манипуляций со строками, подобных приведенным выше, будет достаточно.

Наконец, мы можем определить вес задачи через декоратор. Наша первая модель теперь имеет в три раза больше шансов получить трафик, чем вторая.

class MyUser(BotoUser):

    #This model is 3 times more likely to receive traffic
    @task(3)
    def send_request(self):
        self.client.sendPopular()

    @task
    def send_request_major(self):
        self.client.sendRest()

С помощью декоратора задач мы можем определить вес, а вы можете расширять и манипулировать им в зависимости от схемы вашего трафика.

Наконец, в этом репозитории мы определили сценарий оболочки, который вы можете использовать для увеличения или уменьшения трафика.

#replace with your endpoint name in format https://<<endpoint-name>>
export ENDPOINT_NAME=https://$1

export REGION=us-east-1
export CONTENT_TYPE=text/csv
export USERS=200
export WORKERS=40
export RUN_TIME=2mg
export LOCUST_UI=false # Use Locust UI


#replace with the locust script that you are testing, this is the locust_script that will be used to make the InvokeEndpoint API calls. 
export SCRIPT=locust_script.py

#make sure you are in a virtual environment
#. ./venv/bin/activate

if $LOCUST_UI ; then
    locust -f $SCRIPT -H $ENDPOINT_NAME --master --expect-workers $WORKERS -u $USERS -t $RUN_TIME --csv results &
else
locust -f $SCRIPT -H $ENDPOINT_NAME --master --expect-workers $WORKERS -u $USERS -t $RUN_TIME --csv results --headless &
fi

for (( c=1; c<=$WORKERS; c++ ))
do 
    locust -f $SCRIPT -H $ENDPOINT_NAME --worker --master-host=localhost &
done

Здесь мы определяем параметры, которые считывает наш скрипт саранчи, а также, что наиболее важно, два специфических параметра саранчи для пользователей и рабочих. Здесь вы можете определить количество пользователей, которые будут распределены по разным воркерам. Вы можете масштабировать их как кратные вверх или вниз, чтобы попытаться достичь целевого показателя TPS. Мы можем выполнить наш распределенный тест, выполнив следующую команду.

./distributed.sh <endpoint_name>

Как только мы запустим это, мы увидим в CLI нашего экземпляра EC2, что нагрузочный тест запущен и работает.

Мониторинг

Прежде чем мы закончим, есть несколько различных способов мониторинга нагрузочных тестов. Один из них — через Locust, как видно на скриншоте выше, вы можете отслеживать свой TPS и задержку в режиме реального времени. В конце создается общий файл результатов, содержащий ваши сквозные показатели процентиля задержки и TPS. Чтобы отрегулировать продолжительность теста, проверьте флаг RUN_TIME в вашем скрипте Distributed.sh.

Наконец, чтобы проверить результаты нагрузочного теста, вы можете провести перекрестную проверку с помощью показателей SageMaker CloudWatch Metrics, которые можно найти в консоли.

С Invocation Metrics мы можем получить представление о вызовах, а также о показателях задержки. С Instance Metrics мы можем видеть, насколько хорошо наше оборудование загружено, и нужно ли нам увеличивать или уменьшать масштаб. Чтобы полностью понять, как интерпретировать эти показатели, обратитесь к этой документации.

Здесь мы видим, что мы масштабировались почти до 77 000 вызовов в минуту, что составляет немногим более 1000 TPS, как показали наши показатели Locust. Рекомендуется отслеживать эти показатели как на уровне экземпляра, так и на уровне вызова, чтобы при необходимости можно было правильно определить Автомасштабирование для вашего оборудования.

Дополнительные ресурсы и заключение



Весь код примера можно найти по ссылке выше. Еще раз, если вы новичок в Locust и SageMaker Real-Time Inference, я настоятельно рекомендую вам проверить начальные блоги, связанные с обеими функциями. Сценарии нагрузочного тестирования, прикрепленные к этому репозиторию, можно легко настроить не только для конечных точек SageMaker, но и для любых API, которые вы размещаете и которые необходимо протестировать. Как всегда, любые отзывы приветствуются, и не стесняйтесь обращаться с любыми вопросами или комментариями, спасибо за чтение!

Если вам понравилась эта статья, не стесняйтесь связаться со мной в LinkedIn и подписаться на мою Информационную рассылку. Если вы новичок в Medium, зарегистрируйтесь с помощью моего Реферала для участников.