Как определить причину сбоя аутентификации в Airflow PostgresOperator?

Я новичок в воздушном потоке. В качестве первого обучающего упражнения я создал минимальную группу DAG, которая каждый час записывает значение в таблицу базы данных postgres. Однако мне не удалось установить соединение с базой данных postgres.

Обратите внимание, что я не говорю о создании серверной базы данных postgres для использования локального исполнителя, как описано в этот вопрос. Это другая тема (по крайней мере, в моем понимании).

Я создал минимальный пример, в котором я:

  1. создать пользователя Linux, роль postgresql и базу данных с тем же именем minimal_db
  2. создать DAG, состоящий из одного PostgresOperator
  3. создать соединение на вкладке администратора веб-сервера Airflow

1. Создание пользователя Linux, роли и базы данных postgres.

В этой части я полагаюсь на эту статью.

Я вошел в систему под пользователем postgres и создал роль с именем minimal_db. Я создал базу данных с тем же именем. Затем я также создал суперпользователя Linux с тем же именем. Этот последний шаг кажется ненужным, потому что я думаю, что мне не нужна аутентификация identity. Однако это было упомянуто в статье, за которой я следую:

Чтобы войти в систему с аутентификацией на основе идентификатора, вам понадобится пользователь Linux с тем же именем, что и ваша роль и база данных Postgres.

postgres@ws:~$ createuser --interactive
postgres@ws:~$ createdb minimal_db
gontcharovd@ws:~$ sudo adduser minimal_db

Вот информация о подключении к базе данных:

minimal_db=# \conninfo
You are connected to database "minimal_db" as user "minimal_db" via socket in "/var/run/postgresql" at port "5432".

Я не знаю, что означает сокет /var/run/postgresql.

Я создал таблицу с именем my_table и вставил одно значение:

minimal_db=# CREATE TABLE my_table (my_value INT NOT NULL);
minimal_db=# INSERT INTO my_table VALUES (123);

Я проверил, что это значение присутствует в my_table:

minimal_db=# SELECT * FROM my_table;

my_value
----------
      123
(1 row)

Сервер postgres предлагает принимать соединения:

gontcharovd@ws:~$ pg_isready -h localhost -p 5432
localhost:5432 - accepting connections

2. Минимальный воздушный поток DAG

DAG, который я использую, абсолютно минимален: каждый час он записывает значение 123 в my_table в minimal_db.

import airflow
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

dag = DAG(
    'minimal_example',
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval='@hourly'
)

write_to_postgres = PostgresOperator(
    task_id='write_to_postgres',
    postgres_conn_id='minimal_db_id',
    sql='INSERT INTO my_table VALUES (123);',
    dag=dag
)

write_to_postgres

3. Соединение Airflow, созданное на вкладке "Администратор".

Вот поле postgres_conn_id, которое я использую в DAG:

Conn Id: minimal_db_id
Con Type: Postgres
Host: localhost
Schema: minimal_db
Login: minimal_db
Password: [same password as the minimal_db role]
Port: 5432

4. Ошибка при выполнении задачи write_to_postgres

Я продолжаю получать ошибку аутентификации по паролю, хотя я уверен, что пароль роли postgresql minimal_db совпадает с паролем в minimal_db_id соединении.

[2020-06-05 20:42:51,845] {taskinstance.py:900} INFO - Executing <Task(PostgresOperator): write_to_postgres> on 2020-06-03T00:00:00+00:00
[2020-06-05 20:42:51,847] {standard_task_runner.py:53} INFO - Started process 59141 to run task
[2020-06-05 20:42:51,875] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: minimal_example.write_to_postgres 2020-06-03T00:00:00+00:00 [running]> ws
[2020-06-05 20:42:51,882] {postgres_operator.py:62} INFO - Executing: INSERT INTO my_table VALUES (123);
[2020-06-05 20:42:51,884] {logging_mixin.py:112} INFO - [2020-06-05 20:42:51,884] {base_hook.py:87} INFO - Using connection to: id: minimal_db_id. Host: localhost, Port: 5432, Schema: minimal_db, Login: minimal_db, Password: XXXXXXXX, extra: None
[2020-06-05 20:42:51,892] {taskinstance.py:1145} ERROR - FATAL:  password authentication failed for user "minimal_db"
FATAL:  password authentication failed for user "minimal_db"
Traceback (most recent call last):
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/operators/postgres_operator.py", line 65, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/dbapi_hook.py", line 162, in run
    with closing(self.get_conn()) as conn:
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/postgres_hook.py", line 93, in get_conn
    self.conn = psycopg2.connect(**conn_args)
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/psycopg2/__init__.py", line 127, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: FATAL:  password authentication failed for user "minimal_db"
FATAL:  password authentication failed for user "minimal_db"

[2020-06-05 20:42:51,894] {taskinstance.py:1189} INFO - Marking task as FAILED.dag_id=minimal_example, task_id=write_to_postgres, execution_date=20200603T000000, start_date=20200605T184251, end_date=20200605T184251
[2020-06-05 20:43:01,840] {logging_mixin.py:112} INFO - [2020-06-05 20:43:01,840] {local_task_job.py:103} INFO - Task exited with return code 1

Я попытался изменить поле Host в определении minimal_db_id на /var/run/postgresql. Это приводит к сбою аутентификации однорангового узла вместо сбоя аутентификации по паролю:

*** Reading local file: /home/gontcharovd/airflow/logs/minimal_example/write_to_postgres/2020-06-04T18:00:00+00:00/2.log
[2020-06-05 21:25:36,787] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: minimal_example.write_to_postgres 2020-06-04T18:00:00+00:00 [queued]>
[2020-06-05 21:25:36,793] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: minimal_example.write_to_postgres 2020-06-04T18:00:00+00:00 [queued]>
[2020-06-05 21:25:36,793] {taskinstance.py:879} INFO - 
--------------------------------------------------------------------------------
[2020-06-05 21:25:36,793] {taskinstance.py:880} INFO - Starting attempt 2 of 2
[2020-06-05 21:25:36,793] {taskinstance.py:881} INFO - 
--------------------------------------------------------------------------------
[2020-06-05 21:25:36,798] {taskinstance.py:900} INFO - Executing <Task(PostgresOperator): write_to_postgres> on 2020-06-04T18:00:00+00:00
[2020-06-05 21:25:36,800] {standard_task_runner.py:53} INFO - Started process 64696 to run task
[2020-06-05 21:25:36,836] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: minimal_example.write_to_postgres 2020-06-04T18:00:00+00:00 [running]> ws
[2020-06-05 21:25:36,843] {postgres_operator.py:62} INFO - Executing: INSERT INTO my_table VALUES (123);
[2020-06-05 21:25:36,846] {logging_mixin.py:112} INFO - [2020-06-05 21:25:36,846] {base_hook.py:87} INFO - Using connection to: id: minimal_db_id. Host: /var/run/postgresql/, Port: 5432, Schema: minimal_db, Login: minimal_db, Password: XXXXXXXX, extra: None
[2020-06-05 21:25:36,848] {taskinstance.py:1145} ERROR - FATAL:  Peer authentication failed for user "minimal_db"
Traceback (most recent call last):
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/operators/postgres_operator.py", line 65, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/dbapi_hook.py", line 162, in run
    with closing(self.get_conn()) as conn:
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/postgres_hook.py", line 93, in get_conn
    self.conn = psycopg2.connect(**conn_args)
  File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/psycopg2/__init__.py", line 127, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: FATAL:  Peer authentication failed for user "minimal_db"

[2020-06-05 21:25:36,850] {taskinstance.py:1189} INFO - Marking task as FAILED.dag_id=minimal_example, task_id=write_to_postgres, execution_date=20200604T180000, start_date=20200605T192536, end_date=20200605T192536
[2020-06-05 21:25:46,786] {logging_mixin.py:112} INFO - [2020-06-05 21:25:46,785] {local_task_job.py:103} INFO - Task exited with return code 1

Не знаю, что еще можно попробовать.


person Denis Gontcharov    schedule 05.06.2020    source источник
comment
Посмотрите в файле журнала сервера PostgreSQL более подробную информацию об обоих сбоях. Пароль: [тот же пароль, что и у роли minimal_db] Вы не описали установку пароля для этой роли.   -  person jjanes    schedule 05.06.2020
comment
Спасибо, просмотр журналов позволил мне разобраться в проблеме.   -  person Denis Gontcharov    schedule 08.06.2020


Ответы (1)


Я нашел проблему и решение.

Моя ошибка заключалась в том, что я перепутал пароль пользователя Linux minimal_db с ролью postgres minimal_db. Я удалил роль postgres и воссоздал ее с явным паролем:

CREATE USER minimal_db WITH PASSWORD '123'; 

Я указал этот пароль в поле подключения на вкладке «Администратор» на веб-сервере Airflow. Теперь значения корректно записываются в базу данных.

person Denis Gontcharov    schedule 08.06.2020