невозможно загрузить данные в большую таблицу запросов с помощью BigQueryOperator в композиторе

Я создал даг для загрузки данных из большого запроса в другую большую таблицу запросов. Я использовал BigQueryOperator в composer. Но этот код работает не так, как ожидалось. Я не могу получить сообщение об ошибке, может ли кто-нибудь помочь мне решить эту проблему.

И я вручную создал пустую таблицу, и данные по-прежнему не загружаются в таблицу. Найдите приведенный ниже код и дайте мне знать, пропустил ли я что-нибудь?

from typing import Any
from datetime import datetime, timedelta
import airflow
from airflow import models
from airflow.operators import bash_operator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

sql="""SELECT * FROM `project_id.dataset_name.source_table`"""

DEFAULT_ARGUMENTS = {
    "owner": "Airflow",
    "depends_on_past": False,
    "start_date": datetime(2019, 8, 7),
    "schedule_interval": '0 6 * * *',
    "retries": 10
}

dag = models.DAG(
        dag_id='Bq_to_bq',
        default_args=DEFAULT_ARGUMENTS
    )
LOAD_TABLE_TRUNC = BigQueryOperator(
    task_id ='load_bq_table_truncate',
    dag=dag,
    bql=sql,
    destination_proect_dataset_table='project-id.dataset-name.table_name',
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
    allow_large_results='true',
    use_legacy_sql=False,

)

LOAD_TABLE_APPEND = BigQueryOperator(
    task_id ='load_bq_table_append',
    dag=dag,
    bql=sql,
    destination_proect_dataset_table='project-id.dataset-name.table_name',
    write_disposition='WRITE_APPEND',
    create_disposition='CREATE_IF_NEEDED',
    allow_large_results='true',
    use_legacy_sql=False,

)
LOAD_TABLE_TRUNC.set_downstream(LOAD_TABLE_APPEND)

person Prasanna Kumar    schedule 09.08.2019    source источник
comment
BigQueryOperator ожидает логическое примитивное значение для allow_large_results. У вас есть 'true', тогда как это должно быть True без кавычек. Однако я сомневаюсь, что это причина проблемы. Проверка журналов, как предлагается, будет лучшим подходом   -  person Josh Laird    schedule 09.08.2019


Ответы (2)


Это необходимо для выявления ошибки, связанной с отказом группы DAG

Выявить ошибку можно двумя способами

  • Веб интерфейс:

    1. Go to the DAG and select Graph view.
    2. Выберите задачу и нажмите «Просмотреть журнал».
  • Ведение журнала драйвера стека:

    1. Go to this URL https://console.cloud.google.com/logs/viewer? project=project_id.
    2. В первом раскрывающемся списке выберите «Среда Cloud Composer», затем укажите местоположение и имя DAG.
    3. В раскрывающемся списке «Уровень журнала» выберите «Ошибка как».
person BVSKanth    schedule 09.08.2019

Как упомянул Джош в своем комментарии к вашему сообщению, значение allow_large_results должно быть True без кавычек. Кроме того, я вижу, что у вас есть опечатка в написании destination_proect_dataset_table. Вам не хватает буквы "j": destination_project_dataset_table

Рекомендации BVSKanth по поиску ошибок DAG также точны и заслуживают рассмотрения для будущей отладки.

person Wilson    schedule 12.08.2019