Я создал даг для загрузки данных из большого запроса в другую большую таблицу запросов. Я использовал BigQueryOperator в composer. Но этот код работает не так, как ожидалось. Я не могу получить сообщение об ошибке, может ли кто-нибудь помочь мне решить эту проблему.
И я вручную создал пустую таблицу, и данные по-прежнему не загружаются в таблицу. Найдите приведенный ниже код и дайте мне знать, пропустил ли я что-нибудь?
from typing import Any
from datetime import datetime, timedelta
import airflow
from airflow import models
from airflow.operators import bash_operator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
sql="""SELECT * FROM `project_id.dataset_name.source_table`"""
DEFAULT_ARGUMENTS = {
"owner": "Airflow",
"depends_on_past": False,
"start_date": datetime(2019, 8, 7),
"schedule_interval": '0 6 * * *',
"retries": 10
}
dag = models.DAG(
dag_id='Bq_to_bq',
default_args=DEFAULT_ARGUMENTS
)
LOAD_TABLE_TRUNC = BigQueryOperator(
task_id ='load_bq_table_truncate',
dag=dag,
bql=sql,
destination_proect_dataset_table='project-id.dataset-name.table_name',
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
allow_large_results='true',
use_legacy_sql=False,
)
LOAD_TABLE_APPEND = BigQueryOperator(
task_id ='load_bq_table_append',
dag=dag,
bql=sql,
destination_proect_dataset_table='project-id.dataset-name.table_name',
write_disposition='WRITE_APPEND',
create_disposition='CREATE_IF_NEEDED',
allow_large_results='true',
use_legacy_sql=False,
)
LOAD_TABLE_TRUNC.set_downstream(LOAD_TABLE_APPEND)
BigQueryOperator
ожидает логическое примитивное значение дляallow_large_results
. У вас есть'true'
, тогда как это должно бытьTrue
без кавычек. Однако я сомневаюсь, что это причина проблемы. Проверка журналов, как предлагается, будет лучшим подходом - person Josh Laird   schedule 09.08.2019