Зависимости между DAG (Cross-DAG dependencies)
Обычно связанные задачи лучше держать в одном DAG, но иногда нужны зависимости между разными DAG. В таком случае узлом графа становится целый DAG, а не одна задача. В руководстве используются термины:
- Downstream DAG (нижестоящий DAG): DAG, который не может запуститься, пока вышестоящий DAG не перейдёт в заданное состояние.
- Upstream DAG (вышестоящий DAG): DAG, который должен достичь заданного состояния, прежде чем сможет выполниться нижестоящий DAG.
Тема Cross-DAG Dependencies в документации Airflow описывает ситуации, когда такие зависимости полезны:
- Задача зависит от другой задачи, но с другой датой выполнения.
- Два DAG связаны по смыслу, но принадлежат разным командам.
- Два DAG связаны, но с разными расписаниями.
- DAG должен запускаться только после обновления одного или нескольких ассетов задачами из других DAG.
В этом руководстве — способы реализации зависимостей между DAG, в том числе когда зависимые DAG находятся в разных развёртываниях Airflow.
Необходимая база
Полезно понимать:
- Сенсоры Airflow. См. Что такое сенсор?.
- Операторы Airflow. См. Что такое оператор?.
- DAG в Airflow. См. Введение в DAG.
- Зависимости в Airflow. См. Управление зависимостями.
Реализация зависимостей между DAG
Зависимости между DAG в Airflow можно задать несколькими способами:
Ниже — когда и как использовать каждый способ и как смотреть зависимости в UI Airflow.
Зависимости через ассеты (Dataset dependencies)
Самый распространённый способ задать зависимости между DAG — ассеты. DAG, работающие с одними и теми же данными, могут иметь явные связи, а запуск DAG можно привязать к обновлению этих данных.
Этот способ подходит, когда нижестоящий DAG должен запускаться только после обновления ассета вышестоящим DAG, особенно при нерегулярных обновлениях. Зависимости между DAG и ассетами при этом хорошо видны в UI Airflow.
При работе с ассетами используются понятия:
- Consuming DAG (потребляющий DAG): DAG, который запускается сразу после обновления указанного ассета.
- Producing task (производящая задача): задача, обновляющая ассет; задаётся параметром
outlets.
Любая задача становится производящей, если в параметр outlets передать один или несколько ассетов. Пример:
from airflow.sdk import Asset
from airflow.providers.standard.operators.empty import EmptyOperator
asset1 = Asset("asset1")
# производящая задача в вышестоящем DAG
EmptyOperator(
task_id="producing_task",
outlets=[asset1], # помечает для Airflow, что asset1 обновлён
)
Нижестоящий DAG запускается после обновления asset1, если передать его в параметр schedule:
from airflow.sdk import Asset, dag
asset1 = Asset("asset1")
@dag(schedule=[asset1])
def consuming_dag():
...
Подробнее: Ассеты и data-aware планирование в Airflow.
TriggerDagRunOperator
TriggerDagRunOperator — простой способ задать зависимость «сверху вниз»: задача в одном DAG запускает другой DAG в том же окружении Airflow. Подробнее: TriggerDagRunOperator.
Запускать нижестоящий DAG можно из любой точки вышестоящего DAG. Если у оператора задать wait_for_completion=True, вышестоящий DAG приостанавливается и продолжается только после завершения нижестоящего. Ожидание можно переложить на triggerer, установив deferrable=True: оператор станет deferrable, что снижает нагрузку и может уменьшить затраты.
Типичный сценарий: вышестоящий DAG загружает тестовые данные для ML-пайплайна, обучает и тестирует модель, публикует предсказания. Если модель показывает слабый результат, TriggerDagRunOperator запускает отдельный DAG переобучения; вышестоящий DAG ждёт. После переобучения и проверки в нижестоящем DAG вышестоящий продолжается и публикует результаты новой модели.
Расписание нижестоящего DAG не связано с запусками через TriggerDagRunOperator. Чтобы DAG запускался только этим оператором, задайте ему schedule=None. Зависимый DAG должен быть включён (не на паузе).
В примере ниже TriggerDagRunOperator запускает DAG с dag_id dependent_dag между двумя другими задачами. У задачи trigger_dependent_dag заданы wait_for_completion=True и deferrable=True, поэтому задача откладывается до завершения dependent_dag. После завершения этой задачи выполняется end_task.
TaskFlow:
from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from pendulum import datetime, duration
@task
def start_task(task_type):
return f"The {task_type} task has completed."
@task
def end_task(task_type):
return f"The {task_type} task has completed."
default_args = {
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": duration(minutes=5),
}
@dag(
start_date=datetime(2023, 1, 1),
max_active_runs=1,
schedule="@daily",
default_args=default_args,
catchup=False,
)
def trigger_dagrun_dag():
trigger_dependent_dag = TriggerDagRunOperator(
task_id="trigger_dependent_dag",
trigger_dag_id="dependent_dag",
wait_for_completion=True,
deferrable=True, # параметр доступен в Airflow 2.6+
)
start_task("starting") >> trigger_dependent_dag >> end_task("ending")
trigger_dagrun_dag()
Традиционный вариант:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from pendulum import datetime, duration
def print_task_type(**kwargs):
print(f"The {kwargs['task_type']} task has completed.")
default_args = {
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": duration(minutes=5),
}
with DAG(
dag_id="trigger_dagrun_dag",
start_date=datetime(2023, 1, 1),
max_active_runs=1,
schedule="@daily",
default_args=default_args,
catchup=False,
) as dag:
start_task = PythonOperator(
task_id="start_task",
python_callable=print_task_type,
op_kwargs={"task_type": "starting"},
)
trigger_dependent_dag = TriggerDagRunOperator(
task_id="trigger_dependent_dag",
trigger_dag_id="dependent_dag",
wait_for_completion=True,
deferrable=True, # параметр доступен в Airflow 2.6+
)
end_task = PythonOperator(
task_id="end_task",
python_callable=print_task_type,
op_kwargs={"task_type": "ending"},
)
start_task >> trigger_dependent_dag >> end_task
Если нижестоящему DAG нужен конфиг или конкретная логическая дата, их можно передать параметрами conf и logical_date. Параметр skip_when_already_exists=True не даёт оператору пытаться запустить уже существующий DAG run (и падать при повторных запусках). В Airflow 3.1 параметр fail_when_dag_is_paused позволяет завершать задачу с ошибкой, если зависимый DAG на паузе.
ExternalTaskSensor
Чтобы задать зависимость со стороны нижестоящего DAG, используйте один или несколько ExternalTaskSensor. Нижестоящий DAG будет ждать завершения указанной задачи в вышестоящем DAG, после чего продолжит выполнение.
Такой способ удобен, когда у нижестоящего DAG несколько веток и каждая зависит от своей задачи в одном или нескольких вышестоящих DAG. Вместо зависимости целого DAG от целого DAG (как при ассетах) можно указать, что конкретная задача нижестоящего DAG ждёт завершения конкретной задачи вышестоящего.
Например: вышестоящие задачи обновляют разные таблицы в хранилище, а один нижестоящий DAG запускает по ветке проверок качества для каждой таблицы. В начале каждой ветки ставится ExternalTaskSensor, чтобы проверки по таблице стартовали только после обновления этой таблицы.
Рекомендуется использовать ExternalTaskSensor в deferrable-режиме (deferrable=True). Подробнее: Deferrable-операторы.
В примере ниже три ExternalTaskSensor в начале трёх параллельных веток одного DAG.
TaskFlow:
from airflow.decorators import dag, task
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.empty import EmptyOperator
from pendulum import datetime, duration
@task
def downstream_function_branch_1():
print("Upstream DAG 1 has completed. Starting tasks of branch 1.")
@task
def downstream_function_branch_2():
print("Upstream DAG 2 has completed. Starting tasks of branch 2.")
@task
def downstream_function_branch_3():
print("Upstream DAG 3 has completed. Starting tasks of branch 3.")
default_args = {
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": duration(minutes=5),
}
@dag(
start_date=datetime(2022, 8, 1),
max_active_runs=3,
schedule="*/1 * * * *",
catchup=False,
)
def external_task_sensor_taskflow_dag():
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
ets_branch_1 = ExternalTaskSensor(
task_id="ets_branch_1",
external_dag_id="upstream_dag_1",
external_task_id="my_task",
allowed_states=["success"],
failed_states=["failed", "skipped"],
)
task_branch_1 = downstream_function_branch_1()
ets_branch_2 = ExternalTaskSensor(
task_id="ets_branch_2",
external_dag_id="upstream_dag_2",
external_task_id="my_task",
allowed_states=["success"],
failed_states=["failed", "skipped"],
)
task_branch_2 = downstream_function_branch_2()
ets_branch_3 = ExternalTaskSensor(
task_id="ets_branch_3",
external_dag_id="upstream_dag_3",
external_task_id="my_task",
allowed_states=["success"],
failed_states=["failed", "skipped"],
)
task_branch_3 = downstream_function_branch_3()
start >> [ets_branch_1, ets_branch_2, ets_branch_3]
ets_branch_1 >> task_branch_1
ets_branch_2 >> task_branch_2
ets_branch_3 >> task_branch_3
[task_branch_1, task_branch_2, task_branch_3] >> end
external_task_sensor_taskflow_dag()
Традиционный вариант:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.empty import EmptyOperator
from pendulum import datetime, duration
def downstream_function_branch_1():
print("Upstream DAG 1 has completed. Starting tasks of branch 1.")
def downstream_function_branch_2():
print("Upstream DAG 2 has completed. Starting tasks of branch 2.")
def downstream_function_branch_3():
print("Upstream DAG 3 has completed. Starting tasks of branch 3.")
default_args = {
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": duration(minutes=5),
}
with DAG(
"external-task-sensor-dag",
start_date=datetime(2022, 8, 1),
max_active_runs=3,
schedule="*/1 * * * *",
catchup=False,
) as dag:
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
ets_branch_1 = ExternalTaskSensor(
task_id="ets_branch_1",
external_dag_id="upstream_dag_1",
external_task_id="my_task",
allowed_states=["success"],
failed_states=["failed", "skipped"],
)
task_branch_1 = PythonOperator(
task_id="task_branch_1",
python_callable=downstream_function_branch_1,
)
ets_branch_2 = ExternalTaskSensor(
task_id="ets_branch_2",
external_dag_id="upstream_dag_2",
external_task_id="my_task",
allowed_states=["success"],
failed_states=["failed", "skipped"],
)
task_branch_2 = PythonOperator(
task_id="task_branch_2",
python_callable=downstream_function_branch_2,
)
ets_branch_3 = ExternalTaskSensor(
task_id="ets_branch_3",
external_dag_id="upstream_dag_3",
external_task_id="my_task",
allowed_states=["success"],
failed_states=["failed", "skipped"],
)
task_branch_3 = PythonOperator(
task_id="task_branch_3",
python_callable=downstream_function_branch_3,
)
start >> [ets_branch_1, ets_branch_2, ets_branch_3]
ets_branch_1 >> task_branch_1
ets_branch_2 >> task_branch_2
ets_branch_3 >> task_branch_3
[task_branch_1, task_branch_2, task_branch_3] >> end
В этом DAG:
ets_branch_3ждёт завершения задачиmy_taskвupstream_dag_3, затем выполняетсяtask_branch_3.ets_branch_2ждёт завершения задачиmy_taskвupstream_dag_2, затем выполняетсяtask_branch_2.ets_branch_1ждёт завершения задачиmy_taskвupstream_dag_1, затем выполняетсяtask_branch_1.
Эти ожидания идут параллельно и не зависят друг от друга. В графе видно состояние после завершения my_task в upstream_dag_1: выполнились ets_branch_1 и task_branch_1; ets_branch_2 и ets_branch_3 ещё ждут своих вышестоящих задач.
Чтобы нижестоящий DAG ждал завершения всего вышестоящего DAG, а не отдельной задачи, задайте external_task_id=None. В примере выше для срабатывания сенсора внешняя задача должна быть в состоянии success (параметры allowed_states и failed_states).
В примере вышестоящий DAG (example_dag) и нижестоящий (external-task-sensor-dag) должны иметь одинаковые start_date и интервал расписания: ExternalTaskSensor ищет завершение указанной задачи или DAG с тем же logical_date. Чтобы ждать завершения внешней задачи с другой датой, используйте параметры execution_delta или execution_date_fn (подробнее в документации по ссылке выше).
REST API Airflow
REST API Airflow тоже позволяет задавать зависимости между DAG. Чтобы запустить DAG run через API, отправьте POST-запрос на эндпоинт DAGRuns.
Пример скрипта запуска DAG через API (для cross-DAG зависимостей этот код можно выполнять внутри задачи с декоратором @task в вышестоящем DAG):
import requests
# Замените на параметры вашего инстанса Airflow
USERNAME = "admin"
PASSWORD = "admin"
HOST = "http://localhost:8080/"
MY_DAG = "example_dag" # id DAG, который нужно запустить
def get_jwt_token():
token_url = f"{HOST}/auth/token"
payload = {"username": USERNAME, "password": PASSWORD}
headers = {"Content-Type": "application/json"}
response = requests.post(token_url, json=payload, headers=headers)
token = response.json().get("access_token")
return token
def run_dag(dag_id, logical_date=None):
event_payload = {"conf": {"param1": "Hello World"}, "logical_date": logical_date}
token = get_jwt_token()
if token:
url = f"{HOST}/api/v2/dags/{dag_id}/dagRuns"
headers = {"Authorization": f"Bearer {token}"}
response = requests.post(url, json=event_payload, headers=headers)
print(response.status_code)
print(response.json())
else:
raise Exception("Failed to get JWT token")
if __name__ == "__main__":
run_dag(dag_id=MY_DAG, logical_date=None)
Обновить ассет через API можно POST-запросом на эндпоинт Assets.
Зависимости между развёртываниями
Чтобы реализовать зависимости между DAG в двух разных окружениях Airflow на Astro, см. Cross-deployment dependencies.