Программно создать туннель SSH внутри докеризованного оператора python воздушного потока apache

Моя программа не может создать туннель SSH, пока внутри моего контейнера докеров работает apache airflow. Только запуск функции на моем локальном компьютере работает нормально. У меня есть список серверов, которые я использую для создания туннеля, запроса к базе данных и закрытия соединения. Обычно я делаю это следующим образом:

for server in servers:
    server_conn = sshtunnel.SSHTunnelForwarder(
        server,
        ssh_username=ssh_user,
        ssh_password=ssh_password,
        remote_bind_address=(localhost, db_port),
        local_bind_address=(localhost, localport)
    )

Это работает, как и ожидалось, и я могу делать оттуда все, что мне нужно. Однако в Docker это не работает. Я понимаю, что докер запускается и привязывается к порту и на самом деле не является частью хост-системы, поэтому я использовал network_mode="host", чтобы смягчить эту проблему. Однако это не работает, потому что мои контейнеры теряют возможность взаимодействовать друг с другом. Вот мой файл для создания докеров

    postgres:
        image: postgres:9.6
        environment:
            - POSTGRES_USER=airflow
            - POSTGRES_PASSWORD=airflow
            - POSTGRES_DB=airflow
            - PGDATA=/var/lib/postgresql/data/pgdata
        volumes:
            - ~/.whale/pgdata:/var/lib/postgresql/data/pgdata
            - ./dags/docker/sql/create.sql:/docker-entrypoint-initdb.d/init.sql
        ports:
          - "5432:5432"

    webserver:
        image: hawk
        build:
            context: .
            dockerfile: ./dags/docker/Dockerfile-airflow
        restart: always
        depends_on:
            - postgres
            # - redis
        environment:
            - LOAD_EX=n
            - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
            - EXECUTOR=Local
        volumes:
            - ./dags:/usr/local/airflow/dags
            # Uncomment to include custom plugins
            # - ./plugins:/usr/local/airflow/plugins
        ports:
            - "8080:8080"
            - "52023:22"
        command: webserver
        healthcheck:
            test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
            interval: 30s
            timeout: 30s
            retries: 3

Я также следовал инструкциям здесь и добрался до точки, где я могу docker exec вводить container и вручную введите приведенный выше фрагмент Python и получите рабочее соединение. Кроме того, я прочитал документацию по воздушному потоку здесь который охватывает операторы соединения SSH, но они поддерживают только команды bash, мне понадобится моя функция python для запуска. Я действительно смущен, почему код Python будет работать, когда exec будет введен в систему, но не когда я запускаю его через свою DAG воздушного потока. В настоящее время я не могу вручную установить все соединения, потому что после развертывания этой системы их будет > 100. Любая помощь будет принята с благодарностью. Если требуется больше глубины, пожалуйста, дайте мне знать.


person Jarred Parr    schedule 21.06.2019    source источник
comment
Обнаруженная проблема заключается в том, что Apache Airflow на данный момент не поддерживает исходящие туннельные подключения SSH. Лучший обходной путь (который я использовал) заключался в размещении конечных точек API на рассматриваемых серверах с нужными мне данными. Очень странно, что они заблокировали эту функцию, и я все еще не исключаю, что это может быть моя собственная конфигурация.   -  person Jarred Parr    schedule 24.06.2019
comment
Привет, повезло с этим? У меня такая же проблема. Я хочу открыть туннель программно и использовать соединение для Postgresql с этим туннелем.   -  person edthix    schedule 11.12.2019
comment
Пример кода здесь gist.github.com/edthix/8bcb0eb8415d01e4302640cddf57f2b6   -  person edthix    schedule 11.12.2019


Ответы (1)


У меня была такая же проблема при открытии туннеля и попытке подключения к базе данных в отдельных задачах, но он работал, выполняя оба в одной задаче (Airflow не сохраняет состояние между запусками задачи):

def select_from_tunnel_db():
    # Open SSH tunnel
    ssh_hook = SSHHook(ssh_conn_id='bastion-ssh-conn', keepalive_interval=60)
    tunnel = ssh_hook.get_tunnel(5432, remote_host='<db_host>', local_port=5432)
    tunnel.start()

    # Connect to DB and run query
    pg_hook = PostgresHook(
        postgres_conn_id='remote-db-conn',  # NOTE: host='localhost'
        schema='db_schema'
    )
    pg_cursor = pg_hook.get_conn().cursor()
    pg_cursor.execute('SELECT * FROM table;')
    select_val = pg_cursor.fetchall()

    return select_val


python_operator = PythonOperator(
    task_id='test_tunnel_conn',
    python_callable=select_from_tunnel_db,
    dag=dag
)

Это перенаправляет трафик через порт 5432 с локального компьютера на тот же порт на удаленном хосте базы данных. Для SSHHook требуется рабочее соединение ssh с конечной точкой, через которую вы будете туннелировать, а для PostgresHook требуется соединение postgres с «localhost» через порт 5432.

person Sbrom    schedule 29.05.2020