Tâches asynchrones avec Celery et FastAPI
Ce chapitre vient après Notifications et communication utilisateur avec FastAPI. Il montre comment sortir certaines actions du cycle direct de la requête HTTP pour les exécuter en arrière-plan avec Celery et Redis.
Introduction
Dans le chapitre précédent, les emails et SMS partent encore depuis le cycle immédiat de la requête via BackgroundTasks.
C’est très bien pour apprendre et pour des volumes modestes.
Le rôle de ce chapitre est de franchir l’étape suivante : sortir ces envois du process web et les déléguer à une vraie file de tâches.
Ce que ce chapitre va accomplir
À la fin de ce chapitre, tu auras :
- un vrai module
worker/tasks.py, - une app Celery connectée à Redis,
- des tâches asynchrones pour :
- email texte,
- email template,
- SMS,
- une évolution de
services/notification.pypour déléguer les envois à Celery, - une mise à jour minimale de
config.pypour Redis, - la commande worker à lancer localement ou via Docker,
- une compréhension claire de :
- broker,
- backend,
- worker,
.delay(),async_to_sync()dans un task worker.
Point de départ recommandé
Ce chapitre suppose que les notifications fonctionnent déjà en version simple. Avant d’ajouter Celery, il faut avoir clarifié :
- quoi envoyer,
- à quel moment,
- avec quelles données,
- avec quel niveau minimal de sécurité.
Autrement dit, Celery n’ajoute pas le métier : il ajoute l’infrastructure qui exécute ce métier de façon plus robuste.
Source et choix pédagogiques
Ce chapitre s’appuie principalement sur 24-Celery/125-tasks.py.
On y retrouve :
- l’initialisation d’une app Celery,
- Redis comme broker et backend,
- des tâches pour email texte, email template et SMS,
async_to_sync()pour appelerfastapi-maildepuis le worker.
Ici, on reprend cette logique sans casser l’architecture existante :
- les routes déclenchent toujours le même service,
NotificationServicepousse désormais un job Celery,worker/tasks.pyexécute le travail en arrière-plan.
Architecture visée après ce chapitre
Après ce chapitre, le projet peut ressembler à ceci :
fastapi-base-api/
├── main.py
├── config.py
├── requirements.txt
├── docker-compose.yml
├── core/
│ ├── __init__.py
│ ├── security.py
│ ├── exceptions.py
│ ├── middleware.py
│ ├── responses.py
│ └── url_tokens.py
├── api/
│ ├── __init__.py
│ ├── router.py
│ ├── dependencies.py
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── task.py
│ │ └── user.py
│ └── schemas/
│ ├── __init__.py
│ ├── task.py
│ └── user.py
├── database/
│ ├── __init__.py
│ ├── models.py
│ ├── session.py
│ └── redis.py
├── services/
│ ├── __init__.py
│ ├── task.py
│ ├── user.py
│ └── notification.py
├── templates/
│ ├── mail_email_verify.html
│ └── mail_password_reset.html
└── worker/
├── __init__.py
└── tasks.py
Repères utiles avant le code
1. Redis n’exécute pas les tâches
Redis sert ici de :
- broker,
- backend.
Mais c’est le worker Celery qui exécute réellement les jobs.
2. .delay() ne fait pas le travail
.delay() met juste une tâche dans la file.
Le travail réel sera fait par le worker.
3. Une tâche Celery reçoit des données simples
Évite de lui passer :
- session SQLAlchemy,
- objets complexes non sérialisables,
- request FastAPI.
Passe plutôt :
- des chaînes,
- des listes,
- des dicts simples,
- des IDs.
4. fastapi-mail est async, mais Celery task est souvent sync
C’est pourquoi la source utilise async_to_sync().
C’est une idée importante à retenir.
Étape 1 — ajouter les dépendances utiles
Complète requirements.txt avec :
celery[redis]
redis
asgiref
Ton fichier complet peut donc maintenant contenir au minimum :
fastapi
uvicorn[standard]
sqlmodel
sqlalchemy
asyncpg
pydantic-settings
scalar-fastapi
passlib[bcrypt]
PyJWT
python-multipart
email-validator
pytest
pytest-asyncio
httpx
aiosqlite
jinja2
fastapi-mail
itsdangerous
twilio
celery[redis]
redis
asgiref
Installe ensuite :
pip install -r requirements.txt
Étape 2 — faire évoluer config.py pour Redis
Si ce n’est pas déjà fait dans ton projet, ajoute Redis à la configuration DB.
class DatabaseSettings(BaseSettings):
POSTGRES_SERVER: str
POSTGRES_PORT: int
POSTGRES_USER: str
POSTGRES_PASSWORD: str
POSTGRES_DB: str
REDIS_HOST: str = "localhost"
REDIS_PORT: int = 6379
model_config = _base_config
@property
def POSTGRES_URL(self) -> str:
return (
f"postgresql+asyncpg://{self.POSTGRES_USER}:{self.POSTGRES_PASSWORD}"
f"@{self.POSTGRES_SERVER}:{self.POSTGRES_PORT}/{self.POSTGRES_DB}"
)
def REDIS_URL(self, db: int = 0) -> str:
return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/{db}"
Pourquoi REDIS_URL(db=...) est pratique
Parce qu’on peut facilement séparer plusieurs usages Redis par base logique :
- base 0 pour autre chose,
- base 9 pour Celery par exemple.
C’est exactement ce que montre la source.
Étape 3 — créer worker/__init__.py
Crée un package worker :
mkdir -p worker
touch worker/__init__.py worker/tasks.py
Étape 4 — créer worker/tasks.py
Crée maintenant le vrai worker Celery.
from asgiref.sync import async_to_sync
from celery import Celery
from fastapi_mail import ConnectionConfig, FastMail, MessageSchema, MessageType
from pydantic import EmailStr
from twilio.rest import Client
from pathlib import Path
from config import db_settings, notification_settings
BASE_DIR = Path(__file__).resolve().parent.parent
TEMPLATE_DIR = BASE_DIR / "templates"
fast_mail = FastMail(
ConnectionConfig(
MAIL_USERNAME=notification_settings.MAIL_USERNAME,
MAIL_PASSWORD=notification_settings.MAIL_PASSWORD,
MAIL_FROM=notification_settings.MAIL_FROM,
MAIL_PORT=notification_settings.MAIL_PORT,
MAIL_SERVER=notification_settings.MAIL_SERVER,
MAIL_FROM_NAME=notification_settings.MAIL_FROM_NAME,
MAIL_STARTTLS=notification_settings.MAIL_STARTTLS,
MAIL_SSL_TLS=notification_settings.MAIL_SSL_TLS,
USE_CREDENTIALS=notification_settings.USE_CREDENTIALS,
VALIDATE_CERTS=notification_settings.VALIDATE_CERTS,
TEMPLATE_FOLDER=TEMPLATE_DIR,
)
)
send_message_sync = async_to_sync(fast_mail.send_message)
twilio_client = None
if (
notification_settings.TWILIO_SID
and notification_settings.TWILIO_AUTH_TOKEN
and notification_settings.TWILIO_NUMBER
):
twilio_client = Client(
notification_settings.TWILIO_SID,
notification_settings.TWILIO_AUTH_TOKEN,
)
celery_app = Celery(
"task_api_tasks",
broker=db_settings.REDIS_URL(9),
backend=db_settings.REDIS_URL(9),
broker_connection_retry_on_startup=True,
)
@celery_app.task(name="notifications.send_mail")
def send_mail_task(
recipients: list[str],
subject: str,
body: str,
):
send_message_sync(
message=MessageSchema(
recipients=recipients,
subject=subject,
body=body,
subtype=MessageType.plain,
)
)
return "Message sent"
@celery_app.task(name="notifications.send_email_template")
def send_email_template_task(
recipients: list[EmailStr],
subject: str,
context: dict,
template_name: str,
):
send_message_sync(
message=MessageSchema(
recipients=recipients,
subject=subject,
template_body=context,
subtype=MessageType.html,
),
template_name=template_name,
)
return "Template email sent"
@celery_app.task(name="notifications.send_sms")
def send_sms_task(to: str, body: str):
if twilio_client is None or notification_settings.TWILIO_NUMBER is None:
return "SMS provider not configured"
twilio_client.messages.create(
from_=notification_settings.TWILIO_NUMBER,
to=to,
body=body,
)
return "SMS sent"
Ce qu’il faut remarquer ici
async_to_sync(fast_mail.send_message)
C’est le point le plus important.
La fonction mail de fastapi-mail est async.
Le task Celery, lui, est écrit ici comme fonction sync.
Donc on fait un pont entre les deux.
celery_app = Celery(...)
On définit une vraie application worker distincte de FastAPI.
Broker et backend Redis
On utilise Redis pour :
- transporter les tâches,
- stocker les états/résultats.
Étape 5 — faire évoluer services/notification.py
Maintenant, au lieu d’envoyer directement via BackgroundTasks, on délègue à Celery.
Remplace le service précédent par ceci :
from pydantic import EmailStr
from worker.tasks import (
send_email_template_task,
send_mail_task,
send_sms_task,
)
class NotificationService:
async def send_email(
self,
recipients: list[EmailStr],
subject: str,
body: str,
):
send_mail_task.delay(recipients, subject, body)
async def send_email_template(
self,
recipients: list[EmailStr],
subject: str,
context: dict,
template_name: str,
):
send_email_template_task.delay(recipients, subject, context, template_name)
async def send_sms(self, to: str, body: str):
send_sms_task.delay(to, body)
Ce qui change conceptuellement
Avant :
- le process FastAPI planifiait l’envoi via
BackgroundTasks.
Maintenant :
- le process FastAPI pousse juste un message dans Redis,
- le worker Celery fait le travail réel.
Étape 6 — simplifier api/dependencies.py
Comme NotificationService n’a plus besoin de BackgroundTasks dans son constructeur, la dépendance devient plus simple.
def get_notification_service() -> NotificationService:
return NotificationService()
NotificationServiceDep = Annotated[
NotificationService,
Depends(get_notification_service),
]
Étape 7 — garder api/routers/user.py presque identique
Bonne nouvelle : les routes du chapitre Notifications et communication utilisateur avec FastAPI changent très peu.
C’est justement un très bon signal architectural.
Tu peux garder les appels de ce type :
await notifications.send_email_template(...)
await notifications.send_sms(...)
Le routeur ne sait pas si derrière :
- tu utilises
BackgroundTasks, - ou Celery.
Et c’est excellent.
Pourquoi c’est une vraie victoire d’architecture
Cela veut dire que :
- la couche API exprime une intention métier,
- la stratégie d’exécution peut changer derrière,
- sans casser toute l’interface applicative.
Étape 8 — lancer le worker localement
Si tu travailles hors Docker, il faut avoir Redis en route, puis lancer Celery.
Exemple :
celery -A worker.tasks.celery_app worker --loglevel=info
Dans un autre terminal, tu lances toujours ton API FastAPI normalement.
Étape 9 — brancher la version Docker Compose
Le chapitre Docker avec FastAPI avait déjà préparé le terrain. Ici, on précise la commande worker.
Dans docker-compose.yml, le service Celery peut ressembler à ceci :
celery:
build: .
env_file:
- .env
environment:
POSTGRES_SERVER: db
POSTGRES_PORT: 5432
REDIS_HOST: redis
REDIS_PORT: 6379
depends_on:
db:
condition: service_healthy
redis:
condition: service_started
command: ["celery", "-A", "worker.tasks.celery_app", "worker", "--loglevel=info"]
Et n’oublie pas le service Redis :
redis:
image: redis:7
command: redis-server --appendonly yes
volumes:
- redis_data:/data
Étape 10 — ce que devient le flux utilisateur
Prenons l’exemple du reset password.
Avant :
- l’API reçoit la requête,
NotificationServiceutiliseBackgroundTasks,- le process API reste responsable du déclenchement final.
Après Celery :
- l’API reçoit la requête,
NotificationServiceappelle.delay(),- Redis reçoit le job,
- le worker Celery traite le job,
- l’API peut répondre vite sans porter l’exécution complète.
Vérifications pratiques
1. Vérifier que Redis est disponible
Si tu es en Docker :
docker compose ps
Tu dois voir redis et celery en cours d’exécution.
2. Vérifier les logs du worker
docker compose logs celery -f
ou en local, directement dans le terminal du worker.
3. Déclencher un email de reset
curl -i \
-X POST http://localhost:8000/user/password-reset/request \
-H "Content-Type: application/json" \
-d '{"email": "enes@test.local"}'
Côté API :
- la réponse doit revenir vite.
Côté worker Celery :
- tu dois voir le job être reçu/exécuté.
4. Déclencher un SMS
TOKEN="..."
curl -i \
-X POST http://localhost:8000/user/notify/sms \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"phone_number": "+33123456789",
"message": "Celery test notification"
}'
Là encore :
- l’API répond rapidement,
- le worker traite ensuite le job.
Erreurs fréquentes
Erreur 1 — rien ne se passe après .delay()
Cause fréquente :
- le worker Celery n’est pas lancé.
Correction :
- lance
celery -A worker.tasks.celery_app worker --loglevel=info - ou démarre le service
celerydans Compose.
Erreur 2 — connexion Redis refusée
Cause fréquente :
- Redis n’est pas démarré,
- ou
REDIS_HOST/REDIS_PORTsont faux.
Correction :
- vérifie la config et les logs.
Erreur 3 — fastapi-mail casse dans Celery
Cause fréquente :
- oubli de
async_to_sync().
Correction :
- garde une fonction sync autour de
fast_mail.send_message.
Erreur 4 — le worker n’importe pas correctement le module
Cause fréquente :
- mauvais chemin
-A.
Correction :
- vérifie la commande :
celery -A worker.tasks.celery_app worker --loglevel=info
Erreur 5 — tu passes des objets trop complexes au task
Cause fréquente :
- passage d’un objet DB, d’une session, ou d’un type non sérialisable.
Correction :
- passe seulement des primitives ou dicts simples.
Bonnes pratiques à retenir
1. Commence avec des jobs simples
Les notifications sont un très bon premier use case Celery.
2. Passe des données sérialisables
IDs, strings, listes, dicts simples.
3. Garde la logique métier côté service/app
Le worker exécute la tâche. Il ne doit pas devenir le centre de toute l’architecture.
4. Observe les logs worker séparément des logs API
Sinon tu auras du mal à comprendre où un problème se produit.
5. Garde une progression réaliste
BackgroundTasks d’abord.
Celery ensuite.
C’est un très bon chemin pédagogique.
Checklist de validation
À la fin, tu dois pouvoir cocher tout ceci :
- j’ai ajouté
celery[redis],redisetasgiref - j’ai une méthode
REDIS_URL()dans la config - j’ai créé
worker/tasks.py - j’ai créé une
celery_app - je comprends pourquoi
async_to_sync()est utilisé -
NotificationServicedélègue maintenant avec.delay() - je sais lancer le worker Celery
- je sais relier Celery à Redis
- je sais lire les logs du worker
- je comprends pourquoi Celery est plus robuste que
BackgroundTaskspour ce type de flux
Résumé final
Dans ce chapitre, tu as franchi une étape importante : les notifications ne sont plus seulement déclenchées à l’intérieur du process FastAPI.
Elles passent maintenant par une vraie file asynchrone :
- FastAPI reçoit la requête,
- Redis transporte le job,
- Celery exécute le travail.
C’est une différence majeure parce qu’elle prépare ton application à devenir :
- plus robuste,
- plus scalable,
- plus propre dans la séparation entre requête HTTP et travail différé.
Pour aller plus loin
- Parcours de lecture — pour replacer Celery dans le bon parcours du wiki
- FAQ, erreurs fréquentes et conseils pratiques — pour retrouver les pièges Redis/worker les plus probables
- Advanced FastAPI — pour garder la vue d’ensemble du bloc avancé
- Frontend React consommant l’API FastAPI — pour voir ensuite comment un client peut s’appuyer sur ces flows côté utilisateur