🎓 What You Will Learn
- Background Tasks: Run tasks without blocking responses
- Email Sending: Send emails asynchronously
- File Processing: Handle file uploads and batch operations
- Task Queuing: Implement task queues with Celery or RQ
- Error Handling: Retry logic and failure handling
- Monitoring: Track task status and progress
1. Why Background Tasks Improve UX
Long-running operations should not block API responses. Background tasks allow you to:
- Return immediately: Client gets a response right away
- Process asynchronously: Task runs in the background
- Improve UX: No timeout waiting for slow operations
- Scale better: Distribute tasks across workers
Rule: If an operation takes more than 1 second, consider making it a background task.
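To see the difference, here is a minimal stdlib-only sketch (no FastAPI required) that hands a slow job to a worker thread so the caller can return immediately. The function and variable names are illustrative, not from any framework:

```python
import threading
import time

def slow_job(results: list):
    # Simulates a slow operation (e.g., sending an email)
    time.sleep(0.2)
    results.append("done")

def handle_request() -> str:
    # Hand the slow work to a background thread and return immediately
    results: list = []
    worker = threading.Thread(target=slow_job, args=(results,), daemon=True)
    worker.start()
    return "accepted"  # the caller gets this response right away

start = time.perf_counter()
response = handle_request()
elapsed = time.perf_counter() - start
print(response, f"returned in {elapsed:.3f}s")
```

The caller returns in well under the job's 0.2 s duration, which is exactly the effect FastAPI's `BackgroundTasks` (and Celery) give you at the framework level.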
2. FastAPI BackgroundTasks Basics
FastAPI includes a simple BackgroundTasks feature for basic use cases.
```python
from fastapi import BackgroundTasks
import time

def send_email(email: str, subject: str, body: str):
    # Simulate email sending
    print(f"Sending email to {email}...")
    time.sleep(2)
    print("Email sent!")

@router.post("/users/register")
def register_user(
    email: str,
    background_tasks: BackgroundTasks
):
    # Add task to background queue; it runs after the response is sent
    background_tasks.add_task(
        send_email,
        email,
        "Welcome!",
        "Thanks for registering!"
    )
    return {"message": "User registered", "email": email}
```
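Under the hood, `BackgroundTasks` simply records `(func, args)` pairs and runs them, in order, after the response is sent. A rough stdlib-only mimic of that contract (a toy stand-in, not FastAPI's actual implementation):

```python
class MiniBackgroundTasks:
    """Toy stand-in for fastapi.BackgroundTasks: queue now, run after responding."""

    def __init__(self):
        self._tasks = []

    def add_task(self, func, *args, **kwargs):
        self._tasks.append((func, args, kwargs))  # nothing runs yet

    def run_all(self):
        # In FastAPI, the framework calls queued tasks in order
        # after the response has gone out
        for func, args, kwargs in self._tasks:
            func(*args, **kwargs)

log = []
tasks = MiniBackgroundTasks()
tasks.add_task(log.append, "send-email")
tasks.add_task(log.append, "update-metrics")

response = {"message": "User registered"}  # the response is "sent" first
tasks.run_all()                            # then the queued tasks run, in order
```

The key property to internalize: `add_task` never blocks the response; execution is deferred and ordered.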
3. Practical: Sending Emails in Background
Email sending is one of the most common background tasks. Use a library like aiosmtplib to send email asynchronously.
```python
import aiosmtplib
from email.mime.text import MIMEText

class EmailService:
    async def send_email(
        self,
        to_email: str,
        subject: str,
        body: str
    ):
        message = MIMEText(body)
        message["Subject"] = subject
        message["From"] = "[email protected]"
        message["To"] = to_email

        async with aiosmtplib.SMTP(hostname="smtp.gmail.com", port=587) as smtp:
            await smtp.login("[email protected]", "your-password")
            await smtp.send_message(message)
```
4. File Processing Tasks
Process uploaded files in the background: resize images, generate PDFs, extract data.
```python
from fastapi import UploadFile, BackgroundTasks
from PIL import Image
import io

def process_image(filename: str, file_data: bytes):
    # Load image
    image = Image.open(io.BytesIO(file_data))
    # Resize
    image.thumbnail((500, 500))
    # Save
    image.save(f"uploads/{filename}")
    print(f"Processed {filename}")

@router.post("/upload/image")
def upload_image(
    file: UploadFile,
    background_tasks: BackgroundTasks
):
    # Read file
    contents = file.file.read()
    # Add background task
    background_tasks.add_task(
        process_image,
        file.filename,
        contents
    )
    return {"filename": file.filename, "status": "processing"}
```
5. Setting Up Celery with Redis
For scalable task queuing, use Celery with Redis as the message broker.
```python
from celery import Celery

celery_app = Celery(
    "fastapi_app",
    broker="redis://localhost:6379",
    backend="redis://localhost:6379"
)

@celery_app.task
def send_email_task(email: str, subject: str, body: str):
    print(f"Sending email to {email}...")
    # Send email logic here
    return f"Email sent to {email}"
```
6. Using Celery in FastAPI Routes
Call Celery tasks from your FastAPI routes.
```python
from app.celery_app import send_email_task

@router.post("/users/register")
def register_user(email: str, password: str):
    # Create user
    user = create_user(email, password)

    # Send welcome email asynchronously
    send_email_task.delay(
        email,
        "Welcome!",
        "Thanks for registering!"
    )

    return {"user_id": user.id, "email": user.email}
```
7. Multi-Step Background Tasks
Chain multiple tasks together for complex workflows.
```python
from celery import chain

@celery_app.task
def step_1_validate_data(data: dict):
    print("Step 1: Validating...")
    return data

@celery_app.task
def step_2_process_data(data: dict):
    print("Step 2: Processing...")
    return data

@celery_app.task
def step_3_save_data(data: dict):
    print("Step 3: Saving...")
    return data

# Chain tasks: each step's return value is passed to the next step
workflow = chain(
    step_1_validate_data.s(data),  # 'data' is the initial payload
    step_2_process_data.s(),
    step_3_save_data.s()
)
workflow.apply_async()
```
8. Error Handling and Retry Logic
Implement retries for failed tasks with exponential backoff.
```python
@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60
)
def send_email_with_retry(self, email: str):
    try:
        # Send email
        send_email(email)
    except Exception as exc:
        # Retry with exponential backoff (1s, 2s, 4s, ...)
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
```
Idempotency: Make tasks idempotent (safe to run multiple times). A retry might execute the same task twice.
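One common way to get idempotency is to record completed work under a stable key and skip duplicates. A minimal stdlib-only sketch of the idea — in production the "ledger" would be a database row or a Redis key, not an in-memory set:

```python
processed: set[str] = set()  # stands in for a durable store (DB row, Redis key)
sent_count = 0

def send_welcome_email(user_id: str) -> bool:
    """Idempotent task: running it twice for the same user_id sends one email."""
    global sent_count
    if user_id in processed:
        return False  # already handled; a retry becomes a no-op
    # ... actual email send would happen here ...
    sent_count += 1
    processed.add(user_id)
    return True

first = send_welcome_email("user-42")   # does the work
second = send_welcome_email("user-42")  # a retry: skipped safely
```

With this pattern, Celery's retries (or an accidental double-enqueue) cannot produce duplicate side effects.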
9. Logging and Monitoring
Track task execution and debug failures.
```python
import logging

logger = logging.getLogger(__name__)

@celery_app.task
def logged_task(data: dict):
    logger.info(f"Starting task with {data}")
    try:
        # Process data
        result = process(data)
        logger.info(f"Task completed: {result}")
        return result
    except Exception as e:
        logger.error(f"Task failed: {e}")
        raise
```
10. Tracking Task Status
Let clients check task progress using task IDs.
```python
@router.post("/process")
def start_processing(data: dict):
    # Start async task
    task = process_data.delay(data)
    return {"task_id": task.id}

@router.get("/task/{task_id}")
def get_task_status(task_id: str):
    from app.celery_app import celery_app
    task_result = celery_app.AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": task_result.status,
        "result": task_result.result
    }
```
11. Scheduled Tasks with Celery Beat
Run periodic tasks at scheduled times.
```python
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "clean-up-every-day": {
        "task": "app.tasks.cleanup_old_files",
        "schedule": crontab(hour=2, minute=0),  # 2 AM daily
    },
    "send-digest-weekly": {
        "task": "app.tasks.send_weekly_digest",
        # day_of_week=1 is Monday (0 is Sunday in Celery's crontab)
        "schedule": crontab(day_of_week=1, hour=9, minute=0),  # Monday 9 AM
    },
}
```
12. Testing Background Tasks
Test tasks synchronously in development and tests.
```python
from app.celery_app import celery_app, send_email_task

def test_send_email_task():
    # Run tasks synchronously (no worker needed) for testing
    celery_app.conf.task_always_eager = True

    result = send_email_task.delay(
        "[email protected]", "Welcome!", "Thanks for registering!"
    )

    assert result.successful()
    assert result.result == "Email sent to [email protected]"
```
13. Common Background Task Pitfalls
| Pitfall | Solution |
|---|---|
| Tasks that are not idempotent | Design tasks to be safe if run multiple times |
| No error handling | Implement retry logic and error logging |
| Task dependencies unclear | Use Celery chains and groups for workflows |
| No monitoring | Use Flower or Prometheus for visibility |
| BackgroundTasks for everything | Use Celery for production, BackgroundTasks for simple cases |
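A related pitfall: none of the Celery examples above do anything until a worker process is running alongside Redis. Assuming the `Celery()` instance is defined in `app/celery_app.py` (the module path used in the import examples above), the usual invocations look like this:

```shell
# Start a Celery worker (Redis must already be running)
# -A points at the module that holds the Celery() instance
celery -A app.celery_app worker --loglevel=info

# Optional: Flower dashboard for task monitoring
pip install flower
celery -A app.celery_app flower --port=5555
```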
14. Advanced Patterns
- Task Routing: Send tasks to specific workers based on type
- Priority Queues: Prioritize urgent tasks
- Rate Limiting: Limit task execution rate
- Workflow Engines: Apache Airflow for complex data pipelines
- Message Brokers: RabbitMQ, Kafka for high-volume tasks
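As a small illustration of the first pattern, Celery routes tasks to named queues via `task_routes` configuration. The task and queue names below are hypothetical, following the module layout assumed earlier in this guide:

```python
# Hypothetical routing config: email tasks go to a dedicated "emails" queue,
# heavy image work goes to "media"; everything else uses the default queue.
task_routes = {
    "app.tasks.send_email_task": {"queue": "emails"},
    "app.tasks.process_image": {"queue": "media"},
}

# With a real Celery app this would be applied as:
#   celery_app.conf.task_routes = task_routes
# and a worker bound to one queue started with:
#   celery -A app.celery_app worker -Q emails
```

This lets you scale email workers and image workers independently.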
15. Resources & What's Next
You now understand how to run tasks asynchronously—essential for production applications. Use background tasks for emails, file processing, notifications, and data processing.
Next Topics: Long-running tasks with WebSockets, advanced monitoring, and distributed task execution.
Congratulations! Your API can now handle long-running operations without blocking users. Keep tasks small, idempotent, and well-monitored! 🚀