Перейти к содержанию

Отладка DAG в Airflow

В этом руководстве описано, как находить и устранять типичные проблемы с DAG в Airflow, а также куда обращаться, если решения не нашлось. Основной фокус — локальная разработка, но большая часть применима и к продакшену.

Рекомендуется наладить систематическое тестирование DAG, чтобы предотвращать типичные ошибки. См. Test Airflow DAGs.

Необходимая база

Полезно понимать:

Общий подход к отладке

Чтобы повысить шансы найти и исправить ошибку, задайте себе следующие вопросы:

  • Воспроизводится ли проблема в новом локальном инстансе Airflow (например, через Astro CLI)?
  • Какие версии Airflow и провайдеров используются? Сверьтесь с подходящей версией документации Airflow.
  • Можете ли собрать нужные логи? Где они лежат и как настраиваются: Airflow logging.
  • Проблема со всеми DAG или только с одним?
  • Корректно ли настроены подключения Airflow (учётные данные)? См. раздел «Отладка подключений» ниже.
  • Есть ли у Airflow доступ ко всем нужным файлам? Особенно важно в контейнерах (например, Astro CLI).
  • В каком состоянии компоненты Airflow? Проверьте логи каждого компонента и при необходимости перезапустите окружение.
  • Проблема в Airflow или во внешней системе? Проверьте, выполняется ли действие во внешней системе без Airflow.

Ответы помогут сузить круг причин и выбрать следующие шаги.

Airflow не запускается на Astro CLI

Чаще всего Airflow локально запускают через Astro CLI, standalone или Docker. Ниже — типичные проблемы именно с Astro CLI.

Частые причины:

  • Компоненты Airflow уходят в crash-loop из-за ошибок в кастомных плагинах или XCom backend. Логи планировщика: astro dev logs -s.
  • Ошибки из-за кастомных команд в Dockerfile или конфликтов зависимостей в packages.txt и requirements.txt.
  • Astro CLI установлен некорректно. Проверьте: astro version. При наличии новой версии обновите CLI.

По проблемам инфраструктуры на других платформах (Docker, Kubernetes с Helm Chart, managed-сервисы) см. соответствующую документацию и поддержку.

Подробнее об тестировании и отладке локально с Astro CLI — в документации Astro.

Типичные проблемы с DAG

DAG не отображаются в UI

Если DAG не появляется в UI, чаще всего Airflow не может его распарсить. В UI при этом будет видна ошибка импорта (Import Error). Текст ошибки подсказывает, что исправить.

Чтобы посмотреть ошибки импорта в терминале: с Astro CLI — astro dev run dags list-import-errors, с Airflow CLI — airflow dags list-import-errors.

Если сообщения об ошибке импорта нет, но DAG в UI по-прежнему нет:

  • Если в UI указано, что планировщик не запущен, проверьте логи планировщика — возможно, падение из-за ошибки в DAG. В Astro CLI: astro dev logs -s, затем перезапуск.
  • Перезапустите планировщик: astro dev restart.
  • Убедитесь, что DAG зарегистрирован в БД метаданных: astro dev run dags list или airflow dags list. Если DAG есть в списке, но не в UI, перезапустите Airflow: astro dev restart.
  • Проверьте права доступа к DAG и права на просмотр DAG.
  • Airflow сканирует каталог dags с интервалом dag_dir_list_interval (по умолчанию 5 минут). Принудительный перепарсинг: astro dev run dags reserialize / airflow dags reserialize.
  • Убедитесь, что все DAG лежат в каталоге dags.

На уровне кода проверьте, что:

  • DAG, определённый через декоратор @dag, вызывается (см. Декораторы Airflow).
  • У DAG уникальный dag_id.

Через плагин можно настроить Airflow listener, который выполняет код при появлении новой ошибки импорта (on_new_dag_import_error) или при повторной обработке известной ошибки (on_existing_dag_import_error).

Ошибки импорта из-за зависимостей

Частая причина ошибок импорта — отсутствие нужных пакетов в окружении Airflow: нет provider packages для используемых операторов/хуков или нет Python-пакетов, которые вызываются в задачах.

В проекте Astro пакеты уровня ОС добавляют в packages.txt, Python-пакеты (в том числе провайдеры) — в requirements.txt. Если нужен другой менеджер пакетов, можно добавить команду в Dockerfile.

Чтобы избежать несовместимости при выходе новых версий, рекомендуется фиксировать версии в проекте. Например, apache-airflow-providers-amazon==9.6.0 в requirements.txt не даст подтянуться несовместимой новой версии. Без фиксации Airflow будет использовать последнюю доступную версию.

В Astro CLI пакеты ставятся в контейнер планировщика. Проверить установку пакета:

astro dev bash --scheduler "pip freeze | grep <package-name>"

При конфликте версий или необходимости разных версий Python задачи можно выполнять в отдельных окружениях:

Если у многих задач общий набор пакетов и версий, часто используют два и более отдельных развёртывания Airflow.

DAG не запускаются или ведут себя не так

Если DAG не запускаются или работают не так, как задумано, проверьте:

  • Если не запускается ни один DAG — состояние планировщика: astro dev logs -s.
  • Протестировать DAG: astro dev dags test <dag_id> <execution_date> или airflow dags test <dag_id> <execution_date>.
  • У DAG должен быть start_date в прошлом. При start_date в будущем DAG run будет успешным, но без запуска задач. Не используйте datetime.now() в качестве start_date.
  • У каждого DAG должен быть уникальный dag_id. При двух DAG с одним id планировщик раз в 30 секунд случайно выбирает один из них для отображения.
  • DAG должны быть включены (не на паузе), чтобы запускаться по расписанию. Включить можно переключателем в UI или через Airflow CLI. Чтобы все DAG по умолчанию были включены, задайте в конфиге dags_are_paused_at_creation=False.

Если DAG запускается, но не по ожидаемому расписанию, см. Планирование в Airflow. При кастомном timetable убедитесь, что data interval DAG run не раньше start_date DAG.

Типичные проблемы с задачами

Если не работает весь DAG, см. раздел «DAG не запускаются или ведут себя не так» выше.

Между Airflow 2 и Airflow 3 существенно изменилась архитектура (безопасность, remote execution и др.). Для авторов DAG важно: прямой доступ к БД метаданных из задач больше недоступен. Подробнее: Upgrade from Airflow 2 to 3 и Release notes.

Задачи не запускаются или ведут себя не так

DAG может стартовать, но задачи остаются в разных состояниях или выполняются не в нужном порядке. Что проверить:

  • При CeleryExecutor в версиях Airflow до 2.6, если задачи застревают в queued, можно включить stalled_task_timeout.
  • Параметр task_queued_timeout задаёт, сколько задача может находиться в очереди перед повтором или пометкой failed. По умолчанию 600 секунд.
  • Проверьте зависимости задач и trigger rules. См. Управление зависимостями. Можно временно пересобрать DAG из EmptyOperator, чтобы убедиться в правильной структуре зависимостей.
  • При использовании декораторов задач убедитесь, что задачи вызываются (см. Декораторы Airflow).
  • При большом числе экземпляров задач или DAG учитывайте параметры масштабирования и лимиты по умолчанию. Подробнее: Scaling Airflow.
  • Если у задачи depends_on_past=True, новые задачи не запустятся, пока не задано состояние предыдущих run этой задачи.
  • Если задачи остаются в scheduled или queued, проверьте работу планировщика. При необходимости перезапустите его или увеличьте ресурсы.
  • Ещё раз проверьте, что start_date DAG в прошлом. start_date в будущем даёт успешный DAG run без выполнения задач.

Задачи падают

Большинство падений задач относится к одному из трёх типов:

  • Проблемы во внешней системе.
  • Ошибки внутри оператора.
  • Некорректные параметры оператора.

Упавшие задачи отображаются красными квадратами в Grid view; оттуда же открываются логи задачи. В логах видна причина ошибки.

Для быстрого реагирования на падения настройте уведомления. См. Уведомления об ошибках в Airflow.

В новых DAG сообщения вроде Task exited with return code Negsignal.SIGKILL или код -9 часто связаны с нехваткой памяти. Увеличьте ресурсы планировщика, webserver или pod в зависимости от типа executor (Local, Celery, Kubernetes).

После исправления может понадобиться перезапуск DAG или задач. См. Rerunning DAGs.

Проблемы с динамически маппленными задачами

Динамический маппинг задач позволяет менять число задач в рантайме по входным параметрам. Поддерживается и динамический маппинг по task groups.

Возможные причины проблем:

  • Не все параметры поддерживают маппинг; при неподдерживаемом параметре будет ошибка импорта.
  • Число одновременно выполняемых маппленных экземпляров одной задачи во всех run DAG ограничено параметром задачи max_active_tis_per_dag.
  • Превышен лимит числа маппленных экземпляров. Лимит задаётся конфигом max_map_length (по умолчанию 1024).
  • Маппинг по пустому списку — маппленная задача будет пропущена.
  • При .expand_kwargs() параметры должны быть в виде List(Dict).
  • В .expand() нужно передавать именованный аргумент (keyword argument).

При сложных сценариях с динамическим маппингом сначала соберите структуру DAG из EmptyOperator или задекорированных Python-операторов, убедитесь, что она верна, затем подставляйте нужные задачи. Примеры: Create dynamic Airflow tasks.

Часто вывод вышестоящего оператора не совпадает по формату с тем, что нужно для маппинга. Используйте .map() для преобразования элементов списка через Python-функцию.

Отсутствующие логи

При отладке падения задачи логов может не быть: на странице логов в UI — «спиннер» или пустой файл.

Обычно логи не появляются, когда процесс в планировщике или воркере завершается и связь теряется. Что можно сделать:

  • Проверьте логи планировщика и webserver на ошибки, связанные с отсутствием логов.
  • Убедитесь, что логи хранятся достаточно долго. Клиентам Astronomer: View logs.
  • Увеличьте CPU или память для задачи.
  • При Kubernetes executor, если задача падает очень быстро (менее 15 секунд), pod может завершиться до того, как webserver успеет забрать логи. По возможности добавьте небольшую задержку в задачу (в зависимости от оператора). Иначе ищите причину немедленного падения (часто — нехватка ресурсов или ошибка конфигурации).
  • Увеличьте ресурсы воркеров (Celery) или планировщика (Local executor).
  • Увеличьте параметр log_fetch_timeout_sec выше значения по умолчанию (5 секунд). Он задаёт время ожидания webserver при первом контакте с воркером при получении логов.
  • Попробуйте перезапустить задачу через clearing task instance — иногда логи появляются при повторном запуске.

Отладка подключений

Подключения в Airflow обычно нужны для обмена с внешними системами. Большинство хуков и операторов ожидают заданный параметр подключения. Поэтому некорректно заданные подключения — одна из самых частых причин отладки при первых шагах с DAG.

Конкретная ошибка может быть разной, но в логах задачи часто фигурирует слово «connection». Если подключение не задано, может быть сообщение вида 'connection_abc' is not defined.

Что проверить:

  • Работают ли учётные данные при прямом обращении к API внешней системы.
  • Задавайте подключения через переменные окружения Airflow, а не только в UI. Не задавайте одно и то же подключение в нескольких местах; при дублировании приоритет у переменной окружения.
  • Измените подключение _default на нужные данные или создайте новое подключение с другим именем и передайте его в хук или оператор.
  • Установлены ли нужные provider packages для данного типа подключения.
  • Как устроены подключения: Управление подключениями в Airflow.

Чтобы узнать, какие параметры нужны для конкретного типа подключения:

  • Посмотрите исходный код хука, который использует оператор.
  • Изучите документацию внешней системы по аутентификации.
  • В Astronomer Registry откройте документацию провайдера; у популярных провайдеров обычно описаны типы подключений (например, Azure).

Нужна дополнительная помощь

Этого руководства обычно достаточно для типичных случаев. Если вашей проблемы в нём нет:

  • При баге в Airflow или основных провайдерах создайте issue в репозитории Airflow. Для open source инструментов Astronomer — в репозиториях Astronomer.
  • Apache Airflow Slack: каналы #newbie-questions или #troubleshooting — удобное место для сложных вопросов по Airflow.
  • Stack Overflow с тегами airflow и другими по используемым инструментам. Удобно, когда непонятно, в каком компоненте ошибка — вопрос увидят специалисты по разным системам.
  • Клиенты Astronomer: поддержка.

Чтобы получить более точный ответ, включите в вопрос или issue:

  • Что изменилось в окружении, когда появилась проблема.
  • Что именно вы хотите сделать (максимально подробно).
  • Полный код DAG, если ошибка в нём.
  • Полный текст ошибки и traceback.
  • Версию Airflow и версии задействованных провайдеров.
  • Как запускается Airflow (Astro CLI, standalone, Docker, managed-сервис).

← Версионирование | К содержанию | Лучшие практики →