Группы задач (Task groups)
Группы задач в Airflow позволяют объединять задачи в группы внутри DAG. С их помощью можно:
- Превращать повторяющиеся паттерны задач в переиспользуемые модули для разных DAG или инстансов Airflow.
- Динамически маппить группы задач и строить сложные динамические сценарии.
- Задавать default_args для набора задач (вместо уровня всего DAG), см. Параметры DAG.
- Упорядочивать сложные DAG: в Grid View задачи визуально объединяются в группы.
В этом руководстве — как создавать и использовать группы задач. Примеры DAG с группами: Astronomer GitHub.
Необходимая база
Полезно понимать:
- Операторы Airflow. См. Что такое оператор?.
Когда использовать группы задач
Группы задач чаще всего используют для визуальной организации сложных DAG. Например:
- Вход неизвестной длины (например, неизвестное число файлов в каталоге). Группы задач позволяют динамически маппить по входу и создавать группу действий для каждого файла. Это единственный способ динамически маппить последовательность задач в Airflow.
- Один и тот же паттерн в нескольких DAG — группа как переиспользуемый модуль.
- DAG с несколькими командами — группы визуально разделяют задачи по командам. Иногда лучше разнести логику по разным DAG и связать их через Assets.
- MLOps DAG — отдельная группа на каждую обучаемую модель.
- Крупные ELT/ETL DAG — группа на каждую таблицу или схему.
Определение групп задач
Группы можно задавать двумя способами:
- Декоратор
@task_groupна Python-функции. - Класс
TaskGroupкак контекстный менеджер.
В большинстве случаев выбор — вопрос предпочтений. Исключение — динамический маппинг по группе: он возможен только с @task_group.
Ниже — простая группа из двух последовательных задач. Операторы зависимостей (<< и >>) работают внутри групп и между группами так же, как для отдельных задач.
# from airflow.decorators import task_group
t0 = EmptyOperator(task_id='start')
@task_group(group_id='my_task_group')
def tg1():
t1 = EmptyOperator(task_id='task_1')
t2 = EmptyOperator(task_id='task_2')
t1 >> t2
t3 = EmptyOperator(task_id='end')
t0 >> tg1() >> t3
# from airflow.utils.task_group import TaskGroup
t0 = EmptyOperator(task_id='start')
with TaskGroup(group_id='my_task_group') as tg1:
t1 = EmptyOperator(task_id='task_1')
t2 = EmptyOperator(task_id='task_2')
t1 >> t2
t3 = EmptyOperator(task_id='end')
t0 >> tg1 >> t3
Группы отображаются и в Grid, и в Graph DAG.
Параметры группы задач {#task-group-parameters}
Группу можно настроить параметрами. Основные: group_id (имя группы) и default_args (передаются всем задачам в группе). Пример с часто используемыми параметрами:
@task_group(
group_id="task_group_1",
default_args={"conn_id": "postgres_default"},
tooltip="This task group is very important!",
prefix_group_id=True,
# parent_group=None,
# dag=None,
)
def tg1():
t1 = EmptyOperator(task_id="t1")
tg1()
with TaskGroup(
group_id="task_group_2",
default_args={"conn_id": "postgres_default"},
tooltip="This task group is also very important!",
prefix_group_id=True,
# parent_group=None,
# dag=None,
# add_suffix_on_collision=True,
) as tg2:
t1 = EmptyOperator(task_id="t1")
task_id в группах задач
Если задача входит в группу, её полный task_id — group_id.task_id. Так сохраняется уникальность task_id в рамках DAG. Этот формат нужно использовать при работе с XCom и ветвлением. Отключить префикс можно параметром группы prefix_group_id=False.
Например, у задачи task_1 в следующем DAG полный task_id — my_outer_task_group.my_inner_task_group.task_1.
@task_group(group_id="my_outer_task_group")
def my_outer_task_group():
@task_group(group_id="my_inner_task_group")
def my_inner_task_group():
EmptyOperator(task_id="task_1")
my_inner_task_group()
my_outer_task_group()
with TaskGroup(group_id="my_outer_task_group") as tg1:
with TaskGroup(group_id="my_inner_task_group") as tg2:
EmptyOperator(task_id="task_1")
Передача данных через группы задач
С декоратором @task_group данные через группу передаются так же, как с обычными декораторами @task:
from airflow.decorators import dag, task, task_group
from pendulum import datetime
import json
@dag(start_date=datetime(2023, 8, 1), schedule=None, catchup=False)
def task_group_example():
@task
def extract_data():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
@task
def transform_sum(order_data_dict: dict):
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
@task
def transform_avg(order_data_dict: dict):
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
avg_order_value = total_order_value / len(order_data_dict)
return {"avg_order_value": avg_order_value}
@task_group
def transform_values(order_data_dict):
return {
"avg": transform_avg(order_data_dict),
"total": transform_sum(order_data_dict),
}
@task
def load(order_values: dict):
print(
f"""Total order value is: {order_values['total']['total_order_value']:.2f}
and average order value is: {order_values['avg']['avg_order_value']:.2f}"""
)
load(transform_values(extract_data()))
task_group_example()
При передаче данных в группу и из группы важно:
- Если группа возвращает значение, которое принимает другая задача на вход, TaskFlow API сам выводит зависимости. Если выход группы никуда не передаётся, зависимости нужно задать явно операторами
<<или>>. - Если нижестоящим задачам нужен вывод задач из группы, функция группы должна возвращать результат. В примере выше возвращается словарь с двумя значениями (по одному от каждой задачи группы), он передаётся в задачу
load().
Динамическое создание групп задач в рантайме {#generate-task-groups-dynamically-at-runtime}
Динамический маппинг можно сочетать с декоратором @task_group: группа маппится по разным значениям параметра. Пример — группа с разными входами для параметра:
from airflow.decorators import dag, task_group, task
from pendulum import datetime
@dag(
start_date=datetime(2022, 12, 1),
schedule=None,
catchup=False,
)
def task_group_mapping_example():
@task_group(group_id="group1")
def tg1(my_num):
@task
def print_num(num):
return num
@task
def add_42(num):
return num + 42
print_num(my_num) >> add_42(my_num)
@task
def pull_xcom(**context):
pulled_xcom = context["ti"].xcom_pull(
task_ids=["group1.add_42"],
map_indexes=[2, 3],
key="return_value",
)
print(pulled_xcom)
tg1_object = tg1.expand(my_num=[19, 23, 42, 8, 7, 108])
tg1_object >> pull_xcom()
task_group_mapping_example()
В этом DAG группа group1 маппится по разным значениям параметра my_num. Создаётся 6 экземпляров группы, в каждом две задачи выполняются со своим значением my_num. Задача pull_xcom() ниже по потоку показывает, как получить нужные значения XCom из списка маппленных экземпляров группы (через map_indexes).
Подробнее о динамическом маппинге, в том числе по нескольким параметрам: Динамические задачи.
Порядок групп задач
При создании групп в цикле по умолчанию они идут параллельно. Если одна группа зависит от результатов другой, их нужно выполнять последовательно. Например, при загрузке таблиц с внешними ключами сначала должны существовать записи в основной таблице.
В примере ниже третья группа (третья итерация цикла) зависит от двух предыдущих, поэтому её нужно выполнять последней. Группы складываются в список, по нему задаются зависимости:
groups = []
for g_id in range(1,4):
tg_id = f"group{g_id}"
@task_group(group_id=tg_id)
def tg1():
t1 = EmptyOperator(task_id="task1")
t2 = EmptyOperator(task_id="task2")
t1 >> t2
if tg_id == "group1":
t3 = EmptyOperator(task_id="task3")
t1 >> t3
groups.append(tg1())
[groups[0] , groups[1]] >> groups[2]
groups = []
for g_id in range(1,4):
tg_id = f"group{g_id}"
with TaskGroup(group_id=tg_id) as tg1:
t1 = EmptyOperator(task_id="task1")
t2 = EmptyOperator(task_id="task2")
t1 >> t2
if tg_id == "group1":
t3 = EmptyOperator(task_id="task3")
t1 >> t3
groups.append(tg1)
[groups[0] , groups[1]] >> groups[2]
В примере также показано, как добавить дополнительную задачу в group1 в зависимости от group_id: даже при создании групп в цикле по одному паттерну можно вносить отличия без дублирования кода.
Вложенные группы задач
Группы можно вкладывать друг в друга: группа объявляется внутри другой. Уровней вложенности может быть любое количество.
groups = []
for g_id in range(1,3):
@task_group(group_id=f"group{g_id}")
def tg1():
t1 = EmptyOperator(task_id="task1")
t2 = EmptyOperator(task_id="task2")
sub_groups = []
for s_id in range(1,3):
@task_group(group_id=f"sub_group{s_id}")
def tg2():
st1 = EmptyOperator(task_id="task1")
st2 = EmptyOperator(task_id="task2")
st1 >> st2
sub_groups.append(tg2())
t1 >> sub_groups >> t2
groups.append(tg1())
groups[0] >> groups[1]
groups = []
for g_id in range(1,3):
with TaskGroup(group_id=f"group{g_id}") as tg1:
t1 = EmptyOperator(task_id="task1")
t2 = EmptyOperator(task_id="task2")
sub_groups = []
for s_id in range(1,3):
with TaskGroup(group_id=f"sub_group{s_id}") as tg2:
st1 = EmptyOperator(task_id="task1")
st2 = EmptyOperator(task_id="task2")
st1 >> st2
sub_groups.append(tg2)
t1 >> sub_groups >> t2
groups.append(tg1)
groups[0] >> groups[1]
Кастомные классы групп задач
Если один и тот же паттерн задач повторяется в нескольких DAG или инстансах Airflow, удобно вынести его в отдельный класс-модуль. Нужно унаследовать TaskGroup и описать задачи внутри класса. Задачи привязываются к группе через self. В остальном задачи задаются так же, как в файле DAG.
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task
class MyCustomMathTaskGroup(TaskGroup):
"""A task group summing two numbers and multiplying the result with 23."""
def __init__(self, group_id, num1=0, num2=0, tooltip="Math!", **kwargs):
"""Instantiate a MyCustomMathTaskGroup."""
super().__init__(group_id=group_id, tooltip=tooltip, **kwargs)
@task(task_group=self)
def task_1(num1, num2):
"""Adds two numbers."""
return num1 + num2
@task(task_group=self)
def task_2(num):
"""Multiplies a number by 23."""
return num * 23
task_2(task_1(num1, num2))
В DAG импортируется класс и создаётся экземпляр с нужными аргументами:
from airflow.decorators import dag, task
from pendulum import datetime
from include.custom_task_group import MyCustomMathTaskGroup
@dag(
start_date=datetime(2023, 8, 1),
schedule=None,
catchup=False,
tags=["@task_group", "task_group"],
)
def custom_tg():
@task
def get_num_1():
return 5
tg1 = MyCustomMathTaskGroup(group_id="my_task_group", num1=get_num_1(), num2=19)
@task
def downstream_task():
return "hello"
tg1 >> downstream_task()
custom_tg()
Такую группу можно переиспользовать в других DAG с разными значениями num1 и num2.