Перейти к содержанию

Планирование DAG в Apache Airflow®

Одна из базовых возможностей Apache Airflow® — планирование DAG. В Airflow доступно много вариантов: от простых расписаний по cron до data-aware scheduling с ассетами и event-driven планирования по сообщениям в очереди.

В этом руководстве вы узнаете:

  • Какие варианты планирования DAG доступны.
  • Какие параметры DAG управляют планированием.
  • Как интерпретировать метки времени, связанные с DAG run.

Инфо. Это руководство даёт обзор вариантов планирования. Подробнее по отдельным типам: - Повторный запуск DAG и задач (включая backfill) - Event-driven планирование - Ассеты и data-aware планирование

Необходимая база

Чтобы получить максимум от руководства, нужно понимать:

Метки времени DAG run

DAG run — это один запуск DAG, привязанный к моменту времени. На странице деталей DAG run отображаются разные метки времени.

  • Queued at: момент постановки первой task instance этого DAG run в очередь.
  • Last Scheduling Decision: последний момент, когда планировщик пытался запланировать task instances для этого DAG run.
  • Run ID: уникальный идентификатор DAG run. Run ID складывается из типа запуска (например, scheduled) и логической даты. Если логическая дата None, используется run after date с добавлением случайного суффикса для уникальности. Run ID используется для идентификации DAG run в метаданных Airflow.
  • Duration и Run Duration: время выполнения DAG run, разница между временем начала и окончания.
  • End и End Date: момент окончания DAG run. Эта метка не связана с параметром DAG end_date.
  • Start и Start Date: момент фактического начала DAG run. Эта метка не связана с параметром DAG start_date.
  • Run after: момент времени, после которого этот DAG run может быть запущен. Если задана логическая дата, run after совпадает с ней. Если логическая дата None, run after устанавливается в текущее время в момент триггера DAG run.
  • Logical Date: момент времени, после которого этот DAG run может быть запущен. В Airflow UI эта метка отображается как основная дата DAG run. Логическую дату можно явно задать как None при запуске DAG через REST API или UI.

Две дополнительные метки имеют смысл только при использовании CronDataIntervalTimetable:

  • Data Interval End: при использовании CronDataIntervalTimetable data interval end DAG run совпадает с run after текущего запланированного DAG run. Если логическая дата None, data interval end тоже None.
  • Data Interval Start: при использовании CronDataIntervalTimetable data interval start DAG run совпадает с run after предыдущего запланированного DAG run того же DAG. При других расписаниях data interval start совпадает с run after текущего DAG run. Если DAG run запущен с логической датой None, data interval start тоже None.

Подробнее об интервалах данных и отличиях CronDataIntervalTimetable от CronTriggerTimetable: документация Airflow.

Параметры DAG для планирования

Следующие параметры задают, когда DAG будет выполняться:

  • catchup: булево значение; нужно ли автоматически создавать все запуски между start_date и текущей датой. По умолчанию False. Помимо этого можно вручную запускать DAG за любую дату в прошлом. См. Backfill.
  • end_date: дата, после которой DAG больше не планируется. По умолчанию None.
  • schedule: правила, по которым создаются DAG run. Принимает cron-выражения, объекты timedelta, timetables и списки ассетов. По умолчанию None.
  • start_date: момент времени, после которого DAG может запускаться. При использовании CronDataIntervalTimetable start_date — момент, после которого может начаться первый интервал данных. По умолчанию None.

В примере ниже DAG задан с start_date 1 апреля 2025, расписанием @daily и end_date 1 апреля 2026. DAG будет запускаться каждый день в полночь UTC с 1 апреля 2025 по 31 марта 2026. Пропущенные запуски автоматически не восполняются (catchup по умолчанию False).

from pendulum import datetime
from airflow.sdk import dag

@dag(
    start_date=datetime(2025, 4, 1),
    schedule="@daily",
    end_date=datetime(2026, 4, 1),
)
def my_dag():
    pass

my_dag()

Внимание. Не задавайте динамическое расписание (например, datetime.now())! Это приведёт к ошибке планировщика.

Временные расписания (time-based)

Для пайплайнов с простыми требованиями к расписанию параметр schedule DAG можно задать с помощью:

  • объекта timedelta;
  • cron-пресета;
  • cron-выражения.

Cron-выражения под капотом передаются в timetable. По умолчанию используется CronTriggerTimetable. Опция конфигурации [scheduler].create_cron_data_intervals переключает на CronDataIntervalTimetable (поведение в более ранних версиях Airflow). Подробнее: Timetable comparisons.

Cron-выражения

В параметр schedule DAG можно передать любое cron-выражение в виде строки. Например, для запуска каждый день в 4:05 утра: schedule='5 4 * * *'.

Подсказки по составлению cron-выражений: crontab guru.

Cron-пресеты

В Airflow можно использовать пресеты для типичных расписаний. Например, schedule='@hourly' запускает DAG в начале каждого часа. Полный список пресетов: Cron Presets.

Объекты timedelta

Если нужно запускать DAG с заданным интервалом (каждый час, каждые 5 минут и т.д.), а не в фиксированное время, в schedule можно передать объект timedelta из пакета datetime или duration из пакета pendulum. Например, schedule=timedelta(minutes=30) — каждые 30 минут, schedule=timedelta(days=1) — раз в день.

Ограничения расписаний на основе cron

Расписания на основе cron плохо подходят для нерегулярных временных правил, например:

  • несколько запусков в день с неравными интервалами (например, 13:00 и 16:30);
  • запуск каждый день кроме праздников;
  • разное время в разные дни (например, 14:00 по четвергам и 16:00 по субботам).

Такие расписания можно реализовать с помощью timetables.

Data-aware планирование (ассеты)

С помощью ассетов Airflow может учитывать обновления данных и планировать другие DAG при обновлении этих ассетов. Чтобы задать расписание по ассетам, передайте имя (имена) ассета в параметр schedule. Можно задавать условия по нескольким ассетам (OR/AND) и комбинировать расписание по ассетам с временным.

Список ассетов (условие AND) — DAG запускается, когда обновлены все указанные ассеты:

from airflow.sdk import Asset

my_asset_1 = Asset("my_asset_1")
my_asset_2 = Asset("my_asset_2")

@dag(
    schedule=[my_asset_1, my_asset_2],  # список задаёт условие AND
)
def my_dag():
    pass

my_dag()

Этот DAG запускается, когда и my_asset_1, и my_asset_2 обновлены хотя бы один раз.

Условие OR — DAG запускается при обновлении любого из ассетов:

from airflow.sdk import Asset

my_asset_1 = Asset("my_asset_1")
my_asset_2 = Asset("my_asset_2")

@dag(
    schedule=(my_asset_1 | my_asset_2),  # () вместо [] для условного расписания
)
def my_dag():
    pass

my_dag()

Этот DAG запускается, когда обновлён my_asset_1 или my_asset_2.

Комбинация с временем (AssetOrTimeSchedule) — DAG по расписанию и по ассетам:

from airflow.sdk import Asset
from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

my_asset_1 = Asset("my_asset_1")
my_asset_2 = Asset("my_asset_2")

@dag(
    schedule=AssetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 0 * * *", timezone="UTC"),
        assets=(my_asset_1 | my_asset_2),
    ),
)
def my_dag():
    pass

my_dag()

Этот DAG запускается каждый день в полночь UTC и дополнительно при обновлении my_asset_1 или my_asset_2.

Ассеты могут обновляться задачами любого DAG в том же окружении Airflow, вызовами эндпоинта ассетов REST API или вручную в Airflow UI.

Подробнее об ассетах и data-driven планировании: Ассеты и data-aware планирование в Airflow. Ассеты можно комбинировать с AssetWatchers для event-driven расписаний. См. Event-Driven Scheduling.

Timetables

Каждое временное расписание в Airflow реализовано через timetable. Есть встроенные timetables, в том числе CronTriggerTimetable и CronDataIntervalTimetable. Если под вашу задачу нет готового timetable, можно реализовать свой.

Continuous timetable

DAG можно запускать непрерывно с заданным timetable. Чтобы использовать ContinuousTimetable, задайте расписание "@continuous" и max_active_runs=1.

from pendulum import datetime
from airflow.sdk import dag

@dag(
    start_date=datetime(2025, 4, 1),
    schedule="@continuous",
    max_active_runs=1,
)
def my_dag():
    pass

При таком расписании создаётся один непрерывный DAG run: следующий запуск начинается сразу после завершения предыдущего (успешного или нет). ContinuousTimetable особенно удобен при использовании сенсоров или deferrable-операторов для ожидания нерегулярных событий во внешних системах.

> Внимание. Airflow рассчитан на оркестрацию пайплайнов пакетами (batch), а не на стриминг и низкую задержку. Для запусков чаще чем раз в минуту лучше комбинировать Airflow с инструментами вроде Apache Kafka.

← DAG | К содержанию | Зависимости задач →