FastAPI Background Tasks — Async Operations & Queue Management

Thirdy Gayares
15 min read

🎓 What You Will Learn

  • Background Tasks: Run tasks without blocking responses
  • Email Sending: Send emails asynchronously
  • File Processing: Handle file uploads and batch operations
  • Task Queuing: Implement task queues with Celery or RQ
  • Error Handling: Retry logic and failure handling
  • Monitoring: Track task status and progress
Tags: FastAPI, Async/Await, Celery, Redis

1. Why Background Tasks Improve UX

Long-running operations should not block API responses. Background tasks allow you to:

  • Return immediately: Client gets a response right away
  • Process asynchronously: Task runs in the background
  • Improve UX: No timeout waiting for slow operations
  • Scale better: Distribute tasks across workers

Rule of thumb: If an operation takes more than about 1 second, consider making it a background task.
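
The idea can be sketched with plain asyncio, independent of FastAPI: a handler schedules the slow work and returns before that work has finished. The names `handle_request` and `slow_operation` are illustrative only, and the short sleep stands in for a real slow operation.

```python
import asyncio


async def slow_operation(name, completed):
    # Stand-in for slow work such as sending an email
    await asyncio.sleep(0.1)
    completed.append(name)


async def handle_request(name, completed, pending):
    # Schedule the work and return without waiting for it
    task = asyncio.create_task(slow_operation(name, completed))
    pending.add(task)  # keep a reference so the task is not garbage collected
    task.add_done_callback(pending.discard)
    return {"status": "accepted", "job": name}


async def main():
    completed, pending = [], set()
    response = await handle_request("welcome-email", completed, pending)
    done_at_response_time = list(completed)  # still empty at this point
    await asyncio.gather(*pending)  # let the background work finish
    return response, done_at_response_time, list(completed)
```

Calling `asyncio.run(main())` shows that the response is available while the list of completed jobs is still empty; the job only appears there after the pending tasks are awaited.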

2. FastAPI BackgroundTasks Basics

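FastAPI includes a built-in BackgroundTasks class for simple cases. Tasks run in the same process after the response has been sent, so they are not persisted or retried if the server restarts.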

app/routes/users.py
import time

from fastapi import APIRouter, BackgroundTasks

router = APIRouter()

def send_email(email: str, subject: str, body: str):
    # Simulate a slow email send (placeholder for real SMTP logic)
    print(f"Sending email to {email}...")
    time.sleep(2)
    print("Email sent!")

@router.post("/users/register")
def register_user(
    email: str,
    background_tasks: BackgroundTasks
):
    # Schedule the task; it runs after the response has been sent
    background_tasks.add_task(
        send_email,
        email,
        "Welcome!",
        "Thanks for registering!"
    )

    return {"message": "User registered", "email": email}

3. Practical: Sending Emails in the Background

Sending email is one of the most common background tasks. An async SMTP client such as aiosmtplib lets the send run without blocking the event loop.

app/services/email_service.py
import aiosmtplib
from email.mime.text import MIMEText

class EmailService:
    async def send_email(
        self,
        to_email: str,
        subject: str,
        body: str
    ):
        message = MIMEText(body)
        message["Subject"] = subject
        message["From"] = "[email protected]"
        message["To"] = to_email

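        # Port 587 expects STARTTLS; the credentials below are placeholders
        # and should come from configuration, not source code
        async with aiosmtplib.SMTP(
            hostname="smtp.gmail.com", port=587, start_tls=True
        ) as smtp:
            await smtp.login("[email protected]", "your-password")
            await smtp.send_message(message)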
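
One way to keep credentials out of the source is to read them from environment variables. The sketch below uses only the standard library; the variable names (`SMTP_HOST`, `SMTP_PORT`, `SMTP_USERNAME`, `SMTP_PASSWORD`) and both helper functions are assumptions made for illustration, not part of aiosmtplib.

```python
import os
from email.message import EmailMessage


def build_message(to_email, subject, body, sender):
    # EmailMessage is the modern standard-library message class
    message = EmailMessage()
    message["Subject"] = subject
    message["From"] = sender
    message["To"] = to_email
    message.set_content(body)
    return message


def load_smtp_settings(env=os.environ):
    # Hypothetical variable names; missing credentials raise KeyError early
    return {
        "hostname": env.get("SMTP_HOST", "localhost"),
        "port": int(env.get("SMTP_PORT", "587")),
        "username": env["SMTP_USERNAME"],
        "password": env["SMTP_PASSWORD"],
    }
```

The resulting message object can be handed to the SMTP client's `send_message` call in place of the `MIMEText` instance, and the settings dictionary supplies the host, port, and login values.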

4. File Processing Tasks

Process uploaded files in the background: resize images, generate PDFs, extract data.

app/routes/files.py
import io

from fastapi import APIRouter, BackgroundTasks, UploadFile
from PIL import Image

router = APIRouter()

def process_image(filename: str, file_data: bytes):
    # Load image
    image = Image.open(io.BytesIO(file_data))

    # Resize
    image.thumbnail((500, 500))

    # Save
    image.save(f"uploads/{filename}")
    print(f"Processed {filename}")

@router.post("/upload/image")
def upload_image(
    file: UploadFile,
    background_tasks: BackgroundTasks
):
    # Read file
    contents = file.file.read()

    # Add background task
    background_tasks.add_task(
        process_image,
        file.filename,
        contents
    )

    return {"filename": file.filename, "status": "processing"}
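
Because the saved path is built from a client-supplied filename, it is worth sanitising that name before writing to disk. The following is a minimal sketch using only the standard library; `safe_upload_path` is a hypothetical helper, and the unique prefix is one possible way to avoid overwriting files with the same name.

```python
import uuid
from pathlib import Path, PurePosixPath

UPLOAD_DIR = Path("uploads")


def safe_upload_path(filename, upload_dir=UPLOAD_DIR):
    # Normalise Windows separators, then keep only the final path component
    base = PurePosixPath((filename or "").replace("\\", "/")).name
    if base in ("", ".", ".."):
        base = "upload"
    # A random prefix keeps two uploads with the same name from colliding
    return upload_dir / f"{uuid.uuid4().hex}_{base}"
```

A name such as `../../etc/passwd` is reduced to `passwd` and always lands directly inside the upload directory.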

5. Setting Up Celery with Redis

For scalable task queuing, use Celery with Redis as the message broker.

app/celery_app.py
from celery import Celery

celery_app = Celery(
    "fastapi_app",
    broker="redis://localhost:6379",
    backend="redis://localhost:6379"
)

@celery_app.task
def send_email_task(email: str, subject: str, body: str):
    print(f"Sending email to {email}...")
    # Send email logic here
    return f"Email sent to {email}"

6. Using Celery in FastAPI Routes

Call Celery tasks from your FastAPI routes.

app/routes/users.py
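from fastapi import APIRouter

from app.celery_app import send_email_task

router = APIRouter()

@router.post("/users/register")
def register_user(email: str, password: str):
    # Create user (create_user is assumed to be defined elsewhere in the app)
    user = create_user(email, password)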

    # Send welcome email asynchronously
    send_email_task.delay(
        email,
        "Welcome!",
        "Thanks for registering!"
    )

    return {"user_id": user.id, "email": user.email}

7. Multi-Step Background Tasks

Chain multiple tasks together for complex workflows.

app/celery_app.py
from celery import chain

@celery_app.task
def step_1_validate_data(data: dict):
    print("Step 1: Validating...")
    return data

@celery_app.task
def step_2_process_data(data: dict):
    print("Step 2: Processing...")
    return data

@celery_app.task
def step_3_save_data(data: dict):
    print("Step 3: Saving...")
    return data

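# Chain tasks: each task's return value becomes the first argument of the next
def run_workflow(data: dict):
    workflow = chain(
        step_1_validate_data.s(data),
        step_2_process_data.s(),
        step_3_save_data.s(),
    )
    return workflow.apply_async()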
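
The data flow of a chain can be pictured in plain Python, without Celery: the output of one step is fed to the next, and an exception in any step stops the rest. This is only an analogy for how values move through the steps; `run_pipeline` and the three step functions are illustrative names.

```python
from functools import reduce


def validate(data):
    if "name" not in data:
        raise ValueError("name is required")
    return data


def process(data):
    # Return a new dict rather than mutating the input
    return {**data, "name": data["name"].strip().title()}


def save(data, store):
    store.append(data)
    return data


def run_pipeline(data, steps):
    # Feed each step's return value into the next step
    return reduce(lambda value, step: step(value), steps, data)
```

For example, running the pipeline on `{"name": "  ada lovelace "}` with the three steps yields a record whose name is `Ada Lovelace`, while input without a `name` key fails in the first step and nothing is saved.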

8. Error Handling and Retry Logic

Implement retries for failed tasks with exponential backoff.

app/celery_app.py
@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60  # used only when retry() is called without countdown
)
def send_email_with_retry(self, email: str):
    try:
        # Send email (send_email stands in for your real sending function)
        send_email(email)
    except Exception as exc:
        # Retry with exponential backoff: waits 1s, 2s, then 4s
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
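
The delay schedule itself can be worked out in isolation. The sketch below is a standard-library illustration of exponential backoff with a cap and optional jitter; `backoff_delay` and its parameters are hypothetical, not a Celery API. Celery also provides task options such as `autoretry_for`, `retry_backoff`, and `retry_jitter` that apply a similar policy declaratively.

```python
import random


def backoff_delay(retries, base=2.0, cap=300.0, jitter=True, rng=random):
    # Exponential growth, limited to `cap` seconds
    delay = min(cap, base ** retries)
    if jitter:
        # Randomise the wait so many failing tasks do not retry in lockstep
        delay = rng.uniform(0, delay)
    return delay
```

Without jitter, retries 0 through 3 wait 1, 2, 4, and 8 seconds, and very high retry counts are held at the cap.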

Idempotency: Make tasks idempotent (safe to run multiple times). A retry might execute the same task twice.
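
A common way to get idempotency is to record a key for each completed unit of work and skip the work when the key is already present. The sketch below keeps the keys in memory purely for illustration; in a real deployment they would need to live in a store shared by all workers (for example Redis or a database unique constraint, which is an assumption beyond this article). `WelcomeEmailSender` is a hypothetical class.

```python
class WelcomeEmailSender:
    def __init__(self):
        self._sent_keys = set()  # in-memory stand-in for a shared store
        self.deliveries = []     # records what was actually sent

    def send_once(self, user_id):
        key = f"welcome-email:{user_id}"
        if key in self._sent_keys:
            return False  # already handled, so a retry becomes a no-op
        self.deliveries.append(user_id)
        self._sent_keys.add(key)
        return True
```

Calling `send_once` twice for the same user delivers one email, which is the behaviour a retried task needs.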

9. Logging and Monitoring

Track task execution and debug failures.

app/celery_app.py
import logging

logger = logging.getLogger(__name__)

@celery_app.task
def logged_task(data: dict):
    logger.info(f"Starting task with {data}")

    try:
        # Process data (process is a placeholder for your own logic)
        result = process(data)
        logger.info(f"Task completed: {result}")
        return result
    except Exception:
        # logger.exception records the full traceback
        logger.exception("Task failed")
        raise
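
To avoid repeating the same logging in every task, the pattern can be factored into a decorator. This is a standard-library sketch; `log_task`, the logger name, and `resize_job` are illustrative. If combined with Celery, the assumption is that it would sit beneath the task decorator so the wrapped function is what gets registered.

```python
import functools
import logging
import time

logger = logging.getLogger("background_tasks")


def log_task(func):
    """Log start, finish, duration, and failures of the wrapped function."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        logger.info("task %s started", func.__name__)
        try:
            result = func(*args, **kwargs)
        except Exception:
            elapsed = time.perf_counter() - start
            # logger.exception logs at ERROR level and appends the traceback
            logger.exception("task %s failed after %.3fs", func.__name__, elapsed)
            raise
        elapsed = time.perf_counter() - start
        logger.info("task %s finished in %.3fs", func.__name__, elapsed)
        return result
    return wrapper


@log_task
def resize_job(width, height):
    return width * height
```

Passing the values as logging arguments instead of formatting them up front means the message is only built when the log level is enabled.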

10. Tracking Task Status

Let clients check task progress using task IDs.

app/routes/tasks.py
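from fastapi import APIRouter

# process_data is assumed to be a Celery task defined in app/celery_app.py
from app.celery_app import celery_app, process_data

router = APIRouter()

@router.post("/process")
def start_processing(data: dict):
    # Start async task
    task = process_data.delay(data)

    return {"task_id": task.id}

@router.get("/task/{task_id}")
def get_task_status(task_id: str):
    task_result = celery_app.AsyncResult(task_id)

    return {
        "task_id": task_id,
        "status": task_result.status,
        # On failure, result holds the exception, so convert it to a string
        "result": str(task_result.result) if task_result.failed() else task_result.result
    }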
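
The shape of the status response can be separated from Celery and tested on its own. The sketch below assumes Celery's standard state names (`PENDING`, `SUCCESS`, `FAILURE`); `task_payload` is a hypothetical helper. Note that Celery reports `PENDING` for task IDs it has never seen, so that state alone does not prove a task exists.

```python
def task_payload(task_id, status, result=None):
    # Always include the ID and state; add details only when they are meaningful
    payload = {"task_id": task_id, "status": status}
    if status == "SUCCESS":
        payload["result"] = result
    elif status == "FAILURE":
        # The stored result is an exception; expose only its message
        payload["error"] = str(result)
    return payload
```

Keeping the error as a string avoids serialisation problems when the response is converted to JSON.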

11. Scheduled Tasks with Celery Beat

Run periodic tasks at scheduled times.

app/celery_app.py
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "clean-up-every-day": {
        "task": "app.tasks.cleanup_old_files",
        "schedule": crontab(hour=2, minute=0),  # 2 AM daily
    },
    "send-digest-weekly": {
        "task": "app.tasks.send_weekly_digest",
        # day_of_week: 0 = Sunday, 1 = Monday; minute=0 avoids firing every minute of the hour
        "schedule": crontab(day_of_week=1, hour=9, minute=0),  # Monday 9 AM
    },
}
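
To make the daily schedule concrete, the plain-Python sketch below computes when a job configured to run every day at a given time would fire next. It only illustrates the semantics; Celery Beat performs this calculation itself, and `next_daily_run` is a hypothetical helper.

```python
from datetime import datetime, timedelta


def next_daily_run(now, hour, minute=0):
    # Candidate run time on the same calendar day
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so move to tomorrow
        candidate += timedelta(days=1)
    return candidate
```

At 01:30 the next 2 AM run is later the same day; at exactly 02:00 or later, it is the following day.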

12. Testing Background Tasks

Run tasks synchronously in tests so they execute in-process, without a broker or worker.

tests/test_tasks.py
from app.celery_app import celery_app, send_email_task

def test_send_email_task():
    # Run task synchronously for testing
    celery_app.conf.task_always_eager = True

    # The task takes email, subject, and body
    result = send_email_task.delay(
        "[email protected]", "Welcome!", "Thanks for registering!"
    )

    assert result.successful()
    assert result.result == "Email sent to [email protected]"
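
Task logic can also be tested without Celery at all by passing the side-effecting dependency in and replacing it with a mock. In this sketch, `send_welcome` and the `mailer` object with its `send` method are hypothetical; the point is only that the function under test never touches a real mail server.

```python
from unittest.mock import Mock


def send_welcome(email, mailer):
    # The mailer is injected so tests can substitute a fake
    mailer.send(to=email, subject="Welcome!", body="Thanks for registering!")
    return f"Email sent to {email}"


def test_send_welcome():
    mailer = Mock()
    message = send_welcome("user@example.com", mailer)
    assert message == "Email sent to user@example.com"
    mailer.send.assert_called_once_with(
        to="user@example.com",
        subject="Welcome!",
        body="Thanks for registering!",
    )
```

A thin Celery task can then simply call this function with the real mailer, leaving very little untested code in the task itself.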

13. Common Background Task Pitfalls

Pitfall                          | Solution
Tasks that are not idempotent    | Design tasks to be safe if run multiple times
No error handling                | Implement retry logic and error logging
Task dependencies unclear        | Use Celery chains and groups for workflows
No monitoring                    | Use Flower or Prometheus for visibility
BackgroundTasks for everything   | Use Celery for production, BackgroundTasks for simple cases

14. Advanced Patterns

  • Task Routing: Send tasks to specific workers based on type
  • Priority Queues: Prioritize urgent tasks
  • Rate Limiting: Limit task execution rate
  • Workflow Engines: Apache Airflow for complex data pipelines
  • Message Brokers: RabbitMQ, Kafka for high-volume tasks

15. Resources & What's Next

You now understand how to run tasks asynchronously—essential for production applications. Use background tasks for emails, file processing, notifications, and data processing.

Next Topics: Long-running tasks with WebSockets, advanced monitoring, and distributed task execution.

Congratulations! Your API can now handle long-running operations without blocking users. Keep tasks small, idempotent, and well-monitored! 🚀

About the Author

Thirdy Gayares

Passionate developer creating custom solutions for everyone. I specialize in building user-friendly tools that solve real-world problems while maintaining the highest standards of security and privacy.