Фабрика данных Azure - очистка файлов пакетных задач

Я работаю с фабрикой данных Azure v2, используя пул учетных записей пакетной службы с выделенными узлами для обработки. Со временем я обнаружил, что пакетная активность не выполняется из-за того, что на диске D: / temp больше нет места на узлах. Для каждого задания ADF он создает рабочий каталог на узле, и после завершения задания я обнаружил, что он не очищает файлы. Интересно, сталкивался ли кто-нибудь еще с этим раньше и какое решение лучше всего реализовать.

РЕДАКТИРОВАТЬ: В настоящее время в ADF кажется, что это параметр сохранения файлов, которого не было, когда я задавал вопрос. Для любого, кто в будущем столкнется с той же проблемой, это возможное решение.


person Mike R    schedule 13.01.2019    source источник


Ответы (4)


Выяснили решение, разместив сообщение, чтобы, надеюсь, помочь следующему человеку, который придет.

Я нашел пакет SDK для Azure Python для пакетной обработки, я создал небольшой скрипт, который будет перебирать все пулы + узлы в учетной записи и удалять любые файлы в каталоге рабочих элементов старше 1 дня.

import azure.batch as batch
import azure.batch.operations.file_operations as file_operations
from azure.batch.batch_auth import SharedKeyCredentials
import azure.batch.operations
import msrest.service_client
from datetime import datetime

program_datetime = datetime.utcnow()

batch_account = 'batchaccount001'
batch_url = 'https://batchaccount001.westeurope.batch.azure.com'
batch_key = '<BatchKeyGoesHere>'
batch_credentials = SharedKeyCredentials(batch_account, batch_key)

#Create Batch Client with which to do operations
batch_client = batch.BatchServiceClient(credentials=batch_credentials,
                                        batch_url = batch_url
                                        )

service_client = msrest.service_client.ServiceClient(batch_credentials, batch_client.config)

#List out all the pools
pools = batch_client.pool.list()
pool_list = [p.id for p in pools]

for p in pool_list:
    nodes = batch_client.compute_node.list(p)
    node_list = [n.id for n in nodes]
    for n in node_list:
        pool_id = p
        node_id = n
        print(f'Pool = {pool_id}, Node = {node_id}')
        fo_client = azure.batch.operations.FileOperations(service_client,
                                                          config=batch_client.config,
                                                          serializer=batch_client._serialize,
                                                          deserializer=batch_client._deserialize)
        files = fo_client.list_from_compute_node(pool_id,
                                                 node_id,
                                                 recursive=True,
                                                 file_list_from_compute_node_options=None,
                                                 custom_headers=None,
                                                 raw=False
                                                )

        for file in files:
            # Check to make sure it's not a directory. Directories do not have a last_modified property.
            if not file.is_directory:
                file_datetime = file.properties.last_modified.replace(tzinfo=None)
                file_age_in_seconds = (program_datetime - file_datetime).total_seconds()
                # Delete anything older than a day in the workitems directory.
                if file_age_in_seconds > 86400 and file.name.startswith('workitems'):
                    print(f'{file_age_in_seconds} : {file.name}')
                    fo_client.delete_from_compute_node(pool_id, node_id, file.name)
person Mike R    schedule 15.01.2019

Я инженер фабрики данных Azure. Мы использовали пакет SDK для Azure до 2018-12-01.8.0, поэтому для пакетных задач, созданных с помощью ADF, по умолчанию использовался бесконечный период хранения, как упоминалось ранее. Мы развертываем исправление, чтобы по умолчанию период хранения для пакетных задач, созданных с помощью ADF, составлял 30 дней в будущем, а также вводим свойство retentionTimeInDays в typeProperties настраиваемого действия, которое клиенты могут установить в своих конвейерах ADF, чтобы переопределить это значение по умолчанию. . Когда это будет развернуто, документация по адресу https://docs.microsoft.com/en-us/azure/data-factory/transform-data-using-dotnet-custom-activity#custom-activity будет обновлен с более подробной информацией. Спасибо за терпеливость.

person Minh Do    schedule 17.01.2019

Очистка задач выполняется либо при удалении задачи, либо по истечении времени хранения задач (https://docs.microsoft.com/en-us/rest/api/batchservice/task/add#taskconstraints). Любой из них должен решить возникшую проблему.

Примечание. Время хранения по умолчанию было уменьшено с бесконечного до 7 дней в последней версии REST API (2018-12-01.8.0), чтобы разрешить очистку задачи по умолчанию. Задачи, созданные в более ранних версиях, не будут иметь этого нового значения по умолчанию.

person brklein    schedule 13.01.2019
comment
Интересно. Эти задачи создаются с помощью фабрики данных Azure. Есть ли в JSON конкретное свойство для ссылки на действие? docs. microsoft.com/en-us/azure/data-factory/ - person Mike R; 14.01.2019

Вы можете использовать конфигурацию retentionTimeInDays в typeProperties при развертывании с помощью шаблона ARM.

Обратите внимание, что вы должны указать конфигурацию retentionTimeInDays в Double, а не String.

person cold_coder    schedule 02.04.2019