Перейти к содержанию

Лучшие практики написания DAG в Airflow

Поскольку Airflow полностью описывается кодом, для начала достаточно основ Python. Но чтобы DAG были эффективными, безопасными и масштабируемыми, нужны приёмы, специфичные для Airflow. В этом руководстве — как писать DAG, максимально используя возможности Airflow.

Практики можно разделить на две группы:

  • Использование Airflow как оркестратора.
  • Проектирование DAG.

Для разбора концепций и примеров рекомендуется вебинар DAG Writing Best Practices in Apache Airflow и репозиторий с примерами DAG.

Необходимая база

Полезно понимать:

Идемпотентность

Идемпотентность лежит в основе многих практик, в том числе описанных в этом руководстве. Программа считается идемпотентной, если при одних и тех же входных данных один запуск даёт тот же эффект, что и несколько запусков.

В Airflow DAG считается идемпотентным, если повторный запуск одного и того же DAG run с теми же входами даёт тот же результат, что и однократный запуск. Этого проще добиться, если каждая задача в DAG спроектирована идемпотентной. Идемпотентные DAG и задачи ускоряют восстановление после сбоев и снижают риск потери данных.

Идемпотентность открывает путь к одной из самых полезных возможностей Airflow — повторным запускам (retries).

Настройка retries

В распределённой среде, где контейнеры задач выполняются на общих хостах, задачи могут неожиданно завершаться. В логах Airflow при этом иногда появляются zombie-процессы.

Снизить влияние таких ситуаций помогают повторные запуски задач (retries). Retries задаются на разных уровнях; приоритет такой:

  1. Deployments: переменная окружения AIRFLOW__CORE__DEFAULT_TASK_RETRIES.
  2. DAG: параметр retries в default_args DAG.
  3. Задача: параметр retries у оператора задачи.

Значение retries=2 обычно достаточно для типичных проблем распределённого окружения. Подробнее: Повторный запуск DAG и задач.

Проектирование DAG

Следующие принципы помогают делать DAG идемпотентными, эффективными и читаемыми.

Атомарность задач

При разбиении пайплайна на задачи каждая задача должна выполнять одну операцию, которую можно перезапускать независимо от остальных. В атомарной задаче успех части задачи означает успех всей задачи.

Например, в ETL-пайплайне операции Extract, Transform и Load лучше оформить тремя отдельными задачами. Так каждую операцию можно перезапускать отдельно, что поддерживает идемпотентность.

Шаблонируемые поля, переменные и макросы

С помощью шаблонируемых полей в Airflow в DAG подставляются значения из переменных окружения и Jinja. По сравнению с вызовом Python-функций это помогает сохранять идемпотентность и не выполнять код при каждом цикле планировщика. См. раздел «Избегайте кода на верхнем уровне DAG-файла» ниже.

В примере ниже (вопреки рекомендациям) переменные заданы через функции datetime:

# Переменные для задач
# Плохой пример — даты через datetime
today = datetime.today()
yesterday = datetime.today() - timedelta(1)

Если такой код находится в файле DAG, эти функции выполняются при каждом цикле планировщика, что может влиять на производительность. Важнее то, что DAG перестаёт быть идемпотентным: перезапуск DAG run за прошлую дату не даст того же результата, потому что datetime.today() привязан к текущей дате, а не к дате выполнения DAG.

Лучше использовать переменные и макросы Airflow:

# Переменные для задач
# Хороший пример — метка начала предыдущего успешного DAG run
yesterday = {{ prev_start_date_success }}

Можно использовать встроенные переменные и макросы Airflow или задать свои шаблонируемые поля для передачи данных в рантайме. Подробнее: шаблоны и макросы в Airflow.

Инкрементальная фильтрация записей

По возможности разбивайте пайплайны на инкрементальные извлечение и загрузку. Например, при почасовом DAG каждый DAG run должен обрабатывать только записи за этот час, а не весь набор. Тогда сбой по одной части данных не помешает успешно завершиться остальным DAG run. При идемпотентных DAG можно перезапустить DAG только для неудачной части данных, не переобрабатывая весь набор.

Инкрементальные пайплайны можно строить разными способами.

Дата последнего изменения

Для инкрементальной загрузки рекомендуется использовать дату последнего изменения. В идеале у каждой записи в источнике есть столбец с моментом последнего изменения. DAG run тогда выбирает записи, обновлённые в заданном интервале по этому столбцу.

Например, при почасовом DAG каждый run загружает записи, попавшие в свой час. Сбой одного run не затрагивает остальные.

Последовательные ID

Если даты последнего изменения нет, для инкрементальной загрузки можно использовать последовательный или возрастающий ID. Это хорошо подходит, когда в источник только добавляют записи, а не обновляют. Хотя наличие даты последнего изменения в записях считается лучшей практикой, фильтрация по sequence ID — разумная альтернатива при её отсутствии.

Избегайте кода на верхнем уровне DAG-файла

В Airflow код на верхнем уровне — это код, который выполняется в момент парсинга DAG, а не в момент запуска задачи.

Код внутри оператора или задекорированной задачи выполняется Airflow только при запуске задачи, а не при парсинге DAG. В примере ниже call_external_systems() считается кодом верхнего уровня, так как вызывается при парсинге DAG. Выражение x + y к верхнему уровню не относится — оно в определении задачи и выполняется только при её запуске.

@dag(...)
def the_dag():
    @task
    def do_thing():
        x + y

    num_of_things = call_external_system()  # это «код верхнего уровня»
    chain(do_thing() for _ in range(num_of_things))

the_dag()

В зоне риска любой код вне создания DAG и операторов, который обращается к внешним системам. Весь код в dags_folder выполняется с интервалом min_file_process_interval (по умолчанию 30 секунд). Поэтому код, выполняемый при парсинге DAG и делающий запросы к API, БД или другим внешним системам, может создавать запросы и соединения каждые 30 секунд вместо только запланированных запусков DAG и приводить к проблемам с производительностью.

В примере ниже задачи создаются динамически с PostgresOperator на основе записей из другой БД.

Плохая практика: подключение к БД выполняется вне оператора, как код верхнего уровня. При парсинге DAG планировщик использует переменные hook и result для запроса к таблице grocery_list. Этот запрос выполняется при каждом парсинге и может ухудшать производительность.

Хорошая практика: подключение к БД вынесено в отдельную задачу get_list_of_results. Подключение происходит только при реальном запуске DAG.

"""ВНИМАНИЕ: этот DAG — пример плохих практик. Не используйте его."""

from airflow.decorators import dag
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from pendulum import datetime

# Плохо: код верхнего уровня в DAG-файле
hook = PostgresHook("database_conn")
results = hook.get_records("SELECT * FROM grocery_list;")

sql_queries = []
for result in results:
    grocery = result[0]
    amount = result[1]
    sql_query = f"INSERT INTO purchase_order VALUES ('{grocery}', {amount});"
    sql_queries.append(sql_query)


@dag(
    start_date=datetime(2023, 1, 1), max_active_runs=3, schedule="@daily", catchup=False
)
def bad_practices_dag_1():
    insert_into_purchase_order_postgres = PostgresOperator.partial(
        task_id="insert_into_purchase_order_postgres",
        postgres_conn_id="postgres_default",
    ).expand(sql=sql_queries)


bad_practices_dag_1()
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from pendulum import datetime


@dag(
    start_date=datetime(2023, 1, 1), max_active_runs=3, schedule="@daily", catchup=False
)
def good_practices_dag_1():
    @task
    def get_list_of_results():
        # Хорошо: подключение к БД внутри задачи
        hook = PostgresHook("database_conn")
        results = hook.get_records("SELECT * FROM grocery_list;")
        return results

    @task
    def create_sql_query(result):
        grocery = result[0]
        amount = result[1]
        sql = f"INSERT INTO purchase_order VALUES ('{grocery}', {amount});"
        return sql

    sql_queries = create_sql_query.expand(result=get_list_of_results())

    insert_into_purchase_order_postgres = PostgresOperator.partial(
        task_id="insert_into_purchase_order_postgres",
        postgres_conn_id="postgres_default",
    ).expand(sql=sql_queries)


good_practices_dag_1()

Относитесь к DAG-файлу как к конфигурации

Код, не входящий в создание DAG и операторов, усложняет чтение, поддержку и изменение DAG-файла. По возможности оставляйте основную работу хукам и операторам, которые вы создаёте в файле. Если DAG нужен дополнительный код (SQL-скрипт, Python-функция), выносите его в отдельный файл и подключайте при запуске DAG.

Единый способ задания зависимостей между задачами

В Airflow зависимости между задачами можно задавать по-разному: функциями set_upstream() и set_downstream() или операторами << и >>. Выбор способа — вопрос вкуса, но для читаемости лучше выбрать один и использовать его везде.

Вместо смешения способов:

task_1.set_downstream(task_2)
task_3.set_upstream(task_2)
task_3 >> task_4

Лучше придерживаться одного варианта:

task_1 >> task_2 >> [task_3, task_4]

Использование возможностей Airflow

Чтобы максимально использовать Airflow, опирайтесь на встроенные механизмы и экосистему — в первую очередь provider packages для интеграции со сторонними системами. Так проще масштабироваться и подключать нужные инструменты.

Использование provider packages

Одно из главных преимуществ Airflow — активное сообщество и готовые интеграции с другими системами в виде provider packages.

Провайдеры позволяют оркестрировать задачи обработки данных во внешних системах прямо из Airflow. По возможности используйте эти интеграции вместо собственных Python-функций: проще внедрять Airflow в организациях с уже существующим стеком и не писать новый код.

Список доступных провайдеров: Astronomer Registry.

Где выполнять обработку данных

Вариантов реализации обработки данных много. Для нагрузок малого и среднего масштаба обработку часто можно выполнять внутри Airflow при достаточных ресурсах инфраструктуры. Крупную обработку обычно лучше выносить в системы, заточенные под такие задачи, например Apache Spark, а Airflow использовать для оркестрации.

Astronomer рекомендует учитывать текущий и ожидаемый объём данных при выборе между обработкой в Airflow и выносом во внешние системы. Если обработка в Airflow подходит:

  • Используйте кастомный XCom backend или промежуточное хранилище при передаче данных между задачами, чтобы не перегружать БД метаданных.
  • Используйте Kubernetes Executor для изоляции задач и более точного управления ресурсами на уровне задач.
  • Заложите в инфраструктуру Airflow достаточные ресурсы.

Другие практики

Несколько дополнительных рекомендаций.

Единая структура файлов

Одинаковая структура файлов в проектах Airflow упрощает навигацию и онбординг. Пример структуры, которую использует Astronomer:

├── dags/                 # DAG-файлы
│   └── example-dag.py    # пример DAG из инициализированного проекта
├── Dockerfile            # образ Docker и переопределения рантайма для Astronomer
├── include/              # дополнительные файлы (скрипты, SQL и т.д.)
├── plugins/              # кастомные и сторонние плагины Airflow
├── packages.txt          # пакеты на уровне ОС
└── requirements.txt     # Python-зависимости

← К содержанию | Параметры DAG → | Управление кодом →