Перейти к содержанию

Масштабирование Airflow (Scaling workers)

Эта страница ещё не обновлена под Airflow 3. Описанные концепции актуальны, но часть примеров кода может потребовать правок. При запуске примеров обновляйте импорты и учитывайте возможные breaking changes.

Информация

Одно из главных преимуществ Apache Airflow — возможность масштабироваться под меняющиеся потребности организации. Чтобы использовать Airflow по максимуму, при росте пайплайнов стоит настроить несколько ключевых параметров.

В Airflow есть множество параметров, связанных с производительностью на уровне DAG и задач. Среди них:

  • Настройки уровня задачи.
  • Настройки уровня DAG.
  • Настройки уровня окружения.

В этом руководстве — основные параметры для настройки производительности Airflow, влияние выбора executor на масштабирование и типичные проблемы при масштабировании.

Справка дана для Airflow 2.0 и новее. В более ранних версиях названия параметров могут отличаться.

Вебинар: Scaling Out Airflow.

Дополнительные материалы по теме см. в разделе «Other ways to learn».

Необходимая база

Полезно понимать:

Настройка параметров

В Airflow много параметров влияют на производительность. Их настройка затрагивает парсинг DAG и планирование задач, параллелизм в окружении и др.

Возможность тонкой настройки связана с тем, что Airflow как оркестратор используется в самых разных сценариях. Админы или DevOps могут настраивать параметры на уровне окружения, чтобы не перегружать инфраструктуру; авторы DAG — на уровне DAG или задачи, чтобы пайплайны не перегружали внешние системы. Понимание требований сценария поможет выбрать, какие параметры менять.

Настройки уровня окружения

Настройки уровня окружения действуют на всё окружение Airflow (все DAG). У всех есть значения по умолчанию; их можно переопределить переменной окружения или в airflow.cfg. Значения по умолчанию приведены в Airflow Configuration Reference. Текущие значения в существующем окружении: Admin → Configuration в UI Airflow. Подробнее: Setting Configuration Options.

На Astronomer эти параметры задаются переменными окружения Astronomer. См. Environment Variables on Astronomer.

Настройки уровня окружения имеет смысл менять, когда нужно настроить производительность для всех DAG в окружении, особенно чтобы DAG стабильно работали на вашей инфраструктуре.

Core Settings

Параметры Core задают число одновременно выполняемых процессов и длительность их работы во всём окружении. Переменные окружения для параметров этой секции имеют вид AIRFLOW__CORE__PARAMETER_NAME.

  • dagbag_import_timeout: Сколько секунд может длиться импорт DAG в dagbag до таймаута; должно быть меньше dag_file_processor_timeout. Если в логах парсинга DAG видны таймауты или DAG не появляется в списке / в ошибках импорта, увеличьте значение. Можно увеличить и если задачи не выполняются — воркерам нужно заполнять dagbag при выполнении. По умолчанию 30 секунд.
  • dag_file_processor_timeout: Сколько времени может работать DagFileProcessor (обработка одного DAG-файла) до таймаута. По умолчанию 50 секунд.
  • max_active_runs_per_dag: Максимальное число активных DAG run (на один DAG), которые планировщик может создавать одновременно. DAG run — это один «запуск» DAG во времени, как task instance — один запуск задачи. Параметр особенно важен при backfill пропущенных DAG run. Учитывайте это при настройке. По умолчанию 16.
  • max_active_tasks_per_dag (ранее dag_concurrency): Максимальное число задач, которые могут планироваться одновременно по всем run одного DAG. Используется, чтобы один DAG не занимал слишком много слотов от parallelism или пулов. По умолчанию 16. Если вы увеличили ресурсы (например, Celery workers или ресурсы Kubernetes), а задачи всё равно не запускаются как ожидается, возможно, нужно увеличить и parallelism, и max_active_tasks_per_dag.
  • parallelism: Максимальное число задач в состоянии running или queued на один планировщик в окружении. Например, при 32 и двух планировщиках не более 64 задач суммарно по всем DAG. Если задачи долго остаются в scheduled, стоит увеличить значение. По умолчанию 32. На Astro значение выставляется автоматически по максимальному числу воркеров, настраивать вручную не обязательно.

Scheduler settings

Параметры планировщика задают, как планировщик парсит DAG-файлы и создаёт DAG run. Переменные окружения: AIRFLOW__SCHEDULER__PARAMETER_NAME.

  • dag_dir_list_interval: Как часто (в секундах) сканируется каталог DAG в поисках новых файлов. Меньше значение — быстрее появление новых DAG, выше нагрузка на CPU. По умолчанию 300 (5 минут). Полезно знать время парсинга DAG (dag_processing.total_parse_time), чтобы выбрать min_file_process_interval и dag_dir_list_interval. Если dag_dir_list_interval меньше времени парсинга каждого DAG, возможны проблемы с производительностью. На Astro при менее чем 200 DAG в Deployment можно задать AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=30 (30 секунд) как переменную окружения на уровне Deployment.
  • min_file_process_interval: Как часто (в секундах) парсится каждый DAG-файл. Изменения в DAG подхватываются после этого интервала. Низкое значение увеличивает нагрузку на CPU планировщика. При сложном коде и динамических DAG можно увеличить значение для улучшения производительности. По умолчанию 30 секунд.
  • parsing_processes (ранее max_threads): Сколько процессов планировщик может запускать параллельно для парсинга DAG. Astronomer рекомендует значение примерно в два раза больше числа доступных vCPU. Увеличение может ускорить сериализацию большого числа DAG. При нескольких планировщиках значение применяется к каждому. В таблице ниже — число процессов парсинга по умолчанию для размеров Astro Hosted deployment:
Small Medium Large Extra Large
2 2 6 12 (на два DAG Processor)
  • max_tis_per_query: Размер батча запросов к метасторе в основном цикле планирования. Большее значение — больше task instance обрабатывается за запрос, но запрос может стать слишком тяжёлым. По умолчанию 16. Значение scheduler.max_tis_per_query должно быть меньше core.parallelism.
  • max_dagruns_to_create_per_loop: Максимальное число DAG run, создаваемых за один цикл планировщика. Уменьшение освобождает ресурсы для планирования задач. По умолчанию 10.
  • scheduler_heartbeat_sec: Как часто (в секундах) планировщик выполняет цикл и запускает новые задачи. По умолчанию 5 секунд.
  • file_parsing_sort_mode: Как планировщик перечисляет и сортирует DAG-файлы для порядка парсинга. Варианты: modified_time, random_seeded_by_host, alphabetical. По умолчанию modified_time.

Настройки уровня DAG

Настройки уровня DAG действуют только на конкретный DAG и задаются в коде DAG. Их стоит менять, когда нужно настроить производительность одного DAG, особенно если он обращается к внешней системе (API, БД), которую не стоит нагружать слишком часто. Если параметр задан и на уровне DAG, и на уровне окружения, используется значение уровня DAG.

Три основных параметра уровня DAG, которые можно задать в коде:

  • concurrency: Максимальное число экземпляров задач, которые могут выполняться одновременно по всем активным DAG run данного DAG. Позволяет одному DAG запускать, например, 32 задачи одновременно, а другому — 16. Если не задано, используется значение уровня окружения max_active_tasks_per_dag.
  • max_active_tasks: Общее число задач, которые могут выполняться одновременно по всем DAG run этого DAG. По сути задаёт параллелизм по всем run данного DAG. Если не задано, используется значение уровня окружения max_active_tasks_per_dag.
  • max_active_runs: Максимальное число активных DAG run для этого DAG. При превышении лимита планировщик не создаёт новые активные DAG run. Если не задано, используется значение уровня окружения max_active_runs_per_dag. При использовании catchup или backfill лучше задать этот параметр явно, чтобы не запустить слишком много DAG run.

Параметры уровня DAG задаются в определении DAG. Пример:

# Максимум 10 одновременных задач при максимум 3 активных DAG run
@dag("my_dag_id", concurrency=10, max_active_runs=3)
def my_dag():
    ...
# Максимум 10 одновременных задач при максимум 3 активных DAG run
with DAG("my_dag_id", concurrency=10, max_active_runs=3):
    ...

Настройки уровня задачи

Настройки уровня задачи задаются в операторах задач. Их стоит менять, когда проблемы с производительностью создают определённые типы задач.

Три основных параметра уровня задачи:

  • pool: Пул, доступный для задачи. Пул ограничивает число одновременных экземпляров произвольной группы задач. Полезно при многих воркерах или параллельных DAG run, когда нужно не превысить лимит API или не перегрузить источник/приёмник данных.
  • max_active_tis_per_dagrun: Максимальное число одновременных выполнений этой задачи в рамках одного DAG run. Не ограничивает параллелизм по разным DAG run, только внутри одного. На одиночные задачи не влияет (в одном DAG run всегда один экземпляр задачи), но позволяет ограничить параллелизм динамически маппленных задач в рамках DAG run.
  • max_active_tis_per_dag (ранее task_concurrency): Максимальное число одновременных выполнений этой задачи по всем DAG run. Например, если задача обращается к внешнему ресурсу (таблица и т.п.), который не должен меняться несколькими задачами сразу, можно задать значение 1.

Эти параметры наследуются от BaseOperator, их можно задавать в любом операторе. Пример:

@task(
    pool="my_custom_pool",
    max_active_tis_per_dag=14
)
def t1():
    pass
def t1_func():
    pass

t1 = PythonOperator(
    task_id="t1",
    python_callable=t1_func,
    pool="my_custom_pool",
    max_active_tis_per_dag=14
)

Исполнители и масштабирование

В зависимости от выбранного executor при масштабировании стоит учитывать дополнительные настройки.

Celery executor

Celery executor использует постоянно работающих воркеров для выполнения задач. Масштабирование с Celery — это выбор и числа, и размера воркеров. Чем больше воркеров или чем они мощнее, тем больше задач можно выполнять параллельно.

Можно настроить worker_concurrency (переменная окружения AIRFLOW__CELERY__WORKER_CONCURRENCY) — сколько задач один Celery worker может выполнять одновременно. По умолчанию не более 16 задач на воркер. При увеличении worker_concurrency может потребоваться больше CPU и/или памяти для воркеров.

Kubernetes executor

Kubernetes executor создаёт отдельный Pod в кластере Kubernetes для каждой задачи. Так как каждая задача выполняется в своём поде, ресурсы можно задавать на уровне отдельной задачи.

При настройке производительности с Kubernetes executor важно учитывать ресурсы кластера. Часто включают автомасштабирование кластера, чтобы использовать эластичность Kubernetes.

Можно настроить worker_pods_creation_batch_size (переменная AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE) — сколько подов создаётся за один цикл планировщика. По умолчанию 1; для лучшей производительности, особенно при многих параллельных задачах, значение стоит увеличить. Верхняя граница зависит от возможностей кластера Kubernetes.

Типичные проблемы при масштабировании

Масштабирование Airflow — скорее искусство, чем точная наука, и сильно зависит от инфраструктуры и ваших DAG. Ниже — частые проблемы:

  • У одного DAG задачи не идут параллельно, у остальных DAG всё в порядке. Возможное ограничение на уровне DAG. Проверьте max_active_tasks_per_dag, пулы (если используются) и общий parallelism.
  • DAG остаются в queued и не запускаются. Число планируемых задач может превышать возможности инфраструктуры. При Kubernetes executor проверьте свободные ресурсы в namespace и возможность увеличить worker_pods_creation_batch_size. При Celery executor — возможность увеличить worker_concurrency.
  • Высокая задержка планирования задач. Планировщику может не хватать ресурсов на парсинг DAG и последующее планирование. Измените worker_concurrency (при Celery) или parallelism.

При проблемах с масштабированием можно обратиться в Apache Airflow Slack или в Astronomer support.


← Исполнители | К содержанию