Aller au contenu principal

Tâches asynchrones avec Celery et FastAPI

remarque

Ce chapitre vient après Notifications et communication utilisateur avec FastAPI. Il montre comment sortir certaines actions du cycle direct de la requête HTTP pour les exécuter en arrière-plan avec Celery et Redis.

Introduction

Dans le chapitre précédent, les emails et SMS partent encore depuis le cycle immédiat de la requête via BackgroundTasks. C’est très bien pour apprendre et pour des volumes modestes. Le rôle de ce chapitre est de franchir l’étape suivante : sortir ces envois du process web et les déléguer à une vraie file de tâches.

Ce que ce chapitre va accomplir

À la fin de ce chapitre, tu auras :

  • un vrai module worker/tasks.py,
  • une app Celery connectée à Redis,
  • des tâches asynchrones pour :
    • email texte,
    • email template,
    • SMS,
  • une évolution de services/notification.py pour déléguer les envois à Celery,
  • une mise à jour minimale de config.py pour Redis,
  • la commande worker à lancer localement ou via Docker,
  • une compréhension claire de :
    • broker,
    • backend,
    • worker,
    • .delay(),
    • async_to_sync() dans un task worker.

Point de départ recommandé

Ce chapitre suppose que les notifications fonctionnent déjà en version simple. Avant d’ajouter Celery, il faut avoir clarifié :

  • quoi envoyer,
  • à quel moment,
  • avec quelles données,
  • avec quel niveau minimal de sécurité.

Autrement dit, Celery n’ajoute pas le métier : il ajoute l’infrastructure qui exécute ce métier de façon plus robuste.

Source et choix pédagogiques

Ce chapitre s’appuie principalement sur 24-Celery/125-tasks.py. On y retrouve :

  • l’initialisation d’une app Celery,
  • Redis comme broker et backend,
  • des tâches pour email texte, email template et SMS,
  • async_to_sync() pour appeler fastapi-mail depuis le worker.

Ici, on reprend cette logique sans casser l’architecture existante :

  • les routes déclenchent toujours le même service,
  • NotificationService pousse désormais un job Celery,
  • worker/tasks.py exécute le travail en arrière-plan.

Architecture visée après ce chapitre

Après ce chapitre, le projet peut ressembler à ceci :

fastapi-base-api/
├── main.py
├── config.py
├── requirements.txt
├── docker-compose.yml
├── core/
│ ├── __init__.py
│ ├── security.py
│ ├── exceptions.py
│ ├── middleware.py
│ ├── responses.py
│ └── url_tokens.py
├── api/
│ ├── __init__.py
│ ├── router.py
│ ├── dependencies.py
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── task.py
│ │ └── user.py
│ └── schemas/
│ ├── __init__.py
│ ├── task.py
│ └── user.py
├── database/
│ ├── __init__.py
│ ├── models.py
│ ├── session.py
│ └── redis.py
├── services/
│ ├── __init__.py
│ ├── task.py
│ ├── user.py
│ └── notification.py
├── templates/
│ ├── mail_email_verify.html
│ └── mail_password_reset.html
└── worker/
├── __init__.py
└── tasks.py

Repères utiles avant le code

1. Redis n’exécute pas les tâches

Redis sert ici de :

  • broker,
  • backend.

Mais c’est le worker Celery qui exécute réellement les jobs.

2. .delay() ne fait pas le travail

.delay() met juste une tâche dans la file. Le travail réel sera fait par le worker.

3. Une tâche Celery reçoit des données simples

Évite de lui passer :

  • session SQLAlchemy,
  • objets complexes non sérialisables,
  • request FastAPI.

Passe plutôt :

  • des chaînes,
  • des listes,
  • des dicts simples,
  • des IDs.

4. fastapi-mail est async, mais Celery task est souvent sync

C’est pourquoi la source utilise async_to_sync(). C’est une idée importante à retenir.

Étape 1 — ajouter les dépendances utiles

Complète requirements.txt avec :

celery[redis]
redis
asgiref

Ton fichier complet peut donc maintenant contenir au minimum :

fastapi
uvicorn[standard]
sqlmodel
sqlalchemy
asyncpg
pydantic-settings
scalar-fastapi
passlib[bcrypt]
PyJWT
python-multipart
email-validator
pytest
pytest-asyncio
httpx
aiosqlite
jinja2
fastapi-mail
itsdangerous
twilio
celery[redis]
redis
asgiref

Installe ensuite :

pip install -r requirements.txt

Étape 2 — faire évoluer config.py pour Redis

Si ce n’est pas déjà fait dans ton projet, ajoute Redis à la configuration DB.

class DatabaseSettings(BaseSettings):
POSTGRES_SERVER: str
POSTGRES_PORT: int
POSTGRES_USER: str
POSTGRES_PASSWORD: str
POSTGRES_DB: str

REDIS_HOST: str = "localhost"
REDIS_PORT: int = 6379

model_config = _base_config

@property
def POSTGRES_URL(self) -> str:
return (
f"postgresql+asyncpg://{self.POSTGRES_USER}:{self.POSTGRES_PASSWORD}"
f"@{self.POSTGRES_SERVER}:{self.POSTGRES_PORT}/{self.POSTGRES_DB}"
)

def REDIS_URL(self, db: int = 0) -> str:
return f"redis://{self.REDIS_HOST}:{self.REDIS_PORT}/{db}"

Pourquoi REDIS_URL(db=...) est pratique

Parce qu’on peut facilement séparer plusieurs usages Redis par base logique :

  • base 0 pour autre chose,
  • base 9 pour Celery par exemple.

C’est exactement ce que montre la source.

Étape 3 — créer worker/__init__.py

Crée un package worker :

mkdir -p worker
touch worker/__init__.py worker/tasks.py

Étape 4 — créer worker/tasks.py

Crée maintenant le vrai worker Celery.

from asgiref.sync import async_to_sync
from celery import Celery
from fastapi_mail import ConnectionConfig, FastMail, MessageSchema, MessageType
from pydantic import EmailStr
from twilio.rest import Client
from pathlib import Path

from config import db_settings, notification_settings


BASE_DIR = Path(__file__).resolve().parent.parent
TEMPLATE_DIR = BASE_DIR / "templates"

fast_mail = FastMail(
ConnectionConfig(
MAIL_USERNAME=notification_settings.MAIL_USERNAME,
MAIL_PASSWORD=notification_settings.MAIL_PASSWORD,
MAIL_FROM=notification_settings.MAIL_FROM,
MAIL_PORT=notification_settings.MAIL_PORT,
MAIL_SERVER=notification_settings.MAIL_SERVER,
MAIL_FROM_NAME=notification_settings.MAIL_FROM_NAME,
MAIL_STARTTLS=notification_settings.MAIL_STARTTLS,
MAIL_SSL_TLS=notification_settings.MAIL_SSL_TLS,
USE_CREDENTIALS=notification_settings.USE_CREDENTIALS,
VALIDATE_CERTS=notification_settings.VALIDATE_CERTS,
TEMPLATE_FOLDER=TEMPLATE_DIR,
)
)

send_message_sync = async_to_sync(fast_mail.send_message)


twilio_client = None
if (
notification_settings.TWILIO_SID
and notification_settings.TWILIO_AUTH_TOKEN
and notification_settings.TWILIO_NUMBER
):
twilio_client = Client(
notification_settings.TWILIO_SID,
notification_settings.TWILIO_AUTH_TOKEN,
)


celery_app = Celery(
"task_api_tasks",
broker=db_settings.REDIS_URL(9),
backend=db_settings.REDIS_URL(9),
broker_connection_retry_on_startup=True,
)


@celery_app.task(name="notifications.send_mail")
def send_mail_task(
recipients: list[str],
subject: str,
body: str,
):
send_message_sync(
message=MessageSchema(
recipients=recipients,
subject=subject,
body=body,
subtype=MessageType.plain,
)
)
return "Message sent"


@celery_app.task(name="notifications.send_email_template")
def send_email_template_task(
recipients: list[EmailStr],
subject: str,
context: dict,
template_name: str,
):
send_message_sync(
message=MessageSchema(
recipients=recipients,
subject=subject,
template_body=context,
subtype=MessageType.html,
),
template_name=template_name,
)
return "Template email sent"


@celery_app.task(name="notifications.send_sms")
def send_sms_task(to: str, body: str):
if twilio_client is None or notification_settings.TWILIO_NUMBER is None:
return "SMS provider not configured"

twilio_client.messages.create(
from_=notification_settings.TWILIO_NUMBER,
to=to,
body=body,
)
return "SMS sent"

Ce qu’il faut remarquer ici

async_to_sync(fast_mail.send_message)

C’est le point le plus important. La fonction mail de fastapi-mail est async. Le task Celery, lui, est écrit ici comme fonction sync.

Donc on fait un pont entre les deux.

celery_app = Celery(...)

On définit une vraie application worker distincte de FastAPI.

Broker et backend Redis

On utilise Redis pour :

  • transporter les tâches,
  • stocker les états/résultats.

Étape 5 — faire évoluer services/notification.py

Maintenant, au lieu d’envoyer directement via BackgroundTasks, on délègue à Celery.

Remplace le service précédent par ceci :

from pydantic import EmailStr

from worker.tasks import (
send_email_template_task,
send_mail_task,
send_sms_task,
)


class NotificationService:
async def send_email(
self,
recipients: list[EmailStr],
subject: str,
body: str,
):
send_mail_task.delay(recipients, subject, body)

async def send_email_template(
self,
recipients: list[EmailStr],
subject: str,
context: dict,
template_name: str,
):
send_email_template_task.delay(recipients, subject, context, template_name)

async def send_sms(self, to: str, body: str):
send_sms_task.delay(to, body)

Ce qui change conceptuellement

Avant :

  • le process FastAPI planifiait l’envoi via BackgroundTasks.

Maintenant :

  • le process FastAPI pousse juste un message dans Redis,
  • le worker Celery fait le travail réel.

Étape 6 — simplifier api/dependencies.py

Comme NotificationService n’a plus besoin de BackgroundTasks dans son constructeur, la dépendance devient plus simple.

def get_notification_service() -> NotificationService:
return NotificationService()


NotificationServiceDep = Annotated[
NotificationService,
Depends(get_notification_service),
]

Étape 7 — garder api/routers/user.py presque identique

Bonne nouvelle : les routes du chapitre Notifications et communication utilisateur avec FastAPI changent très peu.

C’est justement un très bon signal architectural.

Tu peux garder les appels de ce type :

await notifications.send_email_template(...)
await notifications.send_sms(...)

Le routeur ne sait pas si derrière :

  • tu utilises BackgroundTasks,
  • ou Celery.

Et c’est excellent.

Pourquoi c’est une vraie victoire d’architecture

Cela veut dire que :

  • la couche API exprime une intention métier,
  • la stratégie d’exécution peut changer derrière,
  • sans casser toute l’interface applicative.

Étape 8 — lancer le worker localement

Si tu travailles hors Docker, il faut avoir Redis en route, puis lancer Celery.

Exemple :

celery -A worker.tasks.celery_app worker --loglevel=info

Dans un autre terminal, tu lances toujours ton API FastAPI normalement.

Étape 9 — brancher la version Docker Compose

Le chapitre Docker avec FastAPI avait déjà préparé le terrain. Ici, on précise la commande worker.

Dans docker-compose.yml, le service Celery peut ressembler à ceci :

celery:
build: .
env_file:
- .env
environment:
POSTGRES_SERVER: db
POSTGRES_PORT: 5432
REDIS_HOST: redis
REDIS_PORT: 6379
depends_on:
db:
condition: service_healthy
redis:
condition: service_started
command: ["celery", "-A", "worker.tasks.celery_app", "worker", "--loglevel=info"]

Et n’oublie pas le service Redis :

redis:
image: redis:7
command: redis-server --appendonly yes
volumes:
- redis_data:/data

Étape 10 — ce que devient le flux utilisateur

Prenons l’exemple du reset password.

Avant :

  1. l’API reçoit la requête,
  2. NotificationService utilise BackgroundTasks,
  3. le process API reste responsable du déclenchement final.

Après Celery :

  1. l’API reçoit la requête,
  2. NotificationService appelle .delay(),
  3. Redis reçoit le job,
  4. le worker Celery traite le job,
  5. l’API peut répondre vite sans porter l’exécution complète.

Vérifications pratiques

1. Vérifier que Redis est disponible

Si tu es en Docker :

docker compose ps

Tu dois voir redis et celery en cours d’exécution.

2. Vérifier les logs du worker

docker compose logs celery -f

ou en local, directement dans le terminal du worker.

3. Déclencher un email de reset

curl -i \
-X POST http://localhost:8000/user/password-reset/request \
-H "Content-Type: application/json" \
-d '{"email": "enes@test.local"}'

Côté API :

  • la réponse doit revenir vite.

Côté worker Celery :

  • tu dois voir le job être reçu/exécuté.

4. Déclencher un SMS

TOKEN="..."

curl -i \
-X POST http://localhost:8000/user/notify/sms \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{
"phone_number": "+33123456789",
"message": "Celery test notification"
}'

Là encore :

  • l’API répond rapidement,
  • le worker traite ensuite le job.

Erreurs fréquentes

Erreur 1 — rien ne se passe après .delay()

Cause fréquente :

  • le worker Celery n’est pas lancé.

Correction :

  • lance celery -A worker.tasks.celery_app worker --loglevel=info
  • ou démarre le service celery dans Compose.

Erreur 2 — connexion Redis refusée

Cause fréquente :

  • Redis n’est pas démarré,
  • ou REDIS_HOST / REDIS_PORT sont faux.

Correction :

  • vérifie la config et les logs.

Erreur 3 — fastapi-mail casse dans Celery

Cause fréquente :

  • oubli de async_to_sync().

Correction :

  • garde une fonction sync autour de fast_mail.send_message.

Erreur 4 — le worker n’importe pas correctement le module

Cause fréquente :

  • mauvais chemin -A.

Correction :

  • vérifie la commande :
celery -A worker.tasks.celery_app worker --loglevel=info

Erreur 5 — tu passes des objets trop complexes au task

Cause fréquente :

  • passage d’un objet DB, d’une session, ou d’un type non sérialisable.

Correction :

  • passe seulement des primitives ou dicts simples.

Bonnes pratiques à retenir

1. Commence avec des jobs simples

Les notifications sont un très bon premier use case Celery.

2. Passe des données sérialisables

IDs, strings, listes, dicts simples.

3. Garde la logique métier côté service/app

Le worker exécute la tâche. Il ne doit pas devenir le centre de toute l’architecture.

4. Observe les logs worker séparément des logs API

Sinon tu auras du mal à comprendre où un problème se produit.

5. Garde une progression réaliste

BackgroundTasks d’abord. Celery ensuite. C’est un très bon chemin pédagogique.

Checklist de validation

À la fin, tu dois pouvoir cocher tout ceci :

  • j’ai ajouté celery[redis], redis et asgiref
  • j’ai une méthode REDIS_URL() dans la config
  • j’ai créé worker/tasks.py
  • j’ai créé une celery_app
  • je comprends pourquoi async_to_sync() est utilisé
  • NotificationService délègue maintenant avec .delay()
  • je sais lancer le worker Celery
  • je sais relier Celery à Redis
  • je sais lire les logs du worker
  • je comprends pourquoi Celery est plus robuste que BackgroundTasks pour ce type de flux

Résumé final

Dans ce chapitre, tu as franchi une étape importante : les notifications ne sont plus seulement déclenchées à l’intérieur du process FastAPI.

Elles passent maintenant par une vraie file asynchrone :

  • FastAPI reçoit la requête,
  • Redis transporte le job,
  • Celery exécute le travail.

C’est une différence majeure parce qu’elle prépare ton application à devenir :

  • plus robuste,
  • plus scalable,
  • plus propre dans la séparation entre requête HTTP et travail différé.

Pour aller plus loin