🎓 What You Will Learn
- WebSockets: Real-time bidirectional communication
- Progress Tracking: Update clients on task progress
- Celery + Redis: Scalable long-running task execution
- Retry Logic: Handle failures with exponential backoff
- Timeout Handling: Gracefully manage task timeouts
- Monitoring: Track task health and performance
1. Challenges of Long-Running Tasks
Long-running tasks create UX problems: users don't know whether anything is happening, connections time out, and there's no visibility into progress.
Solution: Track progress in real-time and stream updates to clients using WebSockets.
2. WebSocket Fundamentals
WebSockets enable real-time, bidirectional communication. Perfect for sending progress updates to clients.
```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio

app = FastAPI()

@app.websocket("/ws/task/{task_id}")
async def websocket_endpoint(websocket: WebSocket, task_id: str):
    await websocket.accept()
    try:
        while True:
            # Get task progress
            progress = get_task_progress(task_id)
            # Send to client
            await websocket.send_json({
                "task_id": task_id,
                "progress": progress,
                "status": "running"
            })
            await asyncio.sleep(1)  # Update every second
    except WebSocketDisconnect:
        pass  # Client disconnected; the connection is already closed
```

⏳ Real-Time Progress Bar Demo
3. Storing Progress in Redis
Use Redis to store task progress so multiple clients can track the same task.
```python
import json
from datetime import datetime

import redis

redis_client = redis.Redis(host="localhost", port=6379)

class TaskService:
    @staticmethod
    def update_progress(task_id: str, progress: int, status: str = "running"):
        data = {
            "progress": progress,
            "status": status,
            "updated_at": datetime.now().isoformat()
        }
        redis_client.set(f"task:{task_id}", json.dumps(data))

    @staticmethod
    def get_progress(task_id: str):
        data = redis_client.get(f"task:{task_id}")
        return json.loads(data) if data else None
```

4. Celery Task Progress Reporting
Report progress from Celery tasks using self.update_state() on a bound task.
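The percentage arithmetic used in the task below is worth guarding against empty inputs; a small helper (hypothetical, not part of Celery) keeps it safe:

```python
def percent_complete(current: int, total: int) -> int:
    """Clamp progress to 0-100 and avoid division by zero on empty inputs."""
    if total <= 0:
        return 0
    return min(100, int(current / total * 100))
```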
```python
@celery_app.task(bind=True)
def process_large_file(self, file_path: str):
    total_items = count_items(file_path)
    for idx, item in enumerate(process_file(file_path)):
        # Update progress
        progress = int((idx + 1) / total_items * 100)
        self.update_state(
            state="PROGRESS",
            meta={"current": idx + 1, "total": total_items, "progress": progress}
        )
        # Process item
        process_item(item)
    return {"status": "Complete", "items_processed": total_items}
```

5. Implementing Retry Logic
Retry failed tasks with exponential backoff to handle transient failures.
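Celery computes the backoff schedule internally; as a rough plain-Python sketch of what retry_backoff, retry_backoff_max, and retry_jitter produce (a simplification, not Celery's exact code):

```python
import random

def backoff_delay(retry: int, base: float = 1.0, cap: float = 600.0,
                  jitter: bool = True) -> float:
    # Exponential growth: base * 2**retry, capped at retry_backoff_max
    delay = min(base * (2 ** retry), cap)
    if jitter:
        # Full jitter: pick a random delay up to the computed value
        delay = random.uniform(0, delay)
    return delay
```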
```python
@celery_app.task(
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={"max_retries": 3},
    retry_backoff=True,       # Exponential delay between retries
    retry_backoff_max=600,    # Never wait more than 10 minutes
    retry_jitter=True         # Randomize delays to avoid thundering herds
)
def resilient_task(self, data: dict):
    # Any exception raised here triggers an automatic retry;
    # no manual try/except is needed with autoretry_for.
    return process_data(data)
```

6. Handling Task Timeouts
Set timeouts to prevent tasks from running indefinitely.
```python
from celery.exceptions import SoftTimeLimitExceeded

@celery_app.task(
    time_limit=3600,       # Hard limit: 1 hour
    soft_time_limit=3300   # Soft limit: 55 minutes
)
def time_limited_task(data: dict):
    try:
        return long_running_operation(data)
    except SoftTimeLimitExceeded:
        # Gracefully shut down before the hard limit kills the process
        save_state()
        raise
```

7. Monitoring with Flower
Use Flower to monitor Celery tasks in real-time.
Setup: Run `celery -A app.celery_app flower`, then visit http://localhost:5555
8. HTTP Polling Alternative to WebSockets
For simpler cases, clients can poll for status via HTTP instead of WebSockets.
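On the client side, polling is just a loop; here is a sketch where the fetch_status callable stands in for an HTTP GET against the endpoint below:

```python
import time

def poll_until_done(fetch_status, interval: float = 1.0, max_polls: int = 100) -> dict:
    """fetch_status() returns a dict like {"progress": 40, "status": "running"}."""
    for _ in range(max_polls):
        state = fetch_status()
        if state["status"] in ("completed", "failed"):
            return state
        time.sleep(interval)
    raise TimeoutError("task did not finish within the polling window")
```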
```python
@router.get("/task/{task_id}/status")
def get_task_status(task_id: str):
    from app.services.task_service import TaskService
    progress = TaskService.get_progress(task_id)
    if progress is None:
        return {"task_id": task_id, "progress": 0, "status": "unknown"}
    return {
        "task_id": task_id,
        "progress": progress.get("progress", 0),
        "status": progress.get("status", "unknown")
    }
```

9. Job Queue Architecture
Implement a reliable job queue with retries and persistence.
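The status field of the model below implies a small state machine; one way to sketch the legal transitions (a hypothetical helper, not part of SQLModel):

```python
# Legal moves for the Job.status column
VALID_TRANSITIONS = {
    "pending": {"running"},
    "running": {"completed", "failed"},
    "failed": {"pending"},  # a failed job may be re-queued for retry
}

def transition(current: str, new: str) -> str:
    if new not in VALID_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal job transition: {current} -> {new}")
    return new
```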
```python
from datetime import datetime
from typing import Optional

from sqlmodel import SQLModel, Field

class Job(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    task_id: str
    status: str  # pending, running, completed, failed
    progress: int = 0
    error_message: Optional[str] = None
    created_at: datetime
    updated_at: datetime
    retry_count: int = 0
```

10. Dead Letter Queue for Failed Tasks
Move tasks to a dead letter queue after all retries are exhausted.
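The send_to_dead_letter_queue helper the task below calls is left undefined here; one possible implementation (hypothetical, backed by an in-memory list where production code would use a Redis list or a database table):

```python
import json
from datetime import datetime, timezone

dead_letters: list[str] = []  # stand-in for redis_client.rpush("dead_letter_queue", ...)

def send_to_dead_letter_queue(data: dict, error: str) -> None:
    entry = json.dumps({
        "payload": data,
        "error": error,
        "failed_at": datetime.now(timezone.utc).isoformat(),
    })
    dead_letters.append(entry)
```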
```python
@celery_app.task(bind=True, max_retries=3)
def critical_task(self, data: dict):
    try:
        return process(data)
    except Exception as exc:
        if self.request.retries < self.max_retries:
            raise self.retry(exc=exc, countdown=60)
        # Retries exhausted: send to dead letter queue for later inspection
        send_to_dead_letter_queue(data, str(exc))
        raise
```

11. Load Balancing Across Workers
Distribute tasks fairly across multiple workers using prefetch settings.
```python
celery_app.conf.worker_prefetch_multiplier = 1      # Fetch one task at a time for fair dispatch
celery_app.conf.worker_max_tasks_per_child = 1000   # Recycle workers to curb memory leaks
```

12. Testing Long-Running Tasks
Test tasks synchronously during development and testing.
```python
def test_long_running_task():
    from app.celery_app import celery_app
    celery_app.conf.task_always_eager = True  # Run the task synchronously, in-process
    result = process_large_file.delay("test_file.csv")
    assert result.successful()
    assert result.result["items_processed"] > 0
```

13. Common Pitfalls
- No timeout: Tasks can hang indefinitely
- No retry: Transient failures cause permanent failure
- No monitoring: Can't debug stuck tasks
- No idempotency: Retries cause side effects
- Blocking WebSockets: Hold connections open unnecessarily
Performance: WebSocket connections are stateful and consume server resources. Use HTTP polling for non-critical updates to reduce load.
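The idempotency pitfall deserves a sketch: if a retry re-runs a task, its side effects must not repeat. A minimal illustration, where an in-memory set stands in for a Redis SETNX or a database unique constraint:

```python
_processed: set[str] = set()
charges: list[str] = []  # the observable side effect (e.g. a payment API call)

def charge_customer(order_id: str) -> str:
    """Idempotent: re-running with the same order_id charges at most once."""
    if order_id in _processed:
        return "already-processed"
    _processed.add(order_id)
    charges.append(order_id)
    return "charged"
```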
14. Advanced Patterns
- Task Groups: Run multiple tasks in parallel
- Pipelines: Chain task results together
- Chords: Run tasks then aggregate results
- Custom Routing: Send tasks to specific workers
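Celery implements the first three with its canvas primitives (group, chain, chord). For intuition only, here is what each pattern computes, mimicked with plain Python and a thread pool rather than Celery itself:

```python
from concurrent.futures import ThreadPoolExecutor

def square(x: int) -> int:
    return x * x

with ThreadPoolExecutor() as pool:
    # Group: independent tasks run in parallel, results collected
    group_results = list(pool.map(square, [1, 2, 3]))   # like group(square.s(i) for i in ...)
    # Pipeline (chain): each result feeds the next task
    chained = square(square(2))                         # like chain(square.s(2), square.s())
    # Chord: a parallel group followed by an aggregating callback
    chord_result = sum(pool.map(square, [1, 2, 3]))     # like chord(group)(aggregate.s())
```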
15. Resources & What's Next
You now understand how to handle long-running tasks with real-time progress tracking. Use these patterns for file processing, data imports, report generation, and batch operations.
Next Topics: Database migrations, security authentication, and deployment strategies.
Congratulations! Your application can now handle complex, long-running operations while keeping users informed. Build with confidence! 🚀