FastAPI Long-Running Tasks — Real-Time Progress Tracking

Thirdy Gayares

🎓 What You Will Learn

  • WebSockets: Real-time bidirectional communication
  • Progress Tracking: Update clients on task progress
  • Celery + Redis: Scalable long-running task execution
  • Retry Logic: Handle failures with exponential backoff
  • Timeout Handling: Gracefully manage task timeouts
  • Monitoring: Track task health and performance

1. Challenges of Long-Running Tasks

Long-running tasks create UX problems: users don't know if anything is happening, connections timeout, and there's no visibility into progress.

Solution: Track progress in real-time and stream updates to clients using WebSockets.

2. WebSocket Fundamentals

WebSockets enable real-time, bidirectional communication over a single persistent connection, which makes them a natural fit for streaming progress updates to clients.

app/routes/tasks.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
import asyncio

router = APIRouter()

@router.websocket("/ws/task/{task_id}")
async def websocket_endpoint(websocket: WebSocket, task_id: str):
    await websocket.accept()

    try:
        while True:
            # Get task progress
            progress = get_task_progress(task_id)

            # Send to client
            await websocket.send_json({
                "task_id": task_id,
                "progress": progress,
                "status": "running"
            })

            # Stop streaming once the task has finished
            if progress >= 100:
                await websocket.close(code=1000)  # normal closure
                break

            await asyncio.sleep(1)  # Update every second

    except WebSocketDisconnect:
        # Client disconnected; nothing to clean up here
        pass
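The endpoint above calls a `get_task_progress` helper that isn't defined here. A minimal in-memory sketch (a module-level dict standing in for the Redis store introduced in the next section) could look like this:

```python
# In-memory progress store; a stand-in for the Redis-backed
# TaskService shown in the next section.
_progress_store: dict = {}

def set_task_progress(task_id: str, progress: int) -> None:
    """Record progress (0-100) for a task."""
    _progress_store[task_id] = progress

def get_task_progress(task_id: str) -> int:
    """Return the last reported progress, or 0 for an unknown task."""
    return _progress_store.get(task_id, 0)
```

Worker code calls `set_task_progress`, and the WebSocket loop reads it back; swapping the dict for Redis gives the multi-process version.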


3. Storing Progress in Redis

Use Redis to store task progress so multiple clients can track the same task.

app/services/task_service.py
import redis
import json
from datetime import datetime

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

class TaskService:
    @staticmethod
    def update_progress(task_id: str, progress: int, status: str = "running"):
        data = {
            "progress": progress,
            "status": status,
            "updated_at": datetime.now().isoformat()
        }
        redis_client.set(f"task:{task_id}", json.dumps(data))

    @staticmethod
    def get_progress(task_id: str):
        data = redis_client.get(f"task:{task_id}")
        return json.loads(data) if data else None

4. Celery Task Progress Reporting

Report progress from inside Celery tasks with self.update_state(), which is available when the task is declared with bind=True.

app/celery_app.py
from celery import Celery

# Local Redis as broker and result backend; adjust the URLs for your environment
celery_app = Celery("app", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@celery_app.task(bind=True)
def process_large_file(self, file_path: str):
    total_items = count_items(file_path)

    for idx, item in enumerate(process_file(file_path)):
        # Update progress
        progress = int((idx + 1) / total_items * 100)
        self.update_state(
            state="PROGRESS",
            meta={"current": idx + 1, "total": total_items, "progress": progress}
        )

        # Process item
        process_item(item)

    return {"status": "Complete", "items_processed": total_items}

5. Implementing Retry Logic

Retry failed tasks with exponential backoff to handle transient failures.

app/celery_app.py
@celery_app.task(
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={"max_retries": 3},
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True
)
def resilient_task(self, data: dict):
    # autoretry_for catches the exception and schedules the retry,
    # so no manual try/except is needed here
    return process_data(data)
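With `retry_backoff=True`, Celery waits roughly `2 ** retries` seconds between attempts, capped by `retry_backoff_max`, and `retry_jitter` randomizes the delay to avoid thundering herds. A standalone sketch of that schedule (simplified relative to Celery's internals):

```python
import random

def backoff_delay(retries: int, factor: int = 1, maximum: int = 600,
                  jitter: bool = False) -> int:
    """Exponential backoff: factor * 2**retries seconds, capped at `maximum`.

    With jitter, pick a random delay up to the computed value so that
    many failed tasks don't all retry at the same instant.
    """
    delay = min(factor * (2 ** retries), maximum)
    if jitter:
        delay = random.randrange(delay + 1)
    return delay

# Without jitter, the schedule for retries 0..3 is 1, 2, 4, 8 seconds.
```

So a task with `max_retries=3` fails for good after roughly 1 + 2 + 4 seconds of cumulative waiting, long enough to ride out most transient outages.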

6. Handling Task Timeouts

Set timeouts to prevent tasks from running indefinitely.

app/celery_app.py
from celery.exceptions import SoftTimeLimitExceeded

@celery_app.task(
    time_limit=3600,  # Hard limit: 1 hour (the worker kills the task)
    soft_time_limit=3300  # Soft limit: 55 minutes (raises SoftTimeLimitExceeded)
)
def time_limited_task(data: dict):
    try:
        return long_running_operation(data)
    except SoftTimeLimitExceeded:
        # Soft limit hit: persist partial state, then let the task fail
        save_state()
        raise

7. Monitoring with Flower

Use Flower to monitor Celery tasks in real-time.

Setup: Run celery -A app.celery_app flower then visit http://localhost:5555

8. HTTP Polling as an Alternative to WebSockets

For simpler cases, clients can poll for status via HTTP instead of WebSockets.

app/routes/tasks.py
@router.get("/task/{task_id}/status")
def get_task_status(task_id: str):
    from app.services.task_service import TaskService

    progress = TaskService.get_progress(task_id)

    # Unknown task: Redis has no entry yet (or it expired)
    if progress is None:
        return {"task_id": task_id, "progress": 0, "status": "unknown"}

    return {
        "task_id": task_id,
        "progress": progress.get("progress", 0),
        "status": progress.get("status", "running")
    }
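On the client side, polling is just a loop that calls the status endpoint until the task leaves the "running" state. A transport-agnostic sketch, where `fetch_status` is a placeholder for your actual HTTP call (e.g. with requests or httpx):

```python
import time
from typing import Callable

def poll_until_done(fetch_status: Callable[[], dict],
                    interval: float = 1.0,
                    max_polls: int = 3600) -> dict:
    """Call fetch_status every `interval` seconds until status != 'running'."""
    for _ in range(max_polls):
        status = fetch_status()
        if status.get("status") != "running":
            return status  # completed, failed, or unknown
        time.sleep(interval)
    raise TimeoutError("task did not finish within the polling budget")
```

Injecting the fetch callable keeps the loop testable without a server; in production it would wrap a GET to /task/{task_id}/status.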

9. Job Queue Architecture

Implement a reliable job queue with retries and persistence.

app/models/job.py
from typing import Optional
from sqlmodel import SQLModel, Field
from datetime import datetime

class Job(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    task_id: str
    status: str  # pending, running, completed, failed
    progress: int = 0
    error_message: Optional[str] = None
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)
    retry_count: int = 0
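The status field cycles pending → running → completed/failed. A small guard (illustrative, not part of SQLModel) keeps updates from skipping states:

```python
# Allowed job status transitions; terminal states have no outgoing edges,
# except that a failed job may be re-queued for another attempt.
VALID_TRANSITIONS = {
    "pending": {"running"},
    "running": {"completed", "failed"},
    "completed": set(),
    "failed": {"pending"},
}

def transition(current: str, new: str) -> str:
    """Return `new` if the move is legal, otherwise raise ValueError."""
    if new not in VALID_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition {current!r} -> {new!r}")
    return new
```

Calling this before every `job.status = ...` assignment turns silent state corruption into a loud error.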

10. Dead Letter Queue for Failed Tasks

Move tasks to a dead letter queue after all retries are exhausted.

app/celery_app.py
@celery_app.task(bind=True, max_retries=3)
def critical_task(self, data: dict):
    try:
        return process(data)
    except Exception as exc:
        if self.request.retries < self.max_retries:
            raise self.retry(exc=exc, countdown=60)
        else:
            # Send to dead letter queue
            send_to_dead_letter_queue(data, str(exc))
            raise
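`send_to_dead_letter_queue` is left undefined above; one common implementation pushes the failed payload onto a Redis list so it can be inspected and replayed later. A pure-Python stand-in with the same shape:

```python
import json
from collections import deque

# Stand-in for a Redis list (e.g. LPUSH to a "dead_letter" key);
# swap in redis_client for a shared, persistent queue in production.
dead_letter_queue = deque()

def send_to_dead_letter_queue(data: dict, error: str) -> None:
    """Persist the failed payload with its error for later inspection."""
    dead_letter_queue.append(json.dumps({"data": data, "error": error}))

def drain_dead_letters() -> list:
    """Drain the queue, returning payloads so they can be re-enqueued."""
    return [json.loads(dead_letter_queue.popleft())
            for _ in range(len(dead_letter_queue))]
```

A periodic job (or an operator) drains the queue, fixes the root cause, and re-submits the payloads.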

11. Load Balancing Across Workers

Distribute tasks fairly across multiple workers using prefetch settings.

app/celery_app.py
celery_app.conf.worker_prefetch_multiplier = 1  # each worker reserves one task at a time
celery_app.conf.worker_max_tasks_per_child = 1000  # recycle workers to curb memory leaks

12. Testing Long-Running Tasks

Test tasks synchronously during development and testing.

tests/test_tasks.py
def test_long_running_task():
    from app.celery_app import celery_app, process_large_file

    # Run tasks synchronously in-process instead of sending them to a broker
    celery_app.conf.task_always_eager = True
    celery_app.conf.task_eager_propagates = True  # re-raise task exceptions in the test

    result = process_large_file.delay("test_file.csv")

    assert result.successful()
    assert result.result["items_processed"] > 0

13. Common Pitfalls

  • No timeout: Tasks can hang indefinitely
  • No retry: Transient failures cause permanent failure
  • No monitoring: Can't debug stuck tasks
  • No idempotency: Retries cause side effects
  • Blocking WebSockets: Hold connections open unnecessarily

Performance: WebSocket connections are stateful and consume server resources. Use HTTP polling for non-critical updates to reduce load.
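The idempotency pitfall deserves a concrete guard: before performing side effects, check whether this task has already been processed. A minimal sketch using an in-process set; in production this would be a Redis SETNX or a unique database constraint so the check survives restarts:

```python
# Keys of tasks that have already completed their side effects.
_processed = set()

def run_once(task_id: str, action) -> bool:
    """Run `action` only if task_id hasn't been processed yet.

    Returns True if the action ran, False if it was skipped
    (i.e. this call was a retry of an already-completed task).
    """
    if task_id in _processed:
        return False
    action()
    _processed.add(task_id)
    return True
```

With this guard, a retried task that already charged a card or sent an email simply becomes a no-op on the second attempt.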

14. Advanced Patterns

  • Task Groups: Run multiple tasks in parallel
  • Pipelines: Chain task results together
  • Chords: Run tasks then aggregate results
  • Custom Routing: Send tasks to specific workers
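Celery expresses these patterns with its canvas primitives (group, chain, chord). The fan-out-then-aggregate shape of a chord can be sketched with stdlib concurrent.futures as a stand-in, just to show the data flow:

```python
from concurrent.futures import ThreadPoolExecutor

def fan_out_aggregate(items, task, aggregate):
    """Run `task` over items in parallel (the group),
    then combine the results (the chord callback)."""
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(task, items))
    return aggregate(results)
```

In Celery the equivalent is `chord(group(task.s(i) for i in items))(aggregate.s())`, with the broker handling distribution instead of a thread pool.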

15. Resources & What's Next

You now understand how to handle long-running tasks with real-time progress tracking. Use these patterns for file processing, data imports, report generation, and batch operations.

Next Topics: Database migrations, security authentication, and deployment strategies.

Congratulations! Your application can now handle complex, long-running operations while keeping users informed. Build with confidence! 🚀

About the Author

Thirdy Gayares

Passionate developer creating custom solutions for everyone. I specialize in building user-friendly tools that solve real-world problems while maintaining the highest standards of security and privacy.