How can Prefect Cloud secrets be used with Coiled?

I'm trying to run a flow on Coiled that uses a secret stored in Prefect Cloud.

The flow fails with a Prefect client error:

prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.

Is this an authentication error? Do I need to pass the Prefect Cloud authentication token to the Coiled cluster, or add configuration in Coiled using one of these options?

Below are the code and configuration I used:

~/.prefect/config.toml

[cloud]
use_local_secrets = false


[flows]
checkpointing = true

[cloud.agent]
# Runner token
auth_token = "<auth_token>"

Flow

import prefect
from prefect import Flow, Parameter, task
from prefect.tasks.secrets import PrefectSecret

@task(log_stdout=True)
def add_last_name(first_name, last_name="Smith", pwd=None):
    
    if pwd=="hello":
        last_name="Hamilton"
    
    full_name = f"{first_name} {last_name}"
    print(full_name)

    return full_name

with Flow(name="tst-deploy") as flow:
    first_name = Parameter("first_name", default="Alexander")
    
    # Get secret  
    unlock_nm_secret = PrefectSecret("fake_pwd")

    # test prefect secret
    full_name = add_last_name(first_name, pwd=unlock_nm_secret)


if __name__ == "__main__":

    import coiled
    
    run_se = 1
    if run_se == 1:
        coiled.create_software_environment(
            name="tst-prefect-py38",
            conda={"channels": ["conda-forge", "defaults"],
                "dependencies": ["python=3.8.8", "numpy", "prefect"]},
            post_build=["python -m pip install jupyter-server-proxy"]
        )

    from prefect.executors import DaskExecutor, LocalExecutor

    e = 3
    if e==1:
        print("Local")
        executor = LocalExecutor()
    elif e==2:
        print("Local Dask Executor")
        executor = DaskExecutor()
    elif e==3:
        print("Coiled")
        executor = DaskExecutor(
            cluster_class=coiled.Cluster,
            cluster_kwargs={
                "n_workers":1,
                "worker_cpu":1,
                "worker_memory":"8 GiB",
                "scheduler_memory": "8 GiB",
                "software": "grybox/tst-prefect-py38",
                "name": "tst-py38",
                "shutdown_on_close": False,
            }
            )

    state = flow.run(executor=executor)

    flow.visualize(flow_state=state)

Traceback

[2021-03-16 19:04:44+0000] INFO - prefect.TaskRunner | Task 'fake_pwd': Starting task run...
[2021-03-16 19:04:44+0000] ERROR - prefect.TaskRunner | Unexpected error: ClientError('Malformed response received from Cloud - please ensure that you have an API token properly configured.')
Traceback (most recent call last):
  File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/client/client.py", line 465, in _request
    json_resp = response.json()
  File "/opt/conda/envs/coiled/lib/python3.8/site-packages/requests/models.py", line 900, in json
    return complexjson.loads(self.text, **kwargs)
  File "/opt/conda/envs/coiled/lib/python3.8/json/__init__.py", line 357, in loads
    return _default_decoder.decode(s)
  File "/opt/conda/envs/coiled/lib/python3.8/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/opt/conda/envs/coiled/lib/python3.8/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/utilities/executors.py", line 299, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/tasks/secrets/base.py", line 68, in run
    return _Secret(name).get()
  File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/client/secrets.py", line 161, in get
    raise exc
  File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/client/secrets.py", line 145, in get
    result = self.client.graphql(
  File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/client/client.py", line 298, in graphql
    result = self.post(
  File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/client/client.py", line 213, in post
    response = self._request(
  File "/opt/conda/envs/coiled/lib/python3.8/site-packages/prefect/client/client.py", line 468, in _request
    raise ClientError(
prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
[2021-03-16 19:04:44+0000] INFO - prefect.TaskRunner | Task 'fake_pwd': Finished task run for task with final state: 'Failed'
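
The traceback shows a `JSONDecodeError` from `response.json()` being re-raised as a `ClientError`. The sketch below reproduces that chain with the standard library only. `ClientError` here is a local stand-in class and `parse_cloud_response` is a hypothetical helper, not Prefect's actual code. It suggests the message appears whenever the response body is not JSON (for example an empty body or an HTML page), which is consistent with a missing or invalid token but does not prove it.

```python
import json


class ClientError(Exception):
    """Local stand-in for prefect.utilities.exceptions.ClientError."""


def parse_cloud_response(body: str) -> dict:
    """Hypothetical helper: decode a response body, wrapping decode failures."""
    try:
        return json.loads(body)
    except json.JSONDecodeError as exc:
        # Chain the original error, as the traceback's "direct cause" line shows.
        raise ClientError(
            "Malformed response received from Cloud - please ensure that "
            "you have an API token properly configured."
        ) from exc


if __name__ == "__main__":
    # Neither an empty body nor an HTML page is valid JSON.
    for body in ("", "<html>Unauthorized</html>"):
        try:
            parse_cloud_response(body)
        except ClientError as err:
            # __cause__ holds the original JSONDecodeError.
            print(f"{type(err.__cause__).__name__}: {err.__cause__}")
```

Inspecting `__cause__` on the caught exception is a quick way to see what the server actually returned before deciding whether the token is the problem.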

Thanks in advance.


Asked by Itay Livni on 16.03.2021


Answers (1)


Adding an auth_token under the [cloud] section of ~/.prefect/config.toml works.

[cloud]
use_local_secrets = false
auth_token = "<auth_token>"

[flows]
checkpointing = true

[cloud.agent]
# Runner token
auth_token = "<auth_token>"
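
To make the difference between the two config files concrete, here is a minimal sketch that models them as nested dictionaries and checks for a `[cloud]` token. `has_cloud_token` and `env_var_name` are hypothetical helpers written for illustration. The sketch also assumes Prefect 0.x's convention of overriding config keys through environment variables named `PREFECT__<SECTION>__<KEY>`. Whether such a variable must be set on the Coiled cluster is not established in this thread.

```python
from typing import Any, Dict

# Nested-dict view of the two TOML files shown in this thread.
# "[cloud.agent]" is a sub-table of "[cloud]", so its auth_token is a
# different key from "[cloud] auth_token".
QUESTION_CONFIG: Dict[str, Any] = {
    "cloud": {
        "use_local_secrets": False,
        "agent": {"auth_token": "<auth_token>"},
    },
    "flows": {"checkpointing": True},
}
ANSWER_CONFIG: Dict[str, Any] = {
    "cloud": {
        "use_local_secrets": False,
        "auth_token": "<auth_token>",
        "agent": {"auth_token": "<auth_token>"},
    },
    "flows": {"checkpointing": True},
}


def has_cloud_token(config: Dict[str, Any]) -> bool:
    """Hypothetical check: True if a non-empty [cloud] auth_token is set."""
    return bool(config.get("cloud", {}).get("auth_token"))


def env_var_name(*keys: str) -> str:
    """Build a PREFECT__SECTION__KEY style name for a config key path."""
    return "PREFECT__" + "__".join(key.upper() for key in keys)


if __name__ == "__main__":
    print(has_cloud_token(QUESTION_CONFIG))
    print(has_cloud_token(ANSWER_CONFIG))
    print(env_var_name("cloud", "auth_token"))
```

The agent token alone does not satisfy the check, which matches the behavior reported in the question: the flow only succeeded once the token was also present directly under `[cloud]`.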
Answered by Itay Livni on 20.03.2021