Перейти к содержанию

Хуки (Hooks) в Airflow

Хук — абстракция над конкретным API, позволяющая Airflow взаимодействовать с внешней системой. Хуки встроены во многие операторы, но их можно использовать и напрямую в коде DAG.

В этом руководстве вы узнаете, как использовать хуки в Airflow и когда имеет смысл вызывать их прямо в DAG. Также вы реализуете два разных хука в одном DAG.

В Astronomer Registry доступно более 300 хуков. Если подходящего хука нет, можно написать свой и опубликовать его в сообществе.

Инфо. Подробнее о написании кастомных хуков и операторов см. в руководстве Custom hooks and operators.

Необходимая база

Чтобы получить максимум от руководства, нужно понимать:

Основы хуков

Хуки оборачивают API и предоставляют методы для взаимодействия с внешними системами. Они унифицируют способ работы с внешними системами и делают код DAG чище, понятнее и менее подверженным ошибкам.

Чтобы использовать хук, обычно достаточно connection ID для подключения к внешней системе. Подробнее о настройке подключений: Управление подключениями в Apache Airflow.

Все хуки наследуются от класса BaseHook, в котором реализована установка внешнего подключения по connection ID. Помимо установки соединения, конкретные хуки могут содержать дополнительные методы для действий во внешней системе. Эти методы могут опираться на разные Python-библиотеки. Например, S3Hook использует библиотеку boto3 для работы с Amazon S3.

У S3Hook есть более 20 методов для работы с бакетами Amazon S3. Вот некоторые из них:

  • download_file: загружает файл из S3 в локальную файловую систему.
  • load_file: загружает локальный файл в S3.
  • list_keys: перечисляет ключи в бакете по заданным параметрам.
  • list_prefixes: перечисляет префиксы в бакете по заданным параметрам.
  • check_for_bucket: проверяет существование бакета с указанным именем.

В следующем примере показано использование хука в DAG.

TaskFlow API:

from airflow.sdk import dag, task

@dag
def my_dag():
    @task
    def my_task():
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        s3_hook = S3Hook(aws_conn_id="my_aws_conn")
        # используйте методы хука здесь

    my_task()

my_dag()

Традиционный синтаксис:

from airflow.sdk import DAG, task
from airflow.providers.standard.operators.python import PythonOperator

def _my_task():
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    s3_hook = S3Hook(aws_conn_id="my_aws_conn")
    # используйте методы хука здесь

with DAG(dag_id="my_dag"):

    my_task = PythonOperator(task_id="my_task", python_callable=_my_task)

Когда использовать хуки

Так как хуки — строительные блоки операторов, их использование в Airflow часто скрыто от автора DAG. Но в ряде случаев хуки стоит вызывать напрямую в Python-функции в DAG. Общие рекомендации:

  • Если нужно регулярно подключаться к API и подходящего хука нет — напишите свой хук и опубликуйте его в сообществе.
  • Если для вашей задачи уже есть оператор со встроенным хуком — используйте оператор, а не ручную настройку хука.
  • Если вы пишете кастомный оператор для работы с внешней системой, он должен использовать хук.
  • Всегда предпочтительнее использовать хук, а не ручное взаимодействие с API. Хуки часто используют в декорированных функциях Airflow (например, с @task) и в DAG, заданных декоратором @asset.

Пример: S3 и Slack

В примере ниже используются хуки S3Hook и SlackHook: чтение значений из файлов в бакете Amazon S3, проверка по ним, отправка результата в Slack и логирование ответа Slack API.

Хуки здесь вызываются напрямую в Python-функциях, потому что ни один из существующих операторов Amazon S3 не умеет читать данные из нескольких файлов в бакете, а операторы Slack не возвращают ответ вызова Slack API (который может понадобиться для логирования и мониторинга).

Исходный код хуков:

Подготовка

Перед запуском примера установите нужные провайдеры Airflow. При использовании Astro CLI добавьте в requirements.txt:

apache-airflow-providers-amazon
apache-airflow-providers-slack

Создание подключений

  1. В Airflow UI откройте Admin → Connections и нажмите + Add Connection.
  2. В поле Connection ID введите уникальное имя подключения.
  3. В списке Connection Type выберите aws для бакета Amazon S3. Если тип aws недоступен, проверьте установку провайдера.
  4. В поле Login введите AWS Access Key ID.
  5. В поле Password введите AWS Secret Access Key. Получить ключи: AWS Account and Access Keys.
  6. Нажмите Save.
  7. Повторите шаги 1–6 для нового подключения к Slack: выберите тип slack, в поле Password укажите Bot User OAuth Token. Токен: api.slack.com/apps → Features → OAuth & Permissions. Нажмите Save.

Запуск примера DAG

В примере DAG используются декораторы Airflow для задач и XCom для передачи данных между задачами. Имя бакета S3 и имена файлов, которые читает первая задача, заданы переменными окружения.

DAG выполняет следующие шаги:

  • Задача с S3Hook читает три заданных ключа из S3 методом read_key и возвращает словарь с содержимым файлов, преобразованным в целые числа.
  • Вторая задача выполняет простую проверку суммы по результатам первой задачи.
  • Метод SlackHook call отправляет результат проверки в канал Slack и возвращает ответ Slack API.

TaskFlow API:

from datetime import datetime
from airflow.decorators import dag, task
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

S3BUCKET_NAME = "myhooktutorial"
S3_EXAMPLE_FILE_NAME_1 = "file1.txt"
S3_EXAMPLE_FILE_NAME_2 = "file2.txt"
S3_EXAMPLE_FILE_NAME_3 = "file3.txt"


@task
def read_keys_from_s3():
    s3_hook = S3Hook(aws_conn_id="aws_conn")
    response_file_1 = s3_hook.read_key(
        key=S3_EXAMPLE_FILE_NAME_1, bucket_name=S3BUCKET_NAME
    )
    response_file_2 = s3_hook.read_key(
        key=S3_EXAMPLE_FILE_NAME_2, bucket_name=S3BUCKET_NAME
    )
    response_file_3 = s3_hook.read_key(
        key=S3_EXAMPLE_FILE_NAME_3, bucket_name=S3BUCKET_NAME
    )

    response = {
        "num1": int(response_file_1),
        "num2": int(response_file_2),
        "num3": int(response_file_3),
    }

    return response


@task
def run_sum_check(response):
    if response["num1"] + response["num2"] == response["num3"]:
        return (True, response["num3"])
    return (False, response["num3"])


@task
def post_to_slack(sum_check_result):
    slack_hook = SlackHook(slack_conn_id="hook_tutorial_slack_conn")

    if sum_check_result[0] is True:
        server_response = slack_hook.call(
            api_method="chat.postMessage",
            json={
                "channel": "#test-airflow",
                "text": f"""All is well in your bucket!
                        Correct sum: {sum_check_result[1]}!""",
            },
        )
    else:
        server_response = slack_hook.call(
            api_method="chat.postMessage",
            json={
                "channel": "#test-airflow",
                "text": f"""A test on your bucket contents failed!
                        Target sum not reached: {sum_check_result[1]}""",
            },
        )

    return server_response


@dag(
    dag_id="hook_tutorial",
    start_date=datetime(2022, 5, 20),
    schedule="@daily",
    catchup=False,
)
def hook_tutorial():
    response = read_keys_from_s3()
    sum_check_result = run_sum_check(response)
    post_to_slack(sum_check_result)


hook_tutorial()

Традиционный синтаксис:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

S3BUCKET_NAME = "myhooktutorial"
S3_EXAMPLE_FILE_NAME_1 = "file1.txt"
S3_EXAMPLE_FILE_NAME_2 = "file2.txt"
S3_EXAMPLE_FILE_NAME_3 = "file3.txt"


def read_keys_from_s3_function():
    s3_hook = S3Hook(aws_conn_id="aws_conn")
    response_file_1 = s3_hook.read_key(
        key=S3_EXAMPLE_FILE_NAME_1, bucket_name=S3BUCKET_NAME
    )
    response_file_2 = s3_hook.read_key(
        key=S3_EXAMPLE_FILE_NAME_2, bucket_name=S3BUCKET_NAME
    )
    response_file_3 = s3_hook.read_key(
        key=S3_EXAMPLE_FILE_NAME_3, bucket_name=S3BUCKET_NAME
    )

    response = {
        "num1": int(response_file_1),
        "num2": int(response_file_2),
        "num3": int(response_file_3),
    }

    return response


def run_sum_check_function(response):
    if response["num1"] + response["num2"] == response["num3"]:
        return (True, response["num3"])
    return (False, response["num3"])


def post_to_slack_function(sum_check_result):
    slack_hook = SlackHook(slack_conn_id="hook_tutorial_slack_conn")

    if sum_check_result[0] is True:
        server_response = slack_hook.call(
            api_method="chat.postMessage",
            json={
                "channel": "#test-airflow",
                "text": f"""All is well in your bucket!
                        Correct sum: {sum_check_result[1]}!""",
            },
        )
    else:
        server_response = slack_hook.call(
            api_method="chat.postMessage",
            json={
                "channel": "#test-airflow",
                "text": f"""A test on your bucket contents failed!
                        Target sum not reached: {sum_check_result[1]}""",
            },
        )

    return server_response


with DAG(
    dag_id="hook_tutorial",
    start_date=datetime(2022, 5, 20),
    schedule="@daily",
    catchup=False,
    render_template_as_native_obj=True,
):
    read_keys_form_s3 = PythonOperator(
        task_id="read_keys_form_s3", python_callable=read_keys_from_s3_function
    )

    run_sum_check = PythonOperator(
        task_id="run_sum_check",
        python_callable=run_sum_check_function,
        op_kwargs={
            "response": "{{ ti.xcom_pull(task_ids='read_keys_form_s3', \
                key='return_value') }}"
        },
    )

    post_to_slack = PythonOperator(
        task_id="post_to_slack",
        python_callable=post_to_slack_function,
        op_kwargs={
            "sum_check_result": "{{ ti.xcom_pull(task_ids='run_sum_check', \
                key='return_value') }}"
        },
    )

    read_keys_form_s3 >> run_sum_check >> post_to_slack

← Выполнение SQL | К содержанию | Управление кодом →