Динамические задачи (Dynamic task mapping)
При dynamic task mapping (динамическом маппинге задач) можно писать DAG, которые в рантайме создают параллельные задачи. Это меняет подход к проектированию DAG: задачи создаются по текущему окружению без правок кода DAG.
В этом руководстве — что такое динамический маппинг и полный пример для типичного сценария.
Необходимая база
Полезно понимать:
- XCom в Airflow. См. Передача данных между задачами.
- Декораторы для определения задач. См. Декораторы Airflow.
- Операторы Airflow. См. Что такое оператор?.
Концепции динамического маппинга
Динамический маппинг в Airflow основан на модели MapReduce. Для каждого входного элемента создаётся одна задача (map). Опциональная фаза reduce — одна задача обрабатывает собранный вывод маппленных задач. То есть DAG может создать произвольное число параллельных задач в рантайме по входному параметру (map) и при необходимости одну задачу ниже, зависящую от их вывода (reduce).
У задачи Airflow есть две функции для фазы map:
partial(): передаёт параметры, одинаковые для всех маппленных задач, создаваемыхexpand().expand(): передаёт параметры, по которым идёт маппинг; для каждого элемента создаётся отдельная параллельная задача. В части случаев при маппинге по нескольким параметрам вместо этого используется.expand_kwargs().
В примере ниже задача использует и .partial(), и .expand() и создаёт три запуска задачи:
from airflow.sdk import task
@task
def add(x: int, y: int):
return x + y
added_values = add.partial(y=10).expand(x=[1, 2, 3])
from airflow.providers.standard.operators.python import PythonOperator
def add_function(x: int, y: int):
return x + y
added_values = PythonOperator.partial(
task_id="add",
python_callable=add_function,
op_kwargs={"y": 10},
).expand(op_args=[[1], [2], [3]])
Функция expand создаёт три маппленные задачи add — по одной на каждый элемент списка x. Функция partial задаёт значение y, общее для всех задач.
При работе с маппленными задачами учитывайте:
- XCom маппленных экземпляров хранятся в виде списка; доступ по map index. Например, XCom третьего экземпляра (map index 2) задачи
my_mapped_task:ti.xcom_pull(task_ids=['my_mapped_task'])[2]илиti.xcom_pull(task_ids=['my_mapped_task'], map_indexes=[2]). - Ограничить число параллельно выполняемых маппленных экземпляров можно параметрами задачи: max_active_tis_per_dag (по всем DAG run) и max_active_tis_per_dagrun (в рамках одного DAG run).
- Максимальное число маппленных экземпляров задаётся конфигом max_map_length (по умолчанию 1024).
- expand() принимает только keyword arguments.
- Некоторые параметры маппить нельзя (например,
task_id,poolи многие аргументы BaseOperator). - Маппленная задача может не создать ни одного экземпляра (например, если вышестоящая задача вернула пустой список). Тогда маппленная задача помечается skipped; нижестоящие выполняются согласно trigger rules (по умолчанию тоже skipped).
- Результат маппленной задачи можно передать в следующую маппленную задачу.
- Можно маппить по нескольким параметрам.
- Результат вышестоящей задачи можно использовать как вход маппленной задачи. Вышестоящая задача должна вернуть значение в виде
dictилиlist. При традиционных операторах (не декорированных задачах) значения для маппинга должны быть в XCom.
Дополнительные примеры: Dynamic Task Mapping в документации Airflow.
В UI маппленные задачи отображаются в Grid View: в графе DAG run у задачи в скобках [ ] указано число созданных экземпляров. Логи и XCom каждого экземпляра открываются по клику на соответствующий квадрат в сетке.
Маппинг по результату другого оператора
Входные данные для маппленной нижестоящей задачи можно брать из вывода вышестоящего оператора.
Ниже — как передавать данные маппинга в каждом из вариантов:
- Традиционный → традиционный: обе задачи — традиционные операторы.
- Традиционный → TaskFlow: вышестоящая — TaskFlow, нижестоящая — традиционный оператор.
- TaskFlow → традиционный: вышестоящая — традиционный оператор, нижестоящая — TaskFlow.
- TaskFlow → TaskFlow: обе задачи — TaskFlow API.
TaskFlow → TaskFlow: вызов вышестоящей задачи передаётся аргументом в expand():
from airflow.sdk import task
@task
def one_two_three_TF():
return [1, 2, 3]
@task
def plus_10_TF(x):
return x + 10
plus_10_TF.partial().expand(x=one_two_three_TF())
Традиционный оператор: в expand() передаётся атрибут .output объекта задачи:
from airflow.sdk import task
from airflow.providers.standard.operators.python import PythonOperator
def one_two_three_traditional():
return [1, 2, 3]
@task
def plus_10_TF(x):
return x + 10
one_two_three_task = PythonOperator(
task_id="one_two_three_task", python_callable=one_two_three_traditional
)
plus_10_TF.partial().expand(x=one_two_three_task.output)
TaskFlow → традиционный PythonOperator: вывод нужно привести к формату, который принимает op_args у PythonOperator:
from airflow.sdk import task
from airflow.providers.standard.operators.python import PythonOperator
@task
def one_two_three_TF():
# op_args ожидает каждый аргумент в виде списка
return [[1], [2], [3]]
def plus_10_traditional(x):
return x + 10
plus_10_task = PythonOperator.partial(
task_id="plus_10_task", python_callable=plus_10_traditional
).expand(op_args=one_two_three_TF())
Традиционный → традиционный: используйте .output вышестоящей задачи; формат возвращаемого значения должен соответствовать op_args:
from airflow.providers.standard.operators.python import PythonOperator
def one_two_three_traditional():
return [[1], [2], [3]]
def plus_10_traditional(x):
return x + 10
one_two_three_task = PythonOperator(
task_id="one_two_three_task", python_callable=one_two_three_traditional
)
plus_10_task = PythonOperator.partial(
task_id="plus_10_task", python_callable=plus_10_traditional
).expand(op_args=one_two_three_task.output)
one_two_three_task >> plus_10_task
Маппинг по объединённым результатам вышестоящих задач
Списки результатов вышестоящих задач можно объединить методом .concat(). Раньше для этого нужна была промежуточная задача.
В примере ниже для задачи map_me создаётся 7 маппленных экземпляров:
from airflow.sdk import task
import time
@task
def t1():
return [1, 2, 3]
t1_obj = t1()
@task
def t2():
return [4, 5, 6, 7]
t2_obj = t2()
@task
def map_me(input):
print(f"Sleeping for {input} seconds!")
time.sleep(input)
print("Waking up!")
map_me.expand(input=t1_obj.concat(t2_obj))
from airflow.providers.standard.operators.python import PythonOperator
import time
def t1_func():
return [[1], [2], [3]]
def t2_func():
return [[4], [5], [6], [7]]
def map_me_func(input):
print(f"Sleeping for {input} seconds!")
time.sleep(input)
print("Waking up!")
t1 = PythonOperator(task_id="t1", python_callable=t1_func)
t2 = PythonOperator(task_id="t2", python_callable=t2_func)
map_me = PythonOperator.partial(
task_id="map_me", python_callable=map_me_func
).expand(op_args=t1.output.concat(t2.output))
Маппинг по нескольким параметрам {#mapping-over-multiple-parameters} {#mapping-over-multiple-parameters}
Доступны три способа маппить по нескольким параметрам:
- Zip: маппинг по наборам позиционных аргументов (Python
zip()или.zip()у XComArg) — один маппленный экземпляр на каждый набор. Используется expand(). - Наборы keyword arguments: маппинг по двум и более наборам kwargs — один экземпляр на каждый набор, а не на каждую комбинацию. Используется expand_kwargs().
- Декартово произведение (cross-product): маппинг по двум и более keyword arguments — один экземпляр на каждую комбинацию входов. Используется expand().
Декартово произведение (cross-product)
По умолчанию expand() создаёт маппленный экземпляр для каждой комбинации всех переданных входов. Например, при двух значениях первого аргумента, четырёх второго и пяти третьего получится 2×4×5=40 экземпляров. Типичный сценарий — перебор гиперпараметров модели.
В примере ниже маппинг по трём вариантам bash_command и трём вариантам env даёт 3×3=9 экземпляров. Каждая команда выполняется с каждым значением переменной окружения WORD:
from airflow.providers.standard.operators.bash import BashOperator
cross_product_example = BashOperator.partial(
task_id="cross_product_example"
).expand(
bash_command=[
"echo $WORD",
"echo `expr length $WORD`",
"echo \\${WORD//e/X}"
],
env=[
{"WORD": "hello"},
{"WORD": "tea"},
{"WORD": "goodbye"}
]
)
Девять экземпляров задачи cross_product_example дают все комбинации команды и переменной: map index 0 — hello, 1 — tea, 2 — goodbye, 3 — 5, 4 — 3, 5 — 7, 6 — hXllo, 7 — tXa, 8 — goodbyX.
Наборы keyword arguments
Чтобы маппить по наборам значений для двух и более keyword arguments, используется expand_kwargs(). Наборы задаются списком словарей или XComArg. В примере ниже оператор получает 3 набора параметров — 3 маппленных экземпляра:
from airflow.providers.standard.operators.bash import BashOperator
t1 = BashOperator.partial(task_id="t1").expand_kwargs(
[
{"bash_command": "echo $WORD", "env": {"WORD": "hello"}},
{"bash_command": "echo `expr length $WORD`", "env": {"WORD": "tea"}},
{"bash_command": "echo \\${WORD//e/X}", "env": {"WORD": "goodbye"}}
]
)
У задачи t1 три маппленных экземпляра с выводами: map index 0 — hello, 1 — 3, 2 — goodbyX.
Zip
В маппинге можно передавать наборы позиционных аргументов в один и тот же keyword argument (например, op_args у PythonOperator). Если данные — итерируемые (кортежи, словари, списки), можно использовать встроенную функцию Python zip(). Если данные из XCom, используется метод .zip() у XComArg.
Встроенный Python zip()
zip() принимает произвольное число итерируемых и строит кортежи из их элементов; число кортежей равно длине кратчайшей итерируемой. Примеры:
zip(["a", "b"], ["hi", "bye"], (19, 23))→('a', 'hi', 19), ('b', 'bye', 23).zip(["a", "b"], [1], ["hi", "bye"], [19, 23], ["x", "y", "z"])→ один кортеж("a", 1, "hi", 19, "x")(кратчайший список — один элемент).zip(["a", "b", "c"], [1, 2, 3], ["hi", "bye", "tea"])→("a", 1, "hi"), ("b", 2, "bye"), ("c", 3, "tea").
Пример: список из zip передаётся в expand() для маппинга по наборам позиционных аргументов. В TaskFlow каждый набор передаётся в аргумент zipped_x_y_z; в PythonOperator каждый набор распаковывается в op_args и попадает в x, y, z:
from airflow.sdk import task
zipped_arguments = list(zip([1, 2, 3], [10, 20, 30], [100, 200, 300]))
# zipped_arguments: [(1,10,100), (2,20,200), (3,30,300)]
@task
def add_numbers(zipped_x_y_z):
return zipped_x_y_z[0] + zipped_x_y_z[1] + zipped_x_y_z[2]
add_numbers.expand(zipped_x_y_z=zipped_arguments)
from airflow.providers.standard.operators.python import PythonOperator
zipped_arguments = list(zip([1, 2, 3], [10, 20, 30], [100, 200, 300]))
def add_numbers_function(x, y, z):
return x + y + z
add_numbers = PythonOperator.partial(
task_id="add_numbers",
python_callable=add_numbers_function,
).expand(op_args=zipped_arguments)
Три экземпляра задачи add_numbers: map index 0 — 111, 1 — 222, 2 — 333.
XComArg.zip()
Можно объединять через .zip() и XComArg. Для TaskFlow передаётся вызов задачи, для традиционного оператора — task_object.output или XcomArg(task_object). Опциональный аргумент fillvalue в .zip() повторяет поведение zip_longest(): число кортежей равно длине самой длинной итерируемой, недостающие элементы заполняются значением по умолчанию. Без fillvalue в примере ниже был бы один кортеж [(1, 10, 100)], так как кратчайший список — один элемент.
from airflow.sdk import task
@task
def one_two_three():
return [1, 2]
@task
def ten_twenty_thirty():
return [10]
@task
def one_two_three_hundred():
return [100, 200, 300]
zipped_arguments = one_two_three().zip(
ten_twenty_thirty(), one_two_three_hundred(), fillvalue=1000
)
# zipped_arguments: [(1, 10, 100), (2, 1000, 200), (1000, 1000, 300)]
@task
def add_nums(zipped_x_y_z):
return zipped_x_y_z[0] + zipped_x_y_z[1] + zipped_x_y_z[2]
add_nums.expand(zipped_x_y_z=zipped_arguments)
from airflow.providers.standard.operators.python import PythonOperator
def one_two_three_function():
return [1, 2]
def ten_twenty_thirty_function():
return [10]
def one_two_three_hundred_function():
return [100, 200, 300]
one_two_three = PythonOperator(
task_id="one_two_three", python_callable=one_two_three_function
)
ten_twenty_thirty = PythonOperator(
task_id="ten_twenty_thirty", python_callable=ten_twenty_thirty_function
)
one_two_three_hundred = PythonOperator(
task_id="one_two_three_hundred", python_callable=one_two_three_hundred_function
)
zipped_arguments = one_two_three.output.zip(
ten_twenty_thirty.output, one_two_three_hundred.output, fillvalue=1000
)
def add_nums_function(x, y, z):
return x + y + z
add_nums = PythonOperator.partial(
task_id="add_nums", python_callable=add_nums_function
).expand(op_args=zipped_arguments)
Три экземпляра add_nums: map index 0 — 111, 1 — 1202, 2 — 2300.
Цепочка маппингов (repeated mapping)
Задачу можно маппить по выводу другой маппленной задачи. Тогда создаётся по одному маппленному экземпляру на каждый экземпляр вышестоящей задачи.
В примере ниже три маппленные задачи в цепочке:
from airflow.sdk import task
@task
def multiply_by_2(num):
return num * 2
@task
def add_10(num):
return num + 10
@task
def multiply_by_100(num):
return num * 100
multiplied_value_1 = multiply_by_2.expand(num=[1, 2, 3])
summed_value = add_10.expand(num=multiplied_value_1)
multiply_by_100.expand(num=summed_value)
from airflow.providers.standard.operators.python import PythonOperator
def multiply_by_2_func(num):
return [num * 2]
def add_10_func(num):
return [num + 10]
def multiply_by_100_func(num):
return num * 100
multiply_by_2 = PythonOperator.partial(
task_id="multiply_by_2",
python_callable=multiply_by_2_func
).expand(op_args=[[1], [2], [3]])
add_10 = PythonOperator.partial(
task_id="add_10",
python_callable=add_10_func
).expand(op_args=multiply_by_2.output)
multiply_by_100 = PythonOperator.partial(
task_id="multiply_by_100",
python_callable=multiply_by_100_func
).expand(op_args=add_10.output)
multiply_by_2 >> add_10 >> multiply_by_100
У multiply_by_2 три экземпляра: 2, 4, 6. У add_10 три экземпляра: 12, 14, 16. У multiply_by_100 три экземпляра: 1200, 1400, 1600. Так можно выстраивать цепочку из любого числа маппленных задач. Экспоненциально увеличивать число экземпляров при этом нельзя.
Маппинг по task groups
Task groups, определённые декоратором @task_group, тоже можно маппить. Синтаксис такой же, как для одной задачи:
from airflow.sdk import task, task_group
@task_group(group_id="group1")
def tg1(my_num):
@task
def print_num(num):
return num
@task
def add_42(num):
return num + 42
print_num(my_num) >> add_42(my_num)
tg1_object = tg1.expand(my_num=[19, 23, 42, 8, 7, 108])
Получается 6 маппленных экземпляров группы group1. По нескольким параметрам группы можно маппить так же, как для обычных задач (cross-product, zip, expand_kwargs). См. Маппинг по нескольким параметрам.
Преобразование вывода с помощью .map() {#transform-outputs-with-map}
Иногда нужно преобразовать вывод вышестоящей задачи до того, как по нему маппится следующая. Например, если традиционный оператор возвращает данные в фиксированном формате или нужно пропустить часть экземпляров по условию.
Метод .map() принимает Python-функцию и применяет её к итерируемому входу перед маппингом. Вызов: на задаче TaskFlow — my_upstream_task().map(mapping_function), на традиционном операторе — my_upstream_operator.output.map(mapping_function). Нижестоящая задача маппится по результату .map() через .expand() (один keyword argument) или .expand_kwargs() (список словарей с наборами kwargs).
В примере ниже .map() используется, чтобы пропускать отдельные элементы по условию (через AirflowSkipException):
from airflow.sdk import task
from airflow.exceptions import AirflowSkipException
@task
def list_strings():
return ["skip_hello", "hi", "skip_hallo", "hola", "hey"]
def skip_strings_starting_with_skip(string):
if len(string) < 4:
return string + "!"
elif string[:4] == "skip":
raise AirflowSkipException(f"Skipping {string}; as I was told!")
else:
return string + "!"
transformed_list = list_strings().map(skip_strings_starting_with_skip)
@task
def mapped_printing_task(string):
return "Say " + string
mapped_printing_task.partial().expand(string=transformed_list)
from airflow.providers.standard.operators.python import PythonOperator
from airflow.exceptions import AirflowSkipException
def list_strings():
return ["skip_hello", "hi", "skip_hallo", "hola", "hey"]
listed_strings = PythonOperator(
task_id="list_strings",
python_callable=list_strings,
)
def skip_strings_starting_with_skip(string):
if len(string) < 4:
return [string + "!"]
elif string[:4] == "skip":
raise AirflowSkipException(f"Skipping {string}; as I was told!")
else:
return [string + "!"]
transformed_list = listed_strings.output.map(skip_strings_starting_with_skip)
def mapped_printing_function(string):
return "Say " + string
mapped_printing = PythonOperator.partial(
task_id="mapped_printing",
python_callable=mapped_printing_function,
).expand(op_args=transformed_list)
Функция преобразования не появляется как отдельная задача Airflow. В UI во вкладке Mapped Tasks видно, что экземпляры 0 и 2 помечены как skipped.
Пример: обработка файлов в S3
Типичный сценарий — обработка файлов в Amazon S3: ELT — извлечение из S3, загрузка в Snowflake, трансформация в Snowflake. Число файлов каждый день неизвестно; динамический маппинг создаёт по задаче на каждый файл в рантайме (атомарность, удобная наблюдаемость и восстановление после сбоев).
Полный код примера: dynamic-task-mapping-tutorial.
Шаги DAG:
- Задекорированная задача получает список файлов из S3 (префикс с
ds_nodash— только за дату DAG run, например20250412/). - По результату маппится
CopyFromExternalStageToSnowflakeOperatorдля каждого файла. - Одновременно: копирование обработанных файлов в
processed/, удаление исходной папки, выполнение SQL-трансформации в Snowflake.
TaskFlow:
from airflow.decorators import dag, task
from airflow.providers.snowflake.transfers.copy_into_snowflake import (
CopyFromExternalStageToSnowflakeOperator,
)
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator
from pendulum import datetime
@dag(
start_date=datetime(2024, 4, 2),
catchup=False,
template_searchpath="/usr/local/airflow/include",
schedule="@daily",
)
def mapping_elt():
@task
def get_s3_files(current_prefix):
s3_hook = S3Hook(aws_conn_id="s3")
current_files = s3_hook.list_keys(
bucket_name="my-bucket",
prefix=current_prefix + "/",
start_after_key=current_prefix + "/",
)
return [[file] for file in current_files]
copy_to_snowflake = CopyFromExternalStageToSnowflakeOperator.partial(
task_id="load_files_to_snowflake",
stage="MY_STAGE",
table="COMBINED_HOMES",
schema="MYSCHEMA",
file_format="(type = 'CSV',field_delimiter = ',', skip_header=1)",
snowflake_conn_id="snowflake",
).expand(files=get_s3_files(current_prefix="{{ ds_nodash }}"))
move_s3 = S3CopyObjectOperator(
task_id="move_files_to_processed",
aws_conn_id="s3",
source_bucket_name="my-bucket",
source_bucket_key="{{ ds_nodash }}" + "/",
dest_bucket_name="my-bucket",
dest_bucket_key="processed/" + "{{ ds_nodash }}" + "/",
)
delete_landing_files = S3DeleteObjectsOperator(
task_id="delete_landing_files",
aws_conn_id="s3",
bucket="my-bucket",
prefix="{{ ds_nodash }}" + "/",
)
transform_in_snowflake = SnowflakeOperator(
task_id="run_transformation_query",
sql="/transformation_query.sql",
snowflake_conn_id="snowflake",
)
copy_to_snowflake >> [move_s3, transform_in_snowflake]
move_s3 >> delete_landing_files
mapping_elt()
Традиционный вариант:
from airflow import DAG
from airflow.providers.snowflake.transfers.copy_into_snowflake import (
CopyFromExternalStageToSnowflakeOperator,
)
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
from airflow.providers.amazon.aws.operators.s3 import S3DeleteObjectsOperator
from pendulum import datetime
def get_s3_files(current_prefix):
s3_hook = S3Hook(aws_conn_id="s3")
current_files = s3_hook.list_keys(
bucket_name="my-bucket",
prefix=current_prefix + "/",
start_after_key=current_prefix + "/",
)
return [[file] for file in current_files]
with DAG(
"mapping_elt_traditional",
start_date=datetime(2024, 4, 2),
catchup=False,
template_searchpath="/usr/local/airflow/include",
schedule="@daily",
):
get_s3_files_task = PythonOperator(
task_id="get_s3_files",
python_callable=get_s3_files,
op_kwargs={"current_prefix": "{{ ds_nodash }}"},
)
copy_to_snowflake = CopyFromExternalStageToSnowflakeOperator.partial(
task_id="load_files_to_snowflake",
stage="MY_STAGE",
table="COMBINED_HOMES",
schema="MYSCHEMA",
file_format="(type = 'CSV',field_delimiter = ',', skip_header=1)",
snowflake_conn_id="snowflake",
).expand(files=get_s3_files_task.output)
move_s3 = S3CopyObjectOperator(
task_id="move_files_to_processed",
aws_conn_id="s3",
source_bucket_name="my-bucket",
source_bucket_key="{{ ds_nodash }}" + "/",
dest_bucket_name="my-bucket",
dest_bucket_key="processed/" + "{{ ds_nodash }}" + "/",
)
delete_landing_files = S3DeleteObjectsOperator(
task_id="delete_landing_files",
aws_conn_id="s3",
bucket="my-bucket",
prefix="{{ ds_nodash }}" + "/",
)
transform_in_snowflake = SnowflakeOperator(
task_id="run_transformation_query",
sql="/transformation_query.sql",
snowflake_conn_id="snowflake",
)
copy_to_snowflake >> [move_s3, transform_in_snowflake]
move_s3 >> delete_landing_files
При маппинге важно учитывать формат параметра: в примере выше своя функция формирует список списков из ключей S3, потому что оператор загрузки в Snowflake ожидает каждый ключ в виде списка, а list_keys возвращает один плоский список. Такая функция превращает результат хука в список списков для нижестоящего оператора.
← Отладка | К содержанию | Task groups → | Передача данных →