Я новичок в воздушном потоке. В качестве первого обучающего упражнения я создал минимальную группу DAG, которая каждый час записывает значение в таблицу базы данных postgres. Однако мне не удалось установить соединение с базой данных postgres.
Обратите внимание, что я не говорю о создании серверной базы данных postgres для использования локального исполнителя, как описано в этот вопрос. Это другая тема (по крайней мере, в моем понимании).
Я создал минимальный пример, в котором я:
- создать пользователя Linux, роль postgresql и базу данных с тем же именем
minimal_db
- создать DAG, состоящий из одного
PostgresOperator
- создать соединение на вкладке администратора веб-сервера Airflow
1. Создание пользователя Linux, роли и базы данных postgres.
В этой части я полагаюсь на эту статью.
Я вошел в систему под пользователем postgres и создал роль с именем minimal_db
. Я создал базу данных с тем же именем. Затем я также создал суперпользователя Linux с тем же именем. Этот последний шаг кажется ненужным, потому что я думаю, что мне не нужна аутентификация identity. Однако это было упомянуто в статье, за которой я следую:
Чтобы войти в систему с аутентификацией на основе идентификатора, вам понадобится пользователь Linux с тем же именем, что и ваша роль и база данных Postgres.
postgres@ws:~$ createuser --interactive
postgres@ws:~$ createdb minimal_db
gontcharovd@ws:~$ sudo adduser minimal_db
Вот информация о подключении к базе данных:
minimal_db=# \conninfo
You are connected to database "minimal_db" as user "minimal_db" via socket in "/var/run/postgresql" at port "5432".
Я не знаю, что означает сокет /var/run/postgresql
.
Я создал таблицу с именем my_table
и вставил одно значение:
minimal_db=# CREATE TABLE my_table (my_value INT NOT NULL);
minimal_db=# INSERT INTO my_table VALUES (123);
Я проверил, что это значение присутствует в my_table
:
minimal_db=# SELECT * FROM my_table;
my_value
----------
123
(1 row)
Сервер postgres предлагает принимать соединения:
gontcharovd@ws:~$ pg_isready -h localhost -p 5432
localhost:5432 - accepting connections
2. Минимальный воздушный поток DAG
DAG, который я использую, абсолютно минимален: каждый час он записывает значение 123 в my_table
в minimal_db
.
import airflow
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
dag = DAG(
'minimal_example',
start_date=airflow.utils.dates.days_ago(1),
schedule_interval='@hourly'
)
write_to_postgres = PostgresOperator(
task_id='write_to_postgres',
postgres_conn_id='minimal_db_id',
sql='INSERT INTO my_table VALUES (123);',
dag=dag
)
write_to_postgres
3. Соединение Airflow, созданное на вкладке "Администратор".
Вот поле postgres_conn_id
, которое я использую в DAG:
Conn Id: minimal_db_id
Con Type: Postgres
Host: localhost
Schema: minimal_db
Login: minimal_db
Password: [same password as the minimal_db role]
Port: 5432
4. Ошибка при выполнении задачи write_to_postgres
Я продолжаю получать ошибку аутентификации по паролю, хотя я уверен, что пароль роли postgresql minimal_db
совпадает с паролем в minimal_db_id
соединении.
[2020-06-05 20:42:51,845] {taskinstance.py:900} INFO - Executing <Task(PostgresOperator): write_to_postgres> on 2020-06-03T00:00:00+00:00
[2020-06-05 20:42:51,847] {standard_task_runner.py:53} INFO - Started process 59141 to run task
[2020-06-05 20:42:51,875] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: minimal_example.write_to_postgres 2020-06-03T00:00:00+00:00 [running]> ws
[2020-06-05 20:42:51,882] {postgres_operator.py:62} INFO - Executing: INSERT INTO my_table VALUES (123);
[2020-06-05 20:42:51,884] {logging_mixin.py:112} INFO - [2020-06-05 20:42:51,884] {base_hook.py:87} INFO - Using connection to: id: minimal_db_id. Host: localhost, Port: 5432, Schema: minimal_db, Login: minimal_db, Password: XXXXXXXX, extra: None
[2020-06-05 20:42:51,892] {taskinstance.py:1145} ERROR - FATAL: password authentication failed for user "minimal_db"
FATAL: password authentication failed for user "minimal_db"
Traceback (most recent call last):
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/operators/postgres_operator.py", line 65, in execute
self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/dbapi_hook.py", line 162, in run
with closing(self.get_conn()) as conn:
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/postgres_hook.py", line 93, in get_conn
self.conn = psycopg2.connect(**conn_args)
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/psycopg2/__init__.py", line 127, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: FATAL: password authentication failed for user "minimal_db"
FATAL: password authentication failed for user "minimal_db"
[2020-06-05 20:42:51,894] {taskinstance.py:1189} INFO - Marking task as FAILED.dag_id=minimal_example, task_id=write_to_postgres, execution_date=20200603T000000, start_date=20200605T184251, end_date=20200605T184251
[2020-06-05 20:43:01,840] {logging_mixin.py:112} INFO - [2020-06-05 20:43:01,840] {local_task_job.py:103} INFO - Task exited with return code 1
Я попытался изменить поле Host в определении minimal_db_id на /var/run/postgresql
. Это приводит к сбою аутентификации однорангового узла вместо сбоя аутентификации по паролю:
*** Reading local file: /home/gontcharovd/airflow/logs/minimal_example/write_to_postgres/2020-06-04T18:00:00+00:00/2.log
[2020-06-05 21:25:36,787] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: minimal_example.write_to_postgres 2020-06-04T18:00:00+00:00 [queued]>
[2020-06-05 21:25:36,793] {taskinstance.py:669} INFO - Dependencies all met for <TaskInstance: minimal_example.write_to_postgres 2020-06-04T18:00:00+00:00 [queued]>
[2020-06-05 21:25:36,793] {taskinstance.py:879} INFO -
--------------------------------------------------------------------------------
[2020-06-05 21:25:36,793] {taskinstance.py:880} INFO - Starting attempt 2 of 2
[2020-06-05 21:25:36,793] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------
[2020-06-05 21:25:36,798] {taskinstance.py:900} INFO - Executing <Task(PostgresOperator): write_to_postgres> on 2020-06-04T18:00:00+00:00
[2020-06-05 21:25:36,800] {standard_task_runner.py:53} INFO - Started process 64696 to run task
[2020-06-05 21:25:36,836] {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: minimal_example.write_to_postgres 2020-06-04T18:00:00+00:00 [running]> ws
[2020-06-05 21:25:36,843] {postgres_operator.py:62} INFO - Executing: INSERT INTO my_table VALUES (123);
[2020-06-05 21:25:36,846] {logging_mixin.py:112} INFO - [2020-06-05 21:25:36,846] {base_hook.py:87} INFO - Using connection to: id: minimal_db_id. Host: /var/run/postgresql/, Port: 5432, Schema: minimal_db, Login: minimal_db, Password: XXXXXXXX, extra: None
[2020-06-05 21:25:36,848] {taskinstance.py:1145} ERROR - FATAL: Peer authentication failed for user "minimal_db"
Traceback (most recent call last):
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/operators/postgres_operator.py", line 65, in execute
self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/dbapi_hook.py", line 162, in run
with closing(self.get_conn()) as conn:
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/airflow/hooks/postgres_hook.py", line 93, in get_conn
self.conn = psycopg2.connect(**conn_args)
File "/home/gontcharovd/.conda/envs/dateng/lib/python3.8/site-packages/psycopg2/__init__.py", line 127, in connect
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: FATAL: Peer authentication failed for user "minimal_db"
[2020-06-05 21:25:36,850] {taskinstance.py:1189} INFO - Marking task as FAILED.dag_id=minimal_example, task_id=write_to_postgres, execution_date=20200604T180000, start_date=20200605T192536, end_date=20200605T192536
[2020-06-05 21:25:46,786] {logging_mixin.py:112} INFO - [2020-06-05 21:25:46,785] {local_task_job.py:103} INFO - Task exited with return code 1
Не знаю, что еще можно попробовать.