Запуск этого DAG в Airflow завершается ошибкой: задача завершается с кодом возврата Negsignal.SIGABRT.
Я не понимаю, что я сделал не так.
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.dates import days_ago
# Id of the Airflow connection the Snowflake tasks use.
SNOWFLAKE_CONN_ID = 'snowflake_conn'

# TODO: should be able to rely on connection's schema, but currently param required by S3ToSnowflakeTransfer
# SNOWFLAKE_SCHEMA = 'schema_name'
# SNOWFLAKE_STAGE = 'stage_name'

# Snowflake session context passed explicitly to the tasks.
SNOWFLAKE_ROLE = 'sysadmin'
SNOWFLAKE_DATABASE = 'KAFKA_DB'
SNOWFLAKE_WAREHOUSE = 'SF_TUTS_WH'

# Table that the statements below create and populate.
SNOWFLAKE_SAMPLE_TABLE = 'sample_table'

# DDL that (re)creates the sample table.
CREATE_TABLE_SQL_STRING = f"CREATE OR REPLACE TRANSIENT TABLE {SNOWFLAKE_SAMPLE_TABLE} (name VARCHAR(250), id INT);"

# INSERT template; the %(id)s placeholder is deliberately left unresolved here
# so it can be filled in later (either via the % operator or bound parameters).
SQL_INSERT_STATEMENT = f"INSERT INTO {SNOWFLAKE_SAMPLE_TABLE} VALUES ('name', %(id)s)"

# Ten fully rendered INSERT statements, one for each id in 0..9.
SQL_LIST = [SQL_INSERT_STATEMENT % {"id": row_id} for row_id in range(10)]
# Default task arguments handed to the DAG; only the owner is set.
default_args = dict(owner='airflow')

# The DAG object every task below attaches to. The start date is computed
# with days_ago(2) and the DAG carries a single 'example' tag.
dag = DAG(
    dag_id='example_snowflake',
    tags=['example'],
    start_date=days_ago(2),
    default_args=default_args,
)
# Session context shared by all three tasks. Every task receives the same
# connection, warehouse, database and role so that the INSERT tasks run in the
# database where the first task creates the table, rather than under whatever
# defaults the connection happens to carry. (Previously the list task passed
# none of these and could target a different database than its siblings.)
_SNOWFLAKE_TASK_KWARGS = {
    'dag': dag,
    'snowflake_conn_id': SNOWFLAKE_CONN_ID,
    'warehouse': SNOWFLAKE_WAREHOUSE,
    'database': SNOWFLAKE_DATABASE,
    # 'schema': SNOWFLAKE_SCHEMA,
    'role': SNOWFLAKE_ROLE,
}

# Task 1: (re)create the sample table from a single SQL string.
snowflake_op_sql_str = SnowflakeOperator(
    task_id='snowflake_op_sql_str',
    sql=CREATE_TABLE_SQL_STRING,
    **_SNOWFLAKE_TASK_KWARGS,
)

# Task 2: insert one row, letting the operator bind %(id)s from `parameters`.
snowflake_op_with_params = SnowflakeOperator(
    task_id='snowflake_op_with_params',
    sql=SQL_INSERT_STATEMENT,
    parameters={"id": 56},
    **_SNOWFLAKE_TASK_KWARGS,
)

# Task 3: run the list of pre-rendered INSERT statements.
snowflake_op_sql_list = SnowflakeOperator(
    task_id='snowflake_op_sql_list',
    sql=SQL_LIST,
    **_SNOWFLAKE_TASK_KWARGS,
)

# Both insert tasks run only after the table has been created.
snowflake_op_sql_str >> [
    snowflake_op_with_params,
    snowflake_op_sql_list,
]
Логи задачи в Airflow приведены ниже:
Reading local file: /Users/aashayjain/airflow/logs/snowflake_test/snowflake_op_with_params/2021-02-02T13:51:18.229233+00:00/1.log
[2021-02-02 19:21:38,880] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: snowflake_test.snowflake_op_with_params 2021-02-02T13:51:18.229233+00:00 [queued]>
[2021-02-02 19:21:38,887] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: snowflake_test.snowflake_op_with_params 2021-02-02T13:51:18.229233+00:00 [queued]>
[2021-02-02 19:21:38,887] {taskinstance.py:1017} INFO -
--------------------------------------------------------------------------------
[2021-02-02 19:21:38,887] {taskinstance.py:1018} INFO - Starting attempt 1 of 1
[2021-02-02 19:21:38,887] {taskinstance.py:1019} INFO -
--------------------------------------------------------------------------------
[2021-02-02 19:21:38,892] {taskinstance.py:1038} INFO - Executing <Task(SnowflakeOperator): snowflake_op_with_params> on 2021-02-02T13:51:18.229233+00:00
[2021-02-02 19:21:38,895] {standard_task_runner.py:51} INFO - Started process 16510 to run task
[2021-02-02 19:21:38,901] {standard_task_runner.py:75} INFO - Running: ['airflow', 'tasks', 'run', 'snowflake_test', 'snowflake_op_with_params', '2021-02-02T13:51:18.229233+00:00', '--job-id', '7', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/snowflake_test.py', '--cfg-path', '/var/folders/6h/1pzt4pbx6h32h6p5v503wws00000gp/T/tmp1w61m38s']
[2021-02-02 19:21:38,903] {standard_task_runner.py:76} INFO - Job 7: Subtask snowflake_op_with_params
[2021-02-02 19:21:38,933] {logging_mixin.py:103} INFO - Running <TaskInstance: snowflake_test.snowflake_op_with_params 2021-02-02T13:51:18.229233+00:00 [running]> on host 1.0.0.127.in-addr.arpa
[2021-02-02 19:21:38,954] {taskinstance.py:1232} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=snowflake_test
AIRFLOW_CTX_TASK_ID=snowflake_op_with_params
AIRFLOW_CTX_EXECUTION_DATE=2021-02-02T13:51:18.229233+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-02-02T13:51:18.229233+00:00
[2021-02-02 19:21:38,955] {snowflake.py:119} INFO - Executing: INSERT INTO TEST_TABLE VALUES ('name', %(id)s)
[2021-02-02 19:21:38,961] {base.py:74} INFO - Using connection to: id: snowflake_conn. Host: uva00063.us-east-1.snowflakecomputing.com, Port: None, Schema: , Login: aashay, Password: XXXXXXXX, extra: XXXXXXXX
[2021-02-02 19:21:38,963] {connection.py:218} INFO - Snowflake Connector for Python Version: 2.3.7, Python Version: 3.7.3, Platform: Darwin-19.5.0-x86_64-i386-64bit
[2021-02-02 19:21:38,964] {connection.py:769} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2021-02-02 19:21:38,964] {connection.py:785} INFO - Setting use_openssl_only mode to False
[2021-02-02 19:21:38,996] {local_task_job.py:118} INFO - Task exited with return code Negsignal.SIGABRT
apache-airflow == 2.0.0
Python 3.7.3
Буду благодарен за помощь. Дайте знать, если нужно предоставить более подробную информацию о коде или об Airflow.