← → ou Espace pour naviguer

Formation
Apache Airflow

Créer, planifier et surveiller
des workflows programmatiques

7 modules · ~7-10 heures · De l'introduction aux pipelines SQL

1

Sommaire

01

Introduction et Premiers DAGs

02

Structure et Flux de Contrôle

03

Paramètres d'un DAG

04

Jinja Templating

05

Gestion du Temps

06

Gestion des Erreurs

07

Données et SQL

2
01

Introduction et
Premiers DAGs

Découvrir Airflow, son architecture et les concepts fondamentaux

3
Module 1

Qu'est-ce qu'Apache Airflow ?

Analogie : Airflow ne joue pas de musique — il dirige l'orchestre.
4
Module 1

Pourquoi Airflow ?

Problème avec les cron jobsSolution Airflow
Pas de visibilité sur l'état des tâchesInterface web avec statuts en temps réel
Pas de gestion des dépendancesLes DAGs définissent l'ordre d'exécution
Pas de gestion des erreursRetries automatiques, callbacks d'alerte
Configuration dispersée (crontab)Tout en Python, versionné avec Git
Pas de logs centralisésLogs par tâche, accessibles dans l'interface
5
Module 1

Cas d'usage principaux

Pipelines ETL / ELT

Extraire, transformer et charger des données depuis de multiples sources

MLOps

Entraînement, évaluation et déploiement de modèles de Machine Learning

Infrastructure

Sauvegardes, nettoyage de fichiers, gestion de serveurs Cloud

Rapports automatisés

Requêtes SQL planifiées, génération Excel, envoi par email

6
Module 1

Architecture d'Airflow

Browser
Web Server (Flask)
Metadata Database (PostgreSQL)
Scheduler
Triggerer
Executor → Workers
7
Module 1

Metadata Database

Le cœur de la mémoire d'Airflow. Généralement PostgreSQL ou MySQL.

Tous les autres composants lisent et écrivent dans cette base — c'est le point central du système.
8
Module 1

Le Scheduler

Le cerveau du système. Il tourne en permanence et :

  1. Lit le dossier contenant vos fichiers Python (~/airflow/dags/)
  2. Vérifie s'il est l'heure de lancer un DAG
  3. Vérifie si les dépendances d'une tâche sont remplies
  4. Envoie les tâches prêtes à l'Executor
Le scheduler relit les fichiers DAG toutes les 30 secondes par défaut — d'où un court délai avant que vos changements apparaissent dans l'interface.
9
Module 1

Executor et Workers

L'Executor détermine comment et les tâches s'exécutent. Les Workers exécutent réellement le code.

ExecutorDescriptionUsage
SequentialExecutorUne seule tâche à la foisTests uniquement
LocalExecutorParallèle, même machinePetites installations
CeleryExecutorRépartit sur plusieurs serveursProduction distribuée
KubernetesExecutorUn conteneur éphémère par tâcheProduction Cloud
10
Module 1

Web Server et Triggerer

Web Server

Interface graphique basée sur Flask / Gunicorn.

  • Lit les informations depuis la Metadata Database
  • Permet de surveiller, déclencher et debugger les DAGs
  • Accessible via navigateur web

Triggerer (2.2+)

Gère les tâches asynchrones (Deferrable Operators).

  • Au lieu de bloquer un Worker pendant des heures
  • La tâche est mise en pause et surveillée par le Triggerer
  • Le Worker est libéré pour d'autres tâches
11
Module 1

DAGs View

Le tableau de bord principal. Pour chaque DAG, vous voyez :

Vue DAGs dans Airflow
12
Module 1

Vue Grid

La vue la plus utilisée pour surveiller l'historique. Colonnes = exécutions dans le temps, Lignes = tâches du DAG.

Vue Grid dans Airflow
13
Module 1

Code couleur des statuts

Chaque case de la grille indique le statut d'une tâche pour un run donné.

CouleurStatutSignification
VertsuccessTâche terminée avec succès
RougefailedToutes les tentatives épuisées, échec définitif
Orange foncérunningTâche en cours d'exécution
Orange moyenup_for_retryEn attente avant une nouvelle tentative
Orange clairupstream_failedUne tâche en amont a échoué
BleuqueuedEn file d'attente, attend un worker
GrisskippedIgnorée volontairement (ex : branche non choisie)
14
Module 1

Vue Graph

Structure logique du DAG : les tâches sous forme de cases reliées par des flèches. Idéal pour comprendre l'ordre d'exécution et le parallélisme.

Vue Graph dans Airflow
15
Module 1

Les Logs

Outil n°1 pour le débogage. Cliquez sur une tâche → onglet Log → sortie console complète.

Vue Logs dans Airflow
Cherchez en fin de log : le traceback Python ou le code de retour Bash (0 = succès, 1 = erreur, 127 = commande introuvable).
16
Module 1

Qu'est-ce qu'un DAG ?

Directed Acyclic Graph (Graphe Orienté Acyclique)

task_1
task_2
task_3
Un DAG est un fichier Python qui décrit : quand tourner, quelles tâches, et dans quel ordre.
17
Module 1

Les Opérateurs

Un Opérateur est un modèle de tâche prédéfini. Chaque opérateur nécessite au minimum un task_id unique et un ou plusieurs paramètres spécifiques à son type.

BashOperator

BashOperator( task_id='ma_tache', # identifiant unique bash_command='echo "Bonjour"', # paramètre spécifique )

PythonOperator

def ma_fonction(): print("Bonjour") PythonOperator( task_id='ma_tache', # identifiant unique python_callable=ma_fonction, # paramètre spécifique )
18
Module 1

Il en existe des dizaines d'autres

BashOperator et PythonOperator sont les fondamentaux, mais Airflow propose un large catalogue d'opérateurs spécialisés :

OpérateurCe qu'il faitCas d'usage
SQLExecuteQueryOperatorExécute une requête SQLPipelines de données
EmailOperatorEnvoie un emailNotifications, rapports
SlackAPIPostOperatorPoste un message SlackAlertes d'équipe
SimpleHttpOperatorAppelle une API HTTPIntégrations tierces
BranchPythonOperatorChoisit un chemin conditionnelFlux dynamiques (Module 2)
Chaque opérateur déclare ses champs via template_fields — on y reviendra dans le Module 4 (Jinja Templating).
19
Module 1

Dépendances avec >>

Exécution linéaire

task_1 >> task_2 >> task_3
task_1
task_2
task_3

Exécution parallèle

task_A >> [task_B, task_C] >> task_D
task_A
task_B
task_C
task_D

Si task_2 échoue → task_3 ne démarre pas (statut Upstream Failed)

20
Module 1

3 façons d'écrire un DAG

Airflow propose trois syntaxes pour définir un pipeline. Chacune a ses avantages selon le contexte.

1
Classique
with DAG(...):
2
TaskFlow
@dag + @task
3
Hybride
Les deux combinées
21
Module 1

Méthode 1 — Classique

La méthode historique : with DAG(...) as dag: ouvre le conteneur, puis on déclare chaque opérateur en dessous.

from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator with DAG(dag_id='my_dag', default_args=default_args) as dag: task_1 = BashOperator(task_id='task_1', bash_command='echo "Étape 1"') task_2 = PythonOperator(task_id='task_2', python_callable=ma_fonction) task_3 = BashOperator(task_id='task_3', bash_command='echo "Étape 3"') task_1 >> task_2 >> task_3 # dépendances explicites
Dépendances
Explicites avec >>
Données
XComs manuels
Quand ?
Pipelines existants
22
Module 1

Méthode 2 — TaskFlow

La méthode recommandée depuis Airflow 2.0 pour du Python pur. Les dépendances sont implicites : Airflow déduit l'ordre via les arguments des fonctions.

from airflow.decorators import dag, task @dag(dag_id='taskflow_dag') def mon_pipeline(): @task() def extraire(): return {"nb_lignes": 100} @task() def transformer(donnees): # reçoit le return de extraire() return donnees @task() def charger(donnees): print(f"Chargé {donnees['nb_lignes']} lignes") charger(transformer(extraire())) # enchaînement par données mon_pipeline() # instancier le DAG
Dépendances
Implicites (flux de données)
Données
return automatique
Quand ?
Nouveau code Python
23
Module 1

Méthode 3 — Hybride

Mélanger opérateurs classiques et @task dans un même DAG — idéal pour une migration progressive.

@dag(dag_id='hybrid_dag') def mon_pipeline_hybride(): bash_step = BashOperator( task_id='bash_step', bash_command='echo "Étape Bash"') @task() def python_step(): return "OK" bash_final = BashOperator( task_id='bash_final', bash_command='echo "Fin"') active_step = python_step() # appeler la fonction ! bash_step >> active_step >> bash_final mon_pipeline_hybride()
Attention : une tâche @task doit être appelée python_step() avant de pouvoir la chaîner avec >>. Sans les parenthèses, c'est une fonction, pas une tâche.
24
Module 1

Récapitulatif — 3 méthodes

ClassiqueTaskFlowHybride
Déclarationwith DAG(...):@dag@dag
TâchesBashOperator, PythonOperator@taskLes deux
DépendancesExplicites (>>)Implicites (flux de données)Explicites (>>)
DonnéesXComs manuelsreturn automatiqueMixte
Quand ?Pipelines existantsNouveau code PythonMigration progressive
TaskFlow est la méthode recommandée depuis Airflow 2.0 pour du Python pur.
25
02

Structure et
Flux de Contrôle

Branchements, groupes de tâches et sensors

26
Module 2

BranchPythonOperator

Exécuter des chemins différents selon une condition — comme un if/else au niveau du DAG.

def choisir_chemin(): if nb_commandes > 50: return 'traitement_express' return 'traitement_standard' branch = BranchPythonOperator( task_id='choisir_chemin', python_callable=choisir_chemin, ) branch >> [express, standard]
choisir_chemin
express ✓
standard (skip)
Retourne un task_id (str) ou une liste de task_id pour activer plusieurs branches simultanément : return ['notifier_client', 'notifier_support']
27
Module 2

@task.branch — la version TaskFlow

Depuis Airflow 2.0, préférez le décorateur @task.branch au BranchPythonOperator — plus concis et cohérent avec l'API TaskFlow.

@task.branch

@task.branch() def choisir_chemin(): if nb_commandes > 50: return 'traitement_express' return 'traitement_standard' choisir_chemin() >> [express, standard]

BranchPythonOperator

def choisir_chemin(): if nb_commandes > 50: return 'traitement_express' return 'traitement_standard' branch = BranchPythonOperator( task_id='choisir_chemin', python_callable=choisir_chemin, ) branch >> [express, standard]
Même comportement, même possibilité de retourner une liste — mais moins de boilerplate. Privilégiez @task.branch pour tout nouveau code.
28
Module 2

Le problème de convergence

Après un branchement, la tâche de convergence ne s'exécutera jamais par défaut (all_success) car une branche est toujours skippée.

Par défaut :
trigger_rule='all_success'
→ Bloqué car une branche est skippée
Solution :
trigger_rule='none_failed_min_one_success'
→ S'exécute si aucun échec et ≥1 succès
envoyer_notification = BashOperator( task_id='envoyer_notification', bash_command='echo "Envoyé"', trigger_rule='none_failed_min_one_success', # la solution )
29
Module 2

TaskGroup

Regrouper des tâches dans des boîtes repliables dans l'UI — purement visuel, sans impact sur l'exécution.

from airflow.utils.task_group import TaskGroup with TaskGroup("validation") as grp_valid: verifier_stock = BashOperator( task_id='verifier_stock', ...) verifier_paiement = BashOperator( task_id='verifier_paiement', ...) with TaskGroup("preparation") as grp_prep: preparer = BashOperator(...) emballer = BashOperator(...) preparer >> emballer # ordre interne grp_valid >> grp_prep # chaîner les groupes
validation
vérifier_stock
vérifier_paiement
préparation
préparer
emballer

task_id préfixés : validation.vérifier_stock

30
Module 2

SubDAGs — déprécié

Vous rencontrerez peut-être des SubDagOperator dans du code existant. Ce mécanisme permettait d'imbriquer un DAG entier à l'intérieur d'un autre.

Déprécié depuis Airflow 2.2 — Les SubDAGs posaient de nombreux problèmes : deadlocks, exécution sur un seul worker, difficulté de debug, et complexité inutile.
SubDAGTaskGroup
StatutDépréciéRecommandé
ExécutionWorker dédié (risque deadlock)Workers normaux
VisibilitéDAG séparé cachéBoîte repliable dans le même DAG
ComplexitéÉlevée (fichier/fonction séparés)Faible (with TaskGroup():)
Si vous en croisez un, migrez-le vers un TaskGroup — la syntaxe est plus simple et le comportement plus fiable.
31
Module 2

Sensors

Opérateurs qui attendent qu'une condition externe soit remplie avant de continuer.

FileSensor( task_id='attendre_fichier', filepath='commandes.csv', poke_interval=10, # vérifie toutes les 10s timeout=120, # échoue après 2 min mode='poke', # ou 'reschedule' )
mode='poke' — Worker reste bloqué pendant l'attente. Pour attentes courtes (<5 min)
mode='reschedule' — Worker libéré entre chaque vérification. Pour attentes longues (heures)
32
03

Paramètres
d'un DAG

Identité, planification, concurrence et configuration dynamique

33
Module 3

Identité et Métadonnées

DAG( dag_id='pipeline_ventes_quotidien', # identifiant unique description="Extraction des ventes du CRM", # visible dans l'UI tags=['data', 'production'], # filtres dans l'UI default_args={ 'owner': 'data_team', 'retries': 1, }, )
34
Module 3

Planification

Deux paramètres essentiels : start_date (à partir de quand) et schedule_interval (à quelle fréquence).

DAG( dag_id='mon_pipeline', start_date=pendulum.datetime(2024, 1, 1, tz="Europe/Paris"), # date de référence schedule_interval='0 8 * * 1', # expression Cron OU preset catchup=False, )

schedule_interval — Cron

# min h jour mois sem 0 8 * * 1 # lundi 8h 0 0 1 * * # 1er du mois */15 * * * * # toutes les 15min

schedule_interval — Presets

PresetFréquence
@dailyTous les jours minuit
@hourlyToutes les heures
@weeklyDimanche minuit
NoneManuel uniquement
catchup=False — Si votre start_date est dans le passé, Airflow lance par défaut tous les runs manqués d'un coup. Cela peut saturer votre serveur. Désactivez-le sauf besoin explicite.
35
Module 3

Concurrence

Limiter le parallélisme pour protéger les ressources ou faciliter le debug.

ParamètrePortéeEffet
max_active_runsLe DAG entierNombre de DAGRuns simultanés
max_active_tasksUn seul DAGRunNombre de tâches parallèles dans un run
DAG( max_active_runs=1, # un seul run à la fois max_active_tasks=2, # max 2 tâches en parallèle )

Utilisez la vue Gantt pour observer l'effet du parallélisme.

36
Module 3

Params — valeurs au déclenchement

Valeurs modifiables via le formulaire "Trigger DAG w/ config". Portée : un seul DAGRun.

Déclaration dans le DAG

DAG( dag_id='mon_dag', params={ 'environnement': 'developpement', 'limite_lignes': 100, }, )

Accès en Python (@task)

@task() def ma_tache(**context): env = context['params']['environnement'] limite = context['params']['limite_lignes'] print(f"Env: {env}, limite: {limite}")
Les valeurs modifiées au déclenchement n'affectent que cette exécution. Les runs planifiés utilisent les valeurs par défaut.
37
Module 3

Variables Airflow — config globale

Paires clé-valeur stockées dans Admin > Variables. Persistantes, partagées entre tous les DAGs.

Créer une Variable

Dans l'interface : Admin > Variables > +

KeyVal
chemin_source/data/ventes/
config_pipeline{"seuil": 5}

Accès en Python (@task)

from airflow.models import Variable # Chaîne simple chemin = Variable.get( 'chemin_source', default_var='/data/default/') # JSON → dictionnaire Python config = Variable.get( 'config_pipeline', deserialize_json=True)
Anti-pattern : Ne jamais appeler Variable.get() au top level du fichier — le scheduler relit les fichiers toutes les 30s, provoquant une requête DB à chaque fois. Appelez-le dans une fonction de tâche.
38
Module 3

Params vs Variables — comparaison

paramsVariable
PortéeUn seul DAGRunGlobale (tous les DAGs)
StockageConfiguration du runBase de données Airflow
ModificationFormulaire au déclenchementAdmin > Variables
Accès Pythoncontext['params']['x']Variable.get('x')
Cas d'usageOverrides ponctuels, testsConfig globale (chemins, emails, seuils)
Les deux sont aussi accessibles via le templating Jinja — on verra comment dans le module suivant.
39
04

Jinja Templating,
Macros et Variables

Injecter des valeurs dynamiques dans vos tâches

40
Module 4

Pourquoi le templating ?

Jusqu'ici, vos tâches utilisaient des valeurs en dur. En production, un pipeline doit adapter ses valeurs au contexte d'exécution : chemins datés, requêtes SQL filtrées par période, messages dynamiques.

Sans templating
bash_command='cp /data/2024-03-15/ ...'

Valeur en dur — il faut modifier le code à chaque exécution

Avec templating Jinja
bash_command='cp /data/{{ ds }}/ ...'

Remplacé automatiquement par la date logique du run

Syntaxe {{ ... }} — remplacée au moment de l'exécution de la tâche, pas au chargement du fichier par le scheduler.

41
Module 4

Pourquoi pas simplement Python ?

On pourrait écrire datetime.now() dans un PythonOperator. Mais cela pose deux problèmes :

1. Mauvaise date
datetime.now().strftime('%Y-%m-%d')

Renvoie la date réelle d'exécution, pas la date logique du run. En cas de rattrapage ou relance, vous obtenez la mauvaise date.

2. Pas de Python partout
BashOperator(bash_command='...') SQLExecuteQueryOperator(sql='...')

Ces opérateurs n'exécutent pas de Python — le templating Jinja est le seul moyen d'y injecter des valeurs dynamiques.

Jinja est le langage universel d'Airflow pour injecter des valeurs dynamiques, quel que soit le type d'opérateur.
42
Module 4

Params et Variables en Jinja

Dans le module précédent, on accédait aux params et variables en Python. Le templating Jinja permet de les injecter directement dans les champs des opérateurs.

Params

BashOperator( task_id='lancer', bash_command='echo "Env : {{ params.environnement }}"', ) # → echo "Env : developpement"

Variables Airflow

BashOperator( task_id='afficher', bash_command='echo "{{ var.value.chemin_source }}"', ) # → echo "/data/ventes/" # Sous-clé JSON '{{ var.json.config_pipeline.seuil_erreur }}' # → 5
template_fields — Le templating ne fonctionne que dans certains champs de l'opérateur (bash_command, sql…). Vérifiez avec : print(BashOperator.template_fields)
43
Module 4

Autres variables de template

Au-delà des params et variables, Airflow injecte automatiquement les informations du contexte d'exécution.

Dates et temps

VariableDescriptionExemple
{{ ds }}Date logique (YYYY-MM-DD)2024-03-15
{{ ds_nodash }}Date logique sans tirets20240315
{{ ts }}Timestamp complet ISO 86012024-03-15T00:00:00+01:00
{{ data_interval_start }}Début de la période traitée2024-03-15T00:00:00+01:00
{{ data_interval_end }}Fin de la période traitée2024-03-16T00:00:00+01:00

Métadonnées

VariableDescription
{{ dag.dag_id }}ID du DAG courant
{{ task_instance.task_id }}ID de la tâche en cours
{{ params.xxx }}Paramètre du DAG (Module 3)
{{ var.value.xxx }}Variable Airflow (Module 3)
44
Module 4

Filtres et Macros

Filtres Jinja

Transforment une valeur avec le pipe |

# Remplacer les tirets {{ ds | replace('-', '/') }} # → 2024/03/15 # Valeur par défaut {{ var | default('N/A') }}

Macros Airflow

Fonctions appelées dans les {{ }}

# Ajouter/soustraire des jours {{ macros.ds_add(ds, -7) }} # → 2024-03-08 # Reformater une date {{ macros.ds_format(ds, '%Y-%m-%d', '%d/%m/%Y') }} # → 15/03/2024
45
Module 4

Macros personnalisées

Le paramètre user_defined_macros du DAG permet de définir vos propres variables et fonctions accessibles dans les templates — sans préfixe.

Chaîne simple

DAG( dag_id='mon_dag', user_defined_macros={ 'nom_equipe': 'Infrastructure IT', 'version': '2.1.3', }, ) # Dans un template : 'Rapport par {{ nom_equipe }} v{{ version }}' # → Rapport par Infrastructure IT v2.1.3

Fonction Python

def semaine(ds): from datetime import datetime d = datetime.strptime(ds, '%Y-%m-%d') return d.isocalendar()[1] DAG( user_defined_macros={ 'semaine': semaine}, ) # Dans un template : 'Semaine n° {{ semaine(ds) }}' # → Semaine n° 11
46
05

Gestion du
Temps

Fuseaux horaires, changement d'heure, intervalle de données et catchup

47
Module 5

UTC par défaut et pendulum

Sans fuseau horaire, 0 8 * * * = 8h UTC = 9h en hiver ou 10h en été à Paris.

Piège silencieux
start_date=datetime(2024, 1, 1) # UTC implicite !
Solution
import pendulum start_date=pendulum.datetime( 2024, 1, 1, tz="Europe/Paris" )
Avec tz="Europe/Paris", Airflow interprète le cron comme 8h heure de Paris et gère CET/CEST automatiquement.
48
Module 5

Le changement d'heure

Heure d'été (mars)

2h → 3h : la plage 2h-3h n'existe pas

Un DAG planifié à 2h30 → ne tourne pas

Heure d'hiver (octobre)

3h → 2h : la plage 2h-3h se produit 2 fois

Un DAG planifié à 2h30 → peut tourner 2 fois

Règle d'or : Ne planifiez pas de tâches critiques entre 2h00 et 3h00 heure locale.
FormatComportement au changement d'heure
Cron "0 8 * * *"Suit l'heure locale — toujours 8h Paris
timedelta(hours=24)Compte exactement 24h — décale d'une heure
49
Module 5

L'intervalle de données

Chaque run traite une période, pas un instant. Un DAG quotidien du 1er janvier ne tourne pas le 1er — il tourne le 2 janvier pour traiter les données du 1er.

Période traitéelogical_date (= ds)Moment réel du run
01 jan → 02 jan2024-01-012 janvier 00:00
02 jan → 03 jan2024-01-023 janvier 00:00
03 jan → 04 jan2024-01-034 janvier 00:00
Idempotence : Utilisez {{ ds }} au lieu de datetime.now() — un run relancé traite toujours les mêmes données.
50
Module 5

Catchup vs Backfill

CatchupBackfill
DéclenchementAutomatique à l'activationManuel via CLI
PérimètreTout depuis start_datePlage choisie
OrdreTous en parallèle Chronologique
RisqueSurcharge massiveMaîtrisé
UsageQuasi jamaisAprès correction de bug
# Backfill ciblé via la CLI airflow dags backfill \ --start-date 2024-01-01 \ --end-date 2024-01-31 \ mon_dag
51
06

Gestion des Erreurs
et Trigger Rules

Retries, alertes, interventions manuelles et règles de déclenchement

52
Module 6

Retries automatiques

FAILED
attente retry_delay
UP_FOR_RETRY
RUNNING
default_args = { 'retries': 3, # 3 tentatives supplémentaires 'retry_delay': timedelta(minutes=5), # 5 min entre chaque 'retry_exponential_backoff': True, # 5min, 10min, 20min... 'max_retry_delay': timedelta(hours=1), # plafond du backoff }

La plupart des erreurs sont transitoires (coupure réseau, API 503). Les retries les absorbent automatiquement.

53
Module 6

Alertes — email

Méthode la plus simple : Airflow envoie un email automatiquement après un échec définitif (retries épuisés).

default_args = { 'email': ['team@company.com'], 'email_on_failure': True, # après échec définitif 'email_on_retry': False, # pas à chaque retry }
Nécessite un serveur SMTP configuré dans airflow.cfg (section [smtp]).
54
Module 6

Alertes — callback

Pour Slack, Teams ou tout webhook : on_failure_callback reçoit un dictionnaire context avec toutes les infos sur l'échec.

def send_alert(context): dag_id = context['dag'].dag_id task_id = context['task_instance'].task_id date = context['ds'] err = context.get('exception', '?') # requests.post(WEBHOOK, json=...)
CléContenu
context['dag'].dag_idID du DAG
context['task_instance'].task_idID de la tâche
context['execution_date']Date logique (datetime)
context['ds']Date logique (str)
context['exception']L'erreur qui a causé l'échec
Au niveau du DAG (toutes les tâches) :
DAG(on_failure_callback=send_alert)
Au niveau d'une tâche (prioritaire) :
BashOperator(..., on_failure_callback=fn)
55
Module 6

SLAs — délais garantis

Un SLA (Service Level Agreement) définit le temps maximum qu'une tâche devrait mettre. Si ce délai est dépassé, Airflow déclenche une alerte — mais n'arrête pas la tâche.

Sur une tâche

extraction = BashOperator( task_id='extraction', bash_command='...', sla=timedelta(hours=1), # max 1h )

Callback personnalisé

def sla_alert(dag, task_list, **kwargs): print(f"SLA raté : {task_list}") # envoyer Slack, email... DAG( sla_miss_callback=sla_alert, )
SLA vs dagrun_timeout — Le SLA alerte sans interrompre, tandis que dagrun_timeout tue le DAGRun entier. Utilisez les deux ensemble pour surveiller puis couper si nécessaire.
56
Module 6

Interventions manuelles : le Clear

Dans Airflow, on ne "relance" pas — on efface l'état. La tâche revient dans la file et Airflow la relance.

FAILED
— Clear →
(sans état)
RUNNING
Workflow de correction :
  1. DAG échoue (tâche rouge dans l'UI)
  2. Lire les logs pour identifier l'erreur
  3. Corriger le code, sauvegarder (~30s pour recharger)
  4. Clear la tâche (avec Downstream coché)
57
Module 6

Trigger Rules

Par défaut, une tâche exige que toutes ses tâches amont aient réussi (all_success). Mais ce comportement est parfois trop strict :

Trigger RuleS'exécute si…Cas d'usage
all_successToutes amont ont réussiDéfaut, pipeline linéaire
one_success≥1 amont a réussiRapport partiel tolérant
all_doneToutes terminées (peu importe le résultat)Nettoyage, libération de ressources
none_failed_min_one_successAucun échec + ≥1 succèsConvergence après branchement
alwaysToujoursLogging, métriques
nettoyage = BashOperator( task_id='nettoyage', bash_command='rm -rf /tmp/pipeline_*', trigger_rule='all_done', # s'exécute toujours, même après un échec )
58
Module 6

Autonomie en couches

Airflow adopte une stratégie de dégradation progressive. Avant d'alerter un humain, il essaie de se débrouiller seul.

Tâche échoue
1. Retry automatique
Attend retry_delay, réessaie jusqu'à retries fois
encore en échec
2. Alerte automatique
Email, Slack, Teams via on_failure_callback
3. Intervention humaine
Lire les logs, corriger, Clear
59
07

Données
et SQL

Connections, opérateurs SQL, XComs et pipelines complets

60
Module 7

Chef d'orchestre, pas porteur de données

C'est la distinction la plus importante à retenir sur Airflow.

AIRFLOW
task_1 → task_2 → task_3
"Fais ce SELECT" · "Insère ces lignes"
instructions ↓   confirmations ↑
SYSTÈMES EXTERNES
PostgreSQL · Snowflake · S3 · API
Font le vrai travail de données
Airflow envoie des instructions et récupère des confirmations légères (un code de retour, un compteur, un identifiant). Il ne doit jamais charger des tables entières en mémoire.
61
Module 7

Connections

Un coffre-fort centralisé pour les identifiants, accessible via Admin > Connections.

Mauvaise pratique
conn = psycopg2.connect( host='serveur', password='motdepasse123' ) # visible dans Git !
Bonne pratique
SQLExecuteQueryOperator( conn_id='my_prod_db', sql='SELECT COUNT(*) ...', ) # aucun secret dans le code

Supporte nativement : PostgreSQL, MySQL, SQLite, Snowflake, BigQuery, S3, HTTP…

62
Module 7

SQLExecuteQueryOperator

Exécute du SQL via un conn_id, avec support complet du templating Jinja.

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator creer_table = SQLExecuteQueryOperator( task_id='creer_table', conn_id='sqlite_default', sql=""" CREATE TABLE IF NOT EXISTS commandes ( id INTEGER PRIMARY KEY AUTOINCREMENT, date_cmd TEXT, produit TEXT, quantite INTEGER ); """, )
63
Module 7

XComs (Cross-Communication)

XCom = mécanisme pour transmettre des valeurs entre tâches d'un même DAGRun. Stocké dans la base de données interne d'Airflow, visible dans Admin > XComs.

Tâche A
produit un XCom
— stocké en DB →
Base Airflow
clé-valeur
— lu par →
Tâche B
récupère le XCom

Comment produire un XCom

  • @task : le return est automatiquement stocké
  • SQLExecuteQueryOperator : le résultat du SELECT est stocké
  • Manuellement : ti.xcom_push(key, value)

Comment récupérer un XCom

  • @task : passer le résultat en argument
  • .output : pont entre opérateur classique et @task
  • Manuellement : ti.xcom_pull(task_ids='...')
Règle d'or : uniquement des métadonnées légères (compteur, identifiant, chemin). Jamais de DataFrames, tables ou fichiers — vous satureriez la base Airflow.
64
Module 7

XComs — en pratique

TaskFlow (automatique)

@task() def extraire(): return {"nb_lignes": 100} @task() def transformer(donnees): # donnees = {"nb_lignes": 100} # passé via XCom automatiquement return donnees transformer(extraire())

Opérateur classique + .output

calcul = SQLExecuteQueryOperator( task_id='calcul', conn_id='sqlite_default', sql="SELECT SUM(montant)...", ) @task() def afficher(total): # total = [(1249.94,)] print(f"Total : {total[0][0]}") afficher(calcul.output)
Le résultat d'un SELECT est une liste de tuples : [(1249.94,)]. Pour extraire la valeur : total[0][0] (première ligne, première colonne).
65
Module 7

XComs — multiple_outputs

Par défaut, une @task produit un seul XCom (clé return_value). Avec multiple_outputs=True, chaque clé du dictionnaire retourné devient un XCom séparé.

Sans multiple_outputs

@task() def extraire(): return {"nb": 100, "src": "API"} # 1 XCom : return_value = {"nb": 100, "src": "API"} # La tâche suivante reçoit tout le dict result = extraire() transformer(result) # reçoit le dict entier

Avec multiple_outputs

@task(multiple_outputs=True) def extraire(): return {"nb": 100, "src": "API"} # 2 XComs séparés : nb = 100, src = "API" # On peut cibler une seule clé result = extraire() traiter_nb(result["nb"]) # reçoit 100 traiter_src(result["src"]) # reçoit "API"
Utile quand une étape d'extraction produit plusieurs valeurs destinées à des tâches différentes en aval.
66
Module 7

XComs — TaskFlow vs Opérateurs

Deux façons de manipuler les XComs selon le type de tâche.

TaskFlow (@task)Opérateur classique
Produirereturn valeurAutomatique (ex: résultat SELECT) ou ti.xcom_push(key, val)
RécupérerPasser en argument : fn(result).output ou ti.xcom_pull(task_ids='...')
Multi-valeursmultiple_outputs=TruePlusieurs xcom_push avec des clés différentes
Pont entre les deuxfn(operateur.output) — passe le XCom d'un opérateur à une @task
Via Jinja{{ ti.xcom_pull(task_ids='...', key='...') }} — dans tout champ templatisable

Python (PythonOperator)

def produire(ti): ti.xcom_push(key='nb', value=42) def consommer(ti): nb = ti.xcom_pull( task_ids='produire', key='nb') print(nb) # → 42

Jinja (BashOperator, SQL…)

BashOperator( task_id='afficher', bash_command='echo "{{ ti.xcom_pull( task_ids=\'produire\', key=\'nb\') }}"', ) # → echo "42"
67
Module 7

Pipeline SQL complet

créer_table
insérer ({{ ds }})
calculer (XCom)
afficher (@task)
Résultat : Laptop (999.99) + Souris (3×29.99) + Clavier (2×79.99) = 1 249.94 EUR
68

Récapitulatif

01-02

DAGs, opérateurs, dépendances, branchements, sensors

03-04

Paramètres, planification, Jinja templating, macros

05-06

Gestion du temps, fuseaux, erreurs, retries, trigger rules

07

Connections, SQL, XComs, pipelines complets

Formation Apache Airflow — Merci !

69