🎓 What You Will Learn
- Password Hashing: Securely hash passwords with bcrypt
- User Registration: Validate and store new users
- JWT Tokens: Create and validate JSON Web Tokens
- Login Endpoints: Authenticate users and issue tokens
- Refresh Tokens: Implement short-lived + long-lived tokens
- OAuth2 & OpenID: Third-party authentication integration
1Why Authentication is Critical
Authentication verifies user identity. Authorization controls what they can access. Both are essential for security.
Golden Rule: Never store passwords in plain text. Always hash passwords with bcrypt, Argon2, or scrypt.
2Password Hashing with Bcrypt
Use bcrypt to securely hash passwords.
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)3User Registration with Validation
Register new users with password validation.
from pydantic import BaseModel, EmailStr, Field
class UserCreate(BaseModel):
email: EmailStr
password: str = Field(min_length=8)
full_name: str
@router.post("/register")
def register(user_data: UserCreate, session: Session = Depends(get_session)):
# Check if user exists
existing = session.exec(
select(User).where(User.email == user_data.email)
).first()
if existing:
raise HTTPException(status_code=400, detail="Email already registered")
# Create user
hashed_pwd = hash_password(user_data.password)
user = User(
email=user_data.email,
hashed_password=hashed_pwd,
full_name=user_data.full_name
)
session.add(user)
session.commit()
return {"message": "User registered", "email": user.email}🔐 JWT Authentication Flow
4Creating JWT Tokens
Generate JWT tokens for stateless authentication.
from datetime import datetime, timedelta
import jwt
SECRET_KEY = "your-secret-key"
def create_access_token(user_id: int) -> str:
payload = {
"sub": str(user_id),
"exp": datetime.utcnow() + timedelta(minutes=15)
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
def verify_token(token: str) -> int:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
user_id = payload.get("sub")
if not user_id:
return None
return int(user_id)
except jwt.ExpiredSignatureError:
return None5Login Endpoint
Authenticate users and issue tokens.
@router.post("/login")
def login(email: str, password: str, session: Session = Depends(get_session)):
user = session.exec(
select(User).where(User.email == email)
).first()
if not user or not verify_password(password, user.hashed_password):
raise HTTPException(status_code=401, detail="Invalid credentials")
access_token = create_access_token(user.id)
return {
"access_token": access_token,
"token_type": "bearer",
"user_id": user.id
}6Protected Routes with Dependency Injection
Protect routes using FastAPI's dependency system.
from fastapi.security import HTTPBearer, HTTPAuthCredentials
security = HTTPBearer()
def get_current_user(
credentials: HTTPAuthCredentials = Depends(security),
session: Session = Depends(get_session)
) -> User:
user_id = verify_token(credentials.credentials)
if not user_id:
raise HTTPException(status_code=401, detail="Invalid token")
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.get("/me")
def get_profile(current_user: User = Depends(get_current_user)):
return {"id": current_user.id, "email": current_user.email}7Refresh Token Strategy
Implement short-lived access tokens + long-lived refresh tokens.
def create_tokens(user_id: int) -> dict:
access_token = create_access_token(user_id)
refresh_payload = {
"sub": str(user_id),
"exp": datetime.utcnow() + timedelta(days=7)
}
refresh_token = jwt.encode(refresh_payload, SECRET_KEY, algorithm="HS256")
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}8Role-Based Access Control
Control what authenticated users can do.
from enum import Enum
class UserRole(str, Enum):
user = "user"
admin = "admin"
class User(SQLModel, table=True):
id: int = Field(primary_key=True)
email: str
role: UserRole = UserRole.userdef require_admin(current_user: User = Depends(get_current_user)) -> User:
if current_user.role != UserRole.admin:
raise HTTPException(status_code=403, detail="Admin access required")
return current_user
@router.delete("/users/{user_id}")
def delete_user(
user_id: int,
admin: User = Depends(require_admin)
):
# Delete user...
return {"message": "User deleted"}9OAuth2 Provider Integration
Allow users to login with Google, GitHub, etc.
OAuth2 Flow: Redirect user to provider → Provider authenticates → Provider sends code → Exchange code for tokens → Create user session
10Password Reset Flows
Safe password reset with time-limited tokens.
@router.post("/forgot-password")
def forgot_password(email: str, session: Session = Depends(get_session)):
user = session.exec(select(User).where(User.email == email)).first()
if not user:
return {"message": "If email exists, reset link sent"}
# Create reset token (valid for 1 hour)
reset_token = create_access_token(user.id, expires_minutes=60)
# Send email with reset link
send_reset_email(email, reset_token)
return {"message": "Reset link sent"}11Security Best Practices Checklist
- ✅ Hash passwords with bcrypt/Argon2
- ✅ Use HTTPS in production
- ✅ Implement CORS properly
- ✅ Validate all inputs
- ✅ Rate limit login attempts
- ✅ Log security events
- ✅ Use environment variables for secrets
- ✅ Keep dependencies updated
Never: Log passwords, store tokens in URLs, hardcode secrets, or skip HTTPS.
12Rate Limiting Login Attempts
Prevent brute force attacks by limiting login attempts.
import redis
redis_client = redis.Redis()
@router.post("/login")
def login(email: str, password: str):
key = f"login_attempts:{email}"
attempts = redis_client.incr(key)
if attempts > 5:
raise HTTPException(status_code=429, detail="Too many attempts")
if attempts == 1:
redis_client.expire(key, 900) # 15 minute window
# Authenticate...13Testing Authentication
Test auth flows thoroughly.
def test_login_success():
response = client.post("/login", data={"email": "[email protected]", "password": "password123"})
assert response.status_code == 200
assert "access_token" in response.json()
def test_invalid_credentials():
response = client.post("/login", data={"email": "[email protected]", "password": "wrong"})
assert response.status_code == 40114Monitoring & Logging Security Events
Log failed logins and suspicious activity for auditing.
Log Events: Failed logins, privilege escalation, permission denials, and token refreshes.
15Resources & What's Next
You now understand secure authentication and authorization. Implement these patterns in every FastAPI application.
Next Topics: Advanced deployment strategies, monitoring, and production hardening.
Congratulations! Your API is now secure. Protect user data and build with confidence! 🔐