Задача завершена с кодом возврата Negsignal.SIGABRT. Задача воздушного потока не выполняется с помощью SnowflakeOperator

Запуск этого DAG в воздушном потоке дает ошибку, поскольку задача завершена с кодом возврата Negsignal.SIGABRT.

Я не уверен, что я сделал не так


    from airflow import DAG
    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
    from airflow.utils.dates import days_ago
    
    SNOWFLAKE_CONN_ID = 'snowflake_conn'
    # TODO: should be able to rely on connection's schema, but currently param required by S3ToSnowflakeTransfer
    # SNOWFLAKE_SCHEMA = 'schema_name'
    #SNOWFLAKE_STAGE = 'stage_name'
    SNOWFLAKE_WAREHOUSE = 'SF_TUTS_WH'
    SNOWFLAKE_DATABASE = 'KAFKA_DB'
    SNOWFLAKE_ROLE = 'sysadmin'
    SNOWFLAKE_SAMPLE_TABLE = 'sample_table'
    
    CREATE_TABLE_SQL_STRING = (
        f"CREATE OR REPLACE TRANSIENT TABLE {SNOWFLAKE_SAMPLE_TABLE} (name VARCHAR(250), id INT);"
    )
    
    SQL_INSERT_STATEMENT = f"INSERT INTO {SNOWFLAKE_SAMPLE_TABLE} VALUES ('name', %(id)s)"
    SQL_LIST = [SQL_INSERT_STATEMENT % {"id": n} for n in range(0, 10)]
    
    default_args = {
        'owner': 'airflow',
    }
    
    dag = DAG(
        'example_snowflake',
        default_args=default_args,
        start_date=days_ago(2),
        tags=['example'],
    )
    
    snowflake_op_sql_str = SnowflakeOperator(
        task_id='snowflake_op_sql_str',
        dag=dag,
        snowflake_conn_id=SNOWFLAKE_CONN_ID,
        sql=CREATE_TABLE_SQL_STRING,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
      #  schema=SNOWFLAKE_SCHEMA,
        role=SNOWFLAKE_ROLE,
    )
    
    snowflake_op_with_params = SnowflakeOperator(
        task_id='snowflake_op_with_params',
        dag=dag,
        snowflake_conn_id=SNOWFLAKE_CONN_ID,
        sql=SQL_INSERT_STATEMENT,
        parameters={"id": 56},
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
     #   schema=SNOWFLAKE_SCHEMA,
        role=SNOWFLAKE_ROLE,
    )
    
    
    snowflake_op_sql_list = SnowflakeOperator(
        task_id='snowflake_op_sql_list', dag=dag, snowflake_conn_id=SNOWFLAKE_CONN_ID, sql=SQL_LIST
    )
    
    snowflake_op_sql_str >> [
        snowflake_op_with_params,
        snowflake_op_sql_list,]

Получение ЖУРНАЛОВ в airFlow, как показано ниже ::

 Reading local file: /Users/aashayjain/airflow/logs/snowflake_test/snowflake_op_with_params/2021-02-02T13:51:18.229233+00:00/1.log
[2021-02-02 19:21:38,880] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: snowflake_test.snowflake_op_with_params 2021-02-02T13:51:18.229233+00:00 [queued]>
[2021-02-02 19:21:38,887] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: snowflake_test.snowflake_op_with_params 2021-02-02T13:51:18.229233+00:00 [queued]>
[2021-02-02 19:21:38,887] {taskinstance.py:1017} INFO - 
--------------------------------------------------------------------------------
[2021-02-02 19:21:38,887] {taskinstance.py:1018} INFO - Starting attempt 1 of 1
[2021-02-02 19:21:38,887] {taskinstance.py:1019} INFO - 
--------------------------------------------------------------------------------
[2021-02-02 19:21:38,892] {taskinstance.py:1038} INFO - Executing <Task(SnowflakeOperator): snowflake_op_with_params> on 2021-02-02T13:51:18.229233+00:00
[2021-02-02 19:21:38,895] {standard_task_runner.py:51} INFO - Started process 16510 to run task
[2021-02-02 19:21:38,901] {standard_task_runner.py:75} INFO - Running: ['airflow', 'tasks', 'run', 'snowflake_test', 'snowflake_op_with_params', '2021-02-02T13:51:18.229233+00:00', '--job-id', '7', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/snowflake_test.py', '--cfg-path', '/var/folders/6h/1pzt4pbx6h32h6p5v503wws00000gp/T/tmp1w61m38s']
[2021-02-02 19:21:38,903] {standard_task_runner.py:76} INFO - Job 7: Subtask snowflake_op_with_params
[2021-02-02 19:21:38,933] {logging_mixin.py:103} INFO - Running <TaskInstance: snowflake_test.snowflake_op_with_params 2021-02-02T13:51:18.229233+00:00 [running]> on host 1.0.0.127.in-addr.arpa
[2021-02-02 19:21:38,954] {taskinstance.py:1232} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=snowflake_test
AIRFLOW_CTX_TASK_ID=snowflake_op_with_params
AIRFLOW_CTX_EXECUTION_DATE=2021-02-02T13:51:18.229233+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-02-02T13:51:18.229233+00:00
[2021-02-02 19:21:38,955] {snowflake.py:119} INFO - Executing: INSERT INTO TEST_TABLE VALUES ('name', %(id)s)
[2021-02-02 19:21:38,961] {base.py:74} INFO - Using connection to: id: snowflake_conn. Host: uva00063.us-east-1.snowflakecomputing.com, Port: None, Schema: , Login: aashay, Password: XXXXXXXX, extra: XXXXXXXX
[2021-02-02 19:21:38,963] {connection.py:218} INFO - Snowflake Connector for Python Version: 2.3.7, Python Version: 3.7.3, Platform: Darwin-19.5.0-x86_64-i386-64bit
[2021-02-02 19:21:38,964] {connection.py:769} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2021-02-02 19:21:38,964] {connection.py:785} INFO - Setting use_openssl_only mode to False
[2021-02-02 19:21:38,996] {local_task_job.py:118} INFO - Task exited with return code Negsignal.SIGABRT

apache-airflow == 2.0.0

Python 3.7.3

Жду помощи с этим. дайте мне знать, что мне нужно предоставить более подробную информацию относительно. код или расход воздуха ................................... ???




Ответы (1)


Вы выполняете:

snowflake_op_with_params = SnowflakeOperator(
    task_id='snowflake_op_with_params',
    dag=dag,
    snowflake_conn_id=SNOWFLAKE_CONN_ID,
    sql=SQL_INSERT_STATEMENT,
    parameters={"id": 56},
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
 #   schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE,
)

Это попытка запустить sql в SQL_INSERT_STATEMENT. Итак, он выполняет:

f"INSERT INTO {SNOWFLAKE_SAMPLE_TABLE} VALUES ('name', %(id)s)"

который дает:

INSERT INTO sample_table VALUES ('name', %(id)s)

Как показано в вашем собственном журнале:

[2021-02-02 19:21:38,955] {snowflake.py:119} INFO - Executing: INSERT INTO TEST_TABLE VALUES ('name', %(id)s)

Это недопустимый оператор SQL.

Я не могу точно сказать, какой SQL вы хотели выполнить. Основываясь на SQL_LIST, я могу предположить, что %(id)s должен быть и id целочисленного типа.

person Elad    schedule 02.02.2021
comment
Я жестко запрограммировал SQL-оператор INFO - Выполнение: INSERT INTO KAFKA_DB.KAFKA_SCHEMA.TEST_TABLE VALUES ('name', 123) INFO -Snowflake Connector для версии Python: 2.3.7, Версия Python: 3.7.3, Платформа: Darwin-19.5.0 -x86_64-i386-64bit INFO - это соединение находится в режиме открытия при отказе OCSP. Сертификаты TLS будут проверяться на предмет действительности и статуса отзыва. Любые другие исключения, связанные с отзывом сертификата, или сбои ответчика OCSP не будут учитываться в пользу возможности подключения. ИНФОРМАЦИЯ - установка для режима use_openssl_only значения False INFO - завершение задачи с кодом возврата Negsignal.SIGABRT - person Aashu; 03.02.2021
comment
По-прежнему возникает та же ошибка. после жесткого кодирования SQL-оператора sql. Этот SQL правильный. протестировано в снежинке - ›INSERT INTO KAFKA_DB.KAFKA_SCHEMA.TEST_TABLE VALUES ('name', 123) - person Aashu; 03.02.2021
comment
@Aashu Может ли хост, указанный в соединении, быть неправильным? Вы используете ocsp. docs.snowflake.com/en/user-guide/… Я считаю, что ваш хост должен быть: ocsp. *. snowflakecomputing.com:80 - person Elad; 03.02.2021
comment
Я также думаю, что это будет проблемой. Я пробовал использовать то, что вы предложили. В соответствии с документами, которые я пробовал - ocsp.snowflakecomputing.com:80, все еще получаю ту же ошибку. Спасибо, что изучили это. очень признателен за вашу помощь.Пожалуйста, дайте мне знать, что вы думаете, если что-то щелкнет для вас, чтобы решить эту проблему .. - person Aashu; 03.02.2021
comment
@Aashu ocsp.snowflakecomputing.com:80 не может быть ответом, потому что он не указывает, к какой учетной записи вы пытаетесь подключиться. Я вижу, вы уже открыли запрос в службу поддержки в github коннектора python снежинки. Возможно, вы захотите открыть реальный случай поддержки, чтобы снежинка. Я бы не стал говорить им об Airflow, это их просто запутает. Вы просто пытаетесь выполнить запрос с помощью оболочки Python. Проблема не в воздушном потоке. Вы также можете прочитать github.com/snowflakedb/snowflake-connector-python/ issues / 245 не та же проблема, что и ваша, но может быть связана - person Elad; 03.02.2021