CWE-566: Authorization Bypass Through User-Controlled Key - Python
Overview
Authorization bypass through user-controlled keys (also known as Insecure Direct Object Reference or IDOR) is a critical vulnerability in Python web applications where developers use user-supplied identifiers - such as user IDs, document IDs, or account numbers - to retrieve resources without verifying that the authenticated user has permission to access them. This enables horizontal privilege escalation, allowing attackers to access other users' data by simply manipulating URL parameters or request bodies.
Python's popular web frameworks (Django, Flask, FastAPI) make database queries simple, and that convenience makes authorization checks easy to overlook. Passing request parameters straight into ORM queries (User.query.get(user_id) or Document.objects.get(id=doc_id)) without ownership validation is extremely common and dangerous. Django's built-in permission system works at the model level and does not enforce per-object ownership on its own, and Flask and FastAPI leave authorization logic entirely to the developer.
Python applications are particularly vulnerable because many tutorials and code examples prioritize functionality over security, showing direct object access patterns without authorization checks. The dynamic typing system means there's no compile-time enforcement of security constraints, and the ease of writing "quick" endpoints often leads to skipping proper access control implementation.
Real-world IDOR vulnerabilities have led to data breaches that exposed medical records, financial data, and personal information. T-Mobile, for example, reportedly exposed customer data through an IDOR-style flaw in its API, and Flask and Django applications are regularly compromised through sequential ID enumeration. With Python's dominance in API development and microservices, securing object-level access control is critical.
Primary Defence: Always include ownership verification in database queries. Use patterns like Document.query.filter_by(id=doc_id, owner_id=current_user.id) (Flask), Document.objects.filter(id=doc_id, owner=request.user) (Django), or repository methods that combine resource ID with user ID filters. Never use bare get(id) or objects.get(id=x) without authorization - always scope queries to the authenticated user to prevent horizontal privilege escalation.
Common Vulnerable Patterns
Direct Database Lookup Without Authorization Check
# VULNERABLE - No ownership verification in Flask
from flask import Flask, jsonify, request
from models import Document, db

app = Flask(__name__)

@app.route('/api/document/<int:doc_id>')
def get_document(doc_id):
    # Directly retrieves ANY document by ID
    doc = Document.query.get_or_404(doc_id)
    return jsonify({
        'id': doc.id,
        'title': doc.title,
        'content': doc.content,
        'owner_id': doc.owner_id
    })

# Attack example:
# User A's document: GET /api/document/123 → Returns User A's data
# Attacker tries: GET /api/document/124 → Returns User B's data!
# Attacker tries: GET /api/document/125 → Returns User C's data!
# Sequential enumeration exposes ALL documents
Why this is vulnerable: The code uses Document.query.get_or_404(doc_id) which retrieves any document matching the provided ID without verifying that the current user owns it. There is no comparison between the document's owner_id and the authenticated user. An attacker can simply change the doc_id in the URL to access any document in the database, enabling horizontal privilege escalation and mass data exfiltration through sequential ID enumeration.
Django ORM Query Without Permission Check
# VULNERABLE - Missing authorization in Django view
from django.http import JsonResponse
from django.shortcuts import get_object_or_404
from .models import Order

def get_order(request, order_id):
    # Retrieves order without checking if user owns it
    order = get_object_or_404(Order, id=order_id)
    return JsonResponse({
        'order_id': order.id,
        'total': str(order.total),
        'items': [item.name for item in order.items.all()],
        'shipping_address': order.shipping_address
    })

# Attack example:
# User logged in, makes request: GET /orders/5001/
# User's own order ID is 5001, but tries: GET /orders/5002/
# Result: Returns another user's order with private shipping address!
Why this is vulnerable: Django's get_object_or_404(Order, id=order_id) retrieves any order with the specified ID without checking if the authenticated user owns that order. The function returns the order if it exists, regardless of who owns it. This allows any authenticated user to access other users' order details, including sensitive shipping addresses and financial information, by manipulating the order_id URL parameter.
FastAPI Endpoint with Missing Authorization
# VULNERABLE - Path parameter used directly without auth check
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy.orm import Session
from . import models, database

app = FastAPI()

@app.get("/api/users/{user_id}/profile")
async def get_user_profile(
    user_id: int,
    db: Session = Depends(database.get_db)
):
    # No check if current user can access this profile
    user = db.query(models.User).filter(
        models.User.id == user_id
    ).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return {
        "user_id": user.id,
        "email": user.email,  # PII exposure!
        "full_name": user.full_name,
        "phone": user.phone,
        "ssn": user.ssn  # Critical data leak!
    }

# Attack example:
# Attacker enumerates: GET /api/users/1/profile → User 1's SSN
# GET /api/users/2/profile → User 2's SSN
# GET /api/users/3/profile → User 3's SSN
# Mass PII theft through sequential access
Why this is vulnerable: The endpoint accepts a user_id path parameter and queries the database for that user without verifying that the current authenticated user has permission to view that profile. There is no check comparing the requested user_id with the authenticated user's ID. This enables any authenticated user to enumerate all user profiles by incrementing the user_id parameter, exposing highly sensitive PII including SSNs, email addresses, and phone numbers for every user in the system.
File Access Without Owner Validation
# VULNERABLE - File download without authorization
from flask import Flask, send_file, abort
import os

app = Flask(__name__)

@app.route('/download/<filename>')
def download_file(filename):
    # Uses user-provided filename directly
    file_path = os.path.join('/uploads', filename)
    if not os.path.exists(file_path):
        abort(404)
    # No check if current user owns this file!
    return send_file(file_path, as_attachment=True)

# Attack example:
# User uploads: profile_123.pdf (their document)
# Attacker guesses: GET /download/profile_124.pdf
# Attacker tries: GET /download/financial_report_125.pdf
# Result: Downloads other users' private files!
Why this is vulnerable: The code constructs a file path using the user-controlled filename parameter and serves the file with send_file() without any authorization check. There is no verification that the current user owns the file or has permission to access it. An attacker can download any file in the /uploads directory by guessing or enumerating filenames, and can potentially use path traversal (../../etc/passwd) to access files outside the intended directory if additional protections aren't in place.
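One way to close both gaps (missing ownership check and path traversal) is to resolve the requested path and look up its owner before serving anything. The sketch below is framework-free and uses only the standard library. UPLOAD_ROOT, FILE_OWNERS, and resolve_download are hypothetical names introduced for illustration; a real application would read ownership from its database.

```python
from pathlib import Path
from typing import Optional

# Hypothetical upload root and ownership table; a real application would
# load ownership from its database rather than a module-level dict.
UPLOAD_ROOT = Path("/uploads")
FILE_OWNERS = {"profile_123.pdf": 1, "profile_124.pdf": 2}


def resolve_download(filename: str, current_user_id: int) -> Optional[Path]:
    """Return the path to serve, or None if the request must be refused."""
    root = UPLOAD_ROOT.resolve()
    candidate = (root / filename).resolve()
    # Reject anything that escapes the upload directory,
    # e.g. "../../etc/passwd" or an absolute path.
    if root not in candidate.parents:
        return None
    # Ownership is keyed by the path relative to the upload root.
    relative = candidate.relative_to(root).as_posix()
    owner = FILE_OWNERS.get(relative)
    if owner is None or owner != current_user_id:
        return None
    return candidate
```

A view would call resolve_download(filename, current_user.id) and respond with 404 when it returns None, so that missing and unauthorized files are indistinguishable to the caller.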
Batch Operation Without Individual Authorization
# VULNERABLE - Bulk delete without per-item authorization
from django.views.decorators.http import require_http_methods
from django.http import JsonResponse
from .models import Document
import json

@require_http_methods(["DELETE"])
def bulk_delete_documents(request):
    data = json.loads(request.body)
    doc_ids = data.get('document_ids', [])
    # Deletes ALL specified documents without ownership check!
    deleted_count = Document.objects.filter(
        id__in=doc_ids
    ).delete()[0]
    return JsonResponse({'deleted': deleted_count})

# Attack example:
# DELETE /api/documents/bulk-delete
# Body: {"document_ids": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}
# Result: Deletes ANY documents, not just attacker's documents!
# Attacker can delete entire database by enumerating IDs
Why this is vulnerable: The bulk delete operation uses Document.objects.filter(id__in=doc_ids).delete() which deletes all documents matching the provided IDs without verifying ownership for each document. The query lacks an ownership filter (e.g., owner=request.user), allowing an attacker to delete any documents in the system by including their IDs in the request. This enables mass data destruction attacks where an attacker can enumerate IDs and delete the entire database, affecting all users.
Secure Patterns
Filter Query by Current User (Primary Pattern)
# SECURE - Scope query to authenticated user
from flask import Flask, jsonify, abort
from flask_login import login_required, current_user
from models import Document, db

app = Flask(__name__)

@app.route('/api/document/<int:doc_id>')
@login_required
def get_document(doc_id):
    # Filter by BOTH id AND owner_id
    doc = Document.query.filter_by(
        id=doc_id,
        owner_id=current_user.id  # Ensures user owns this document
    ).first()
    if not doc:
        # Return 404 for both non-existent and unauthorized
        # Don't reveal whether document exists but user can't access it
        abort(404, "Document not found")
    return jsonify({
        'id': doc.id,
        'title': doc.title,
        'content': doc.content
    })
Why this works: By including the ownership check (owner_id=current_user.id) directly in the database query filter, the database engine ensures that only documents belonging to the authenticated user can be retrieved. Even if an attacker knows another user's document ID, the query will return no results because the ownership constraint is not satisfied. This prevents horizontal privilege escalation at the data access layer. Returning a 404 for both non-existent and unauthorized resources prevents information leakage about which document IDs exist in the system.
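The same idea can be seen without any web framework. The following sketch uses the standard library's sqlite3 module with a hypothetical documents table and a hypothetical helper get_document_for_user; it is meant only to show that the ownership condition lives in the WHERE clause, not to mirror any particular ORM.

```python
import sqlite3
from typing import Optional

# Hypothetical schema and sample rows for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE documents ("
    "id INTEGER PRIMARY KEY, owner_id INTEGER NOT NULL, title TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO documents (id, owner_id, title) VALUES (?, ?, ?)",
    [(123, 1, "Alice's notes"), (124, 2, "Bob's notes")],
)


def get_document_for_user(doc_id: int, user_id: int) -> Optional[tuple]:
    """Fetch a document only if it belongs to user_id."""
    return conn.execute(
        "SELECT id, title FROM documents WHERE id = ? AND owner_id = ?",
        (doc_id, user_id),
    ).fetchone()


print(get_document_for_user(123, 1))  # (123, "Alice's notes")
print(get_document_for_user(124, 1))  # None: exists, but owned by user 2
print(get_document_for_user(999, 1))  # None: does not exist
```

The second and third calls return the same value, which is what lets the endpoint answer 404 in both cases without revealing whether the ID exists.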
Explicit Authorization Check with Dedicated Function
# SECURE - Separate authorization logic for reusability
from flask import Flask, jsonify, abort
from flask_login import login_required, current_user
from models import Document, DocumentShare, db
from functools import wraps

app = Flask(__name__)

def authorize_document_access(require_permission='read'):
    """Decorator to verify user can access document"""
    def decorator(f):
        @wraps(f)
        def wrapper(doc_id, *args, **kwargs):
            doc = Document.query.get_or_404(doc_id)
            # Check ownership
            if doc.owner_id != current_user.id:
                # Check shared access
                shared_access = db.session.query(DocumentShare).filter_by(
                    document_id=doc.id,
                    user_id=current_user.id
                ).first()
                if not shared_access:
                    abort(403, "Not authorized to access this document")
                # Check permission level
                if require_permission == 'write' and not shared_access.can_write:
                    abort(403, "Not authorized to modify this document")
                if require_permission == 'delete' and not shared_access.can_delete:
                    abort(403, "Not authorized to delete this document")
            # User is authorized, attach document to kwargs
            kwargs['document'] = doc
            return f(doc_id, *args, **kwargs)
        return wrapper
    return decorator

@app.route('/api/document/<int:doc_id>')
@login_required
@authorize_document_access(require_permission='read')
def get_document(doc_id, document=None):
    return jsonify({
        'id': document.id,
        'title': document.title,
        'content': document.content,
        'is_owner': document.owner_id == current_user.id
    })

@app.route('/api/document/<int:doc_id>', methods=['DELETE'])
@login_required
@authorize_document_access(require_permission='delete')
def delete_document(doc_id, document=None):
    db.session.delete(document)
    db.session.commit()
    return jsonify({'message': 'Document deleted'}), 200
Why this works: This pattern separates authorization logic into a reusable decorator, ensuring consistent enforcement across all endpoints. The decorator performs comprehensive authorization checks including ownership verification and shared access permissions before allowing the endpoint to execute. By centralizing the authorization logic, it prevents developers from forgetting to add checks to new endpoints and makes the codebase easier to audit. The permission levels (read/write/delete) provide fine-grained access control, preventing users with read-only shared access from modifying or deleting documents they don't own.
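The control flow of such a decorator is easier to test in isolation when it is stripped of Flask and the database. The sketch below is a framework-free approximation: Document, Forbidden, and require_permission are hypothetical stand-ins, and shares is an assumed in-memory mapping of user ID to granted permissions. Unlike the Flask decorator above, it also requires an explicit "read" grant for shared users.

```python
from dataclasses import dataclass, field
from functools import wraps


class Forbidden(Exception):
    """Raised when the caller lacks the required permission."""


@dataclass
class Document:
    id: int
    owner_id: int
    # Hypothetical share table: user_id -> set of granted permissions.
    shares: dict = field(default_factory=dict)


def require_permission(permission):
    """Wrap a handler(user_id, document) with an ownership/share check."""
    def decorator(handler):
        @wraps(handler)
        def wrapper(user_id, document):
            is_owner = document.owner_id == user_id
            granted = document.shares.get(user_id, set())
            # Owners may do anything; everyone else needs an explicit grant.
            if not is_owner and permission not in granted:
                raise Forbidden(f"{permission} access denied")
            return handler(user_id, document)
        return wrapper
    return decorator


@require_permission("read")
def read_document(user_id, document):
    return f"user {user_id} read document {document.id}"


@require_permission("delete")
def delete_document(user_id, document):
    return f"user {user_id} deleted document {document.id}"
```

With doc = Document(id=1, owner_id=10, shares={20: {"read"}}), user 20 can call read_document but delete_document raises Forbidden, and a user with no share entry is refused for both.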
Django Class-Based Views with Permission Mixins
# SECURE - Django with built-in permission system
from django.views.generic import DetailView, UpdateView, DeleteView
from django.contrib.auth.mixins import LoginRequiredMixin, UserPassesTestMixin
from django.core.exceptions import PermissionDenied
from .models import Order

class UserOwnsOrderMixin(UserPassesTestMixin):
    """Mixin to ensure user owns the order"""
    def test_func(self):
        order = self.get_object()
        # Check if current user owns this order
        return order.user == self.request.user

    def handle_no_permission(self):
        # Return 403 instead of redirecting to login
        raise PermissionDenied("You don't have permission to access this order")

class OrderDetailView(LoginRequiredMixin, UserOwnsOrderMixin, DetailView):
    model = Order
    template_name = 'orders/detail.html'
    context_object_name = 'order'

    def get_queryset(self):
        # Additional safety: scope queryset to current user
        return Order.objects.filter(user=self.request.user)

class OrderUpdateView(LoginRequiredMixin, UserOwnsOrderMixin, UpdateView):
    model = Order
    fields = ['shipping_address', 'notes']
    template_name = 'orders/edit.html'

    def get_queryset(self):
        # Only allow updating own orders
        return Order.objects.filter(user=self.request.user)

class OrderDeleteView(LoginRequiredMixin, UserOwnsOrderMixin, DeleteView):
    model = Order
    success_url = '/orders/'

    def get_queryset(self):
        # Only allow deleting own orders
        return Order.objects.filter(user=self.request.user)
Why this works: Django's UserPassesTestMixin provides a declarative way to enforce authorization rules. Its test_func method runs during dispatch, before the view's handler executes, and compares the order's owner with the authenticated user; if the test fails, handle_no_permission raises PermissionDenied and Django returns a 403 Forbidden response. The get_queryset override adds defense-in-depth by scoping every lookup to the current user at the ORM level. Because test_func calls get_object(), which uses that scoped queryset, a request for another user's order normally fails with a 404 before the ownership comparison is reached, and the mixin acts as a second check if the queryset filter is ever removed or changed. Authorization is therefore enforced at two independent layers.
FastAPI Dependency Injection for Authorization
# SECURE - FastAPI with dependency injection pattern
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from typing import Annotated
from . import models, database, auth

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: Session = Depends(database.get_db)
) -> models.User:
    """Verify JWT token and return current user"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials"
    )
    user_id = auth.decode_token(token)
    if user_id is None:
        raise credentials_exception
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if user is None:
        raise credentials_exception
    return user

async def get_authorized_document(
    doc_id: int,
    current_user: Annotated[models.User, Depends(get_current_user)],
    db: Session = Depends(database.get_db)
) -> models.Document:
    """Verify user can access document"""
    doc = db.query(models.Document).filter(
        models.Document.id == doc_id,
        models.Document.owner_id == current_user.id  # Authorization check
    ).first()
    if not doc:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Document not found"
        )
    return doc

@app.get("/api/documents/{doc_id}")
async def get_document(
    document: Annotated[models.Document, Depends(get_authorized_document)]
):
    """Get document - authorization handled by dependency"""
    return {
        "id": document.id,
        "title": document.title,
        "content": document.content,
        "created_at": document.created_at
    }

@app.delete("/api/documents/{doc_id}")
async def delete_document(
    document: Annotated[models.Document, Depends(get_authorized_document)],
    db: Session = Depends(database.get_db)
):
    """Delete document - authorization handled by dependency"""
    db.delete(document)
    db.commit()
    return {"message": "Document deleted successfully"}
Why this works: FastAPI's dependency injection system allows authorization logic to be declared as dependencies that run before the endpoint function executes. The get_authorized_document dependency performs both authentication (via get_current_user) and authorization (checking owner_id) before the document is passed to the endpoint. If authorization fails, an exception is raised and the endpoint never executes. This pattern gives consistent authorization across all endpoints that use the same dependency and eliminates code duplication: an endpoint that declares the dependency only ever receives a document that has already passed the ownership check.
Using UUIDs with Authorization (Defense in Depth)
# SECURE - UUIDs + Authorization checks
from datetime import datetime
from flask import Flask, jsonify, abort
from flask_login import login_required, current_user
from models import db
import uuid

app = Flask(__name__)

class Document(db.Model):
    # Use UUID instead of sequential integer
    id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    owner_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

@app.route('/api/document/<doc_id>')
@login_required
def get_document(doc_id):
    # Validate UUID format first
    try:
        uuid.UUID(doc_id)
    except ValueError:
        abort(400, "Invalid document ID format")
    # STILL perform authorization check - UUIDs are NOT sufficient!
    doc = Document.query.filter_by(
        id=doc_id,
        owner_id=current_user.id
    ).first()
    if not doc:
        abort(404, "Document not found")
    return jsonify({
        'id': doc.id,
        'title': doc.title,
        'content': doc.content
    })

# Example document IDs:
# Instead of: /api/document/123, /api/document/124, /api/document/125
# Use: /api/document/a3f7c8e1-4b9d-4e3a-8f7c-1234567890ab
# Version 4 UUIDs are random, so they cannot be enumerated sequentially
Why this works: UUIDs (Universally Unique Identifiers) are 128-bit values with approximately 2^122 possible unique values, making sequential enumeration computationally infeasible. Unlike sequential IDs (1, 2, 3, 4...), an attacker cannot guess valid UUIDs by incrementing or pattern matching. However, UUIDs alone are NOT a security control - they only prevent enumeration attacks. The code still includes explicit authorization checks (owner_id=current_user.id) because UUIDs can be exposed through logs, URLs shared between users, browser history, or other side channels. This defense-in-depth approach combines obscurity (UUIDs) with proper access control (ownership verification) for maximum security.
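The format check can be made slightly stricter than a bare uuid.UUID(...) call, which accepts several textual forms and any UUID version. The sketch below uses only the standard library; parse_document_id is a hypothetical helper name, and restricting input to version 4 is an assumption that fits the model above (IDs generated with uuid.uuid4()).

```python
import uuid
from typing import Optional


def parse_document_id(raw: str) -> Optional[uuid.UUID]:
    """Return a version-4 UUID parsed from raw, or None if it is not one."""
    try:
        parsed = uuid.UUID(raw)
    except (ValueError, AttributeError, TypeError):
        # ValueError: malformed string; the others cover non-string input.
        return None
    # Only accept version 4 (random) UUIDs, matching how IDs are generated.
    return parsed if parsed.version == 4 else None


new_id = uuid.uuid4()
print(new_id.version)  # 4
print(parse_document_id("124"))  # None: a sequential ID is not a UUID
```

Validation of this kind only rejects malformed input early. It says nothing about who may read the document, so the ownership filter in the query is still required.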
Bulk Operations with Per-Item Authorization
# SECURE - Verify ownership for each item in bulk operation
from django.views.decorators.http import require_http_methods
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_protect
from django.contrib.auth.decorators import login_required
from .models import Document
import json

@login_required
@require_http_methods(["DELETE"])
@csrf_protect
def bulk_delete_documents(request):
    data = json.loads(request.body)
    doc_ids = data.get('document_ids', [])
    if not doc_ids or len(doc_ids) > 100:  # Cap batch size
        return JsonResponse(
            {'error': 'Invalid request'},
            status=400
        )
    # Only delete documents owned by current user
    deleted_count = Document.objects.filter(
        id__in=doc_ids,
        owner=request.user  # Authorization check!
    ).delete()[0]
    # Return count of actually deleted documents
    # May be less than requested if user didn't own all of them
    return JsonResponse({
        'deleted': deleted_count,
        'requested': len(doc_ids)
    })

# Alternative with detailed feedback:
@login_required
@require_http_methods(["DELETE"])
@csrf_protect
def bulk_delete_documents_detailed(request):
    data = json.loads(request.body)
    doc_ids = data.get('document_ids', [])
    results = {
        'deleted': [],
        'not_found': [],
        'not_authorized': []
    }
    for doc_id in doc_ids[:100]:  # Limit to 100
        try:
            doc = Document.objects.get(id=doc_id)
            # Check ownership
            if doc.owner != request.user:
                results['not_authorized'].append(doc_id)
                continue
            doc.delete()
            results['deleted'].append(doc_id)
        except Document.DoesNotExist:
            results['not_found'].append(doc_id)
    return JsonResponse(results)
Why this works: Bulk operations are secured by including the ownership filter (owner=request.user) in the queryset used for deletion. The database only deletes documents that match both the ID list AND the ownership requirement, automatically excluding any documents the user doesn't own. This prevents attackers from deleting other users' documents by including unauthorized IDs in the batch. The batch size cap (maximum 100 items) limits the damage and load of a single request; it is not rate limiting, which has to be enforced separately. Returning the actual deleted count alongside the requested count provides transparency without revealing which specific IDs were unauthorized. The alternative implementation gives legitimate users detailed feedback, but its separate not_found and not_authorized lists tell the caller which IDs exist, so it trades some information leakage for usability and should be used only where that is acceptable.
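The effect of the ownership filter on a bulk delete can be reproduced with the standard library alone. This sketch uses sqlite3 with a hypothetical documents table; bulk_delete and MAX_BATCH are illustrative names, and the cap of 100 simply mirrors the Django example.

```python
import sqlite3

MAX_BATCH = 100  # Same cap as the Django example

# Hypothetical schema and sample rows for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE documents (id INTEGER PRIMARY KEY, owner_id INTEGER NOT NULL)"
)
conn.executemany(
    "INSERT INTO documents (id, owner_id) VALUES (?, ?)",
    [(1, 10), (2, 10), (3, 20)],
)


def bulk_delete(doc_ids, user_id):
    """Delete only the listed documents owned by user_id; return the count."""
    if not doc_ids or len(doc_ids) > MAX_BATCH:
        raise ValueError("invalid batch size")
    # One placeholder per ID keeps the statement fully parameterized.
    placeholders = ", ".join("?" for _ in doc_ids)
    cursor = conn.execute(
        f"DELETE FROM documents WHERE id IN ({placeholders}) AND owner_id = ?",
        (*doc_ids, user_id),
    )
    conn.commit()
    return cursor.rowcount


print(bulk_delete([1, 2, 3], 10))  # 2: document 3 belongs to user 20
```

User 10 asked for three deletions but only two rows matched the ownership condition, and document 3 is left untouched.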
Framework-Specific Guidance
Django - Permission-Based Access Control
Django provides a robust permission system that should be leveraged for authorization:
# SECURE - Django with permissions and object-level permissions
from django.contrib.auth.decorators import login_required
from django.contrib.auth.models import User
from django.core.exceptions import PermissionDenied
from django.db import models
from django.db.models import Q
from django.shortcuts import get_object_or_404
from django.http import JsonResponse

# Model with owner field
class Document(models.Model):
    owner = models.ForeignKey(User, on_delete=models.CASCADE)
    title = models.CharField(max_length=200)
    content = models.TextField()
    is_public = models.BooleanField(default=False)
    # Users the document is shared with (used by user_can_access)
    shared_with = models.ManyToManyField(
        User, related_name='shared_documents', blank=True
    )

    class Meta:
        permissions = [
            ("view_private_document", "Can view private documents"),
        ]

    def user_can_access(self, user):
        """Object-level permission check"""
        if self.owner == user:
            return True
        if self.is_public:
            return True
        # Check sharing permissions
        return self.shared_with.filter(id=user.id).exists()

# View with permission checking
@login_required
def get_document(request, doc_id):
    doc = get_object_or_404(Document, id=doc_id)
    # Object-level authorization
    if not doc.user_can_access(request.user):
        raise PermissionDenied("You don't have access to this document")
    return JsonResponse({
        'id': doc.id,
        'title': doc.title,
        'content': doc.content,
        'is_owner': doc.owner == request.user
    })

# Django Rest Framework with object permissions
from rest_framework import viewsets, permissions
# DocumentSerializer is assumed to be defined in the app's serializers module
from .serializers import DocumentSerializer

class IsOwnerOrReadOnly(permissions.BasePermission):
    """Object-level permission to only allow owners to edit"""
    def has_object_permission(self, request, view, obj):
        # Read permissions for GET, HEAD, OPTIONS
        if request.method in permissions.SAFE_METHODS:
            return obj.user_can_access(request.user)
        # Write permissions only for owner
        return obj.owner == request.user

class DocumentViewSet(viewsets.ModelViewSet):
    queryset = Document.objects.all()
    serializer_class = DocumentSerializer
    permission_classes = [permissions.IsAuthenticated, IsOwnerOrReadOnly]

    def get_queryset(self):
        # Scope to documents user can access
        user = self.request.user
        return Document.objects.filter(
            Q(owner=user) |
            Q(is_public=True) |
            Q(shared_with=user)
        ).distinct()

    def perform_create(self, serializer):
        # Automatically set owner to current user
        serializer.save(owner=self.request.user)
Why this works: Django's permission system provides declarative, model-level access control that integrates with the authentication system. The user_can_access method encapsulates object-level authorization logic in the model itself, making it reusable across views and ensuring consistent enforcement. Django Rest Framework calls has_object_permission from the view's get_object() method, so detail routes (retrieve, update, delete) are checked at the framework level before the view works with the object. The get_queryset override ensures that only authorized documents are even visible to the user's queries, combining permission checks with queryset filtering for defense-in-depth.
Flask - Custom Authorization Decorators
Flask requires manual implementation of authorization, but decorators provide clean reusability:
# SECURE - Flask with comprehensive authorization system
from flask import Flask, abort, g, jsonify, request
from flask_login import LoginManager, login_required, current_user
from functools import wraps
from models import Document, DocumentShare, db

app = Flask(__name__)
login_manager = LoginManager(app)

# Authorization decorator with caching
def authorize_resource(model_class, param_name='id', permission='read'):
    """
    Decorator to authorize access to a resource

    Args:
        model_class: SQLAlchemy model class
        param_name: URL parameter name containing resource ID
        permission: Required permission level ('read', 'write', 'delete')
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            resource_id = kwargs.get(param_name)
            # Fetch resource
            resource = model_class.query.get(resource_id)
            if not resource:
                abort(404, f"{model_class.__name__} not found")
            # Check ownership
            if hasattr(resource, 'owner_id'):
                if resource.owner_id == current_user.id:
                    # Owner has all permissions
                    g.authorized_resource = resource
                    return f(*args, **kwargs)
            # Check shared access
            if hasattr(resource, 'shared_access'):
                access = resource.shared_access.filter_by(
                    user_id=current_user.id
                ).first()
                if access:
                    # Verify permission level
                    if permission == 'read' and access.can_read:
                        g.authorized_resource = resource
                        return f(*args, **kwargs)
                    elif permission == 'write' and access.can_write:
                        g.authorized_resource = resource
                        return f(*args, **kwargs)
                    elif permission == 'delete' and access.can_delete:
                        g.authorized_resource = resource
                        return f(*args, **kwargs)
            # Not authorized
            abort(403, "Not authorized to access this resource")
        return wrapper
    return decorator

# Usage in routes
@app.route('/api/document/<int:id>')
@login_required
@authorize_resource(Document, param_name='id', permission='read')
def get_document(id):
    doc = g.authorized_resource
    return jsonify({
        'id': doc.id,
        'title': doc.title,
        'content': doc.content
    })

@app.route('/api/document/<int:id>', methods=['PUT'])
@login_required
@authorize_resource(Document, param_name='id', permission='write')
def update_document(id):
    doc = g.authorized_resource
    data = request.get_json()
    doc.title = data.get('title', doc.title)
    doc.content = data.get('content', doc.content)
    db.session.commit()
    return jsonify({'message': 'Document updated'})

@app.route('/api/document/<int:id>', methods=['DELETE'])
@login_required
@authorize_resource(Document, param_name='id', permission='delete')
def delete_document(id):
    doc = g.authorized_resource
    db.session.delete(doc)
    db.session.commit()
    return jsonify({'message': 'Document deleted'})
Why this works: The custom decorator encapsulates all authorization logic in a single reusable function, ensuring consistent enforcement across all endpoints. By checking ownership first and then falling back to shared access permissions, it implements a complete authorization model. The decorator fetches the resource once and stores it in Flask's g object, eliminating redundant database queries while ensuring the endpoint only receives authorized resources. The permission parameter allows fine-grained control (read/write/delete), preventing users with read-only access from modifying or deleting resources they don't own.
FastAPI - Comprehensive Dependency Authorization
# SECURE - FastAPI with advanced dependency patterns
from fastapi import FastAPI, Depends, HTTPException, status, Security
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from sqlalchemy.orm import Session
from typing import Annotated, Optional
from pydantic import BaseModel
from . import models, database, auth

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={
        "documents:read": "Read documents",
        "documents:write": "Write documents",
        "documents:delete": "Delete documents",
    }
)

class DocumentUpdate(BaseModel):
    """Request body for updating a document"""
    title: str
    content: str

class AuthorizationError(HTTPException):
    def __init__(self, detail: str):
        super().__init__(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=detail
        )

async def get_current_user(
    security_scopes: SecurityScopes,
    token: Annotated[str, Depends(oauth2_scheme)],
    db: Session = Depends(database.get_db)
) -> models.User:
    """Authenticate user and verify scopes"""
    authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    # Decode and verify token
    payload = auth.decode_token(token)
    if payload is None:
        raise credentials_exception
    user_id: int = payload.get("sub")
    token_scopes: list = payload.get("scopes", [])
    # Verify required scopes
    for scope in security_scopes.scopes:
        if scope not in token_scopes:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    user = db.query(models.User).filter(models.User.id == user_id).first()
    if user is None:
        raise credentials_exception
    return user

class DocumentAccess:
    """Dependency for document access with permission checking"""
    def __init__(self, require_ownership: bool = True):
        self.require_ownership = require_ownership

    async def __call__(
        self,
        doc_id: int,
        current_user: Annotated[models.User, Security(get_current_user)],
        db: Session = Depends(database.get_db)
    ) -> models.Document:
        # Fetch document
        doc = db.query(models.Document).filter(
            models.Document.id == doc_id
        ).first()
        if not doc:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Document not found"
            )
        # Check ownership
        if doc.owner_id == current_user.id:
            return doc
        # If ownership required, deny access
        if self.require_ownership:
            raise AuthorizationError(
                "You must be the owner to perform this action"
            )
        # Check shared access
        shared_access = db.query(models.DocumentShare).filter(
            models.DocumentShare.document_id == doc_id,
            models.DocumentShare.user_id == current_user.id
        ).first()
        if not shared_access:
            raise AuthorizationError(
                "You don't have access to this document"
            )
        # Attach permission info to document
        doc.user_permission = {
            'can_read': shared_access.can_read,
            'can_write': shared_access.can_write,
            'can_delete': shared_access.can_delete
        }
        return doc

# Usage with different permission requirements
@app.get("/api/documents/{doc_id}")
async def get_document(
    document: Annotated[
        models.Document,
        Security(DocumentAccess(require_ownership=False), scopes=["documents:read"])
    ]
):
    """Get document - allows shared access"""
    return {
        "id": document.id,
        "title": document.title,
        "content": document.content,
        "is_owner": not hasattr(document, 'user_permission')
    }

@app.put("/api/documents/{doc_id}")
async def update_document(
    document: Annotated[
        models.Document,
        Security(DocumentAccess(require_ownership=True), scopes=["documents:write"])
    ],
    data: DocumentUpdate,
    db: Session = Depends(database.get_db)
):
    """Update document - requires ownership"""
    document.title = data.title
    document.content = data.content
    db.commit()
    db.refresh(document)
    return {"message": "Document updated", "document": document}

@app.delete("/api/documents/{doc_id}")
async def delete_document(
    document: Annotated[
        models.Document,
        Security(DocumentAccess(require_ownership=True), scopes=["documents:delete"])
    ],
    db: Session = Depends(database.get_db)
):
    """Delete document - requires ownership"""
    db.delete(document)
    db.commit()
    return {"message": "Document deleted successfully"}
Why this works: FastAPI's Security dependency system provides OAuth2 scope-based authorization combined with custom object-level permissions. The DocumentAccess dependency class encapsulates the object-level logic: an ownership check, a shared-access lookup, and attaching the caller's share permissions to the returned document. Through its require_ownership parameter, the same dependency handles both owner-only endpoints (update/delete) and shared-access endpoints (read). The scope checking in get_current_user ensures API-level permissions are verified before object-level checks, providing layered security. Because the dependency raises an exception before the endpoint executes, an endpoint that declares it only ever receives a document that has passed these checks.
Remediation Steps
Locate the Finding
Review the security scan report to identify the data flow:
- Source: User-controlled input containing resource ID
  - Flask: request.args.get('doc_id'), <int:doc_id> in route
  - Django: request.GET['order_id'], URL parameter
  - FastAPI: Path parameter {doc_id}, Query parameter
- Sink: Database query or resource access without authorization
  - Document.query.get(doc_id) (Flask-SQLAlchemy)
  - Document.objects.get(id=doc_id) (Django ORM)
  - db.query(models.Document).filter(models.Document.id == doc_id) (FastAPI/SQLAlchemy)
- Missing Control: No comparison of resource.owner_id with current_user.id
Understand the Data Flow
Trace how the user-controlled ID flows through the code:
- Identify where the ID enters: URL parameter, query string, request body
- Follow the ID to the database query
- Check if any ownership validation exists between source and sink
- Verify if authentication is required (@login_required, @require_auth)
- Look for authorization checks (if owner_id == current_user.id)
Identify the Pattern
Match the vulnerable code to one of these patterns:
- Direct lookup: Model.query.get(id) → No ownership check
- Filter without owner: Model.objects.filter(id=id) → Missing owner=user
- Bulk operation: Model.objects.filter(id__in=ids).delete() → No per-item auth
- File access: send_file(filename) → No ownership validation
Apply the Fix
Choose the appropriate remediation approach based on your framework and architecture:
Option 1: Add ownership filter to database query (Recommended - best performance)
- Include ownership condition directly in the query filter
- Pattern: Combine resource ID filter with owner ID filter in same query
- Flask-SQLAlchemy: Use filter_by(id=doc_id, owner_id=current_user.id)
- Django ORM: Use filter(id=doc_id, owner=request.user) or get_object_or_404(Model, id=x, owner=user)
- FastAPI/SQLAlchemy: Include owner_id == current_user.id in WHERE clause
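The ownership filter described in Option 1 can be sketched without any framework. The example below is a minimal illustration using the standard-library sqlite3 module so that it runs without dependencies; the documents table, its columns, and the helper names get_document_for_user and make_demo_db are assumptions made for this sketch, not part of any framework API.

```python
import sqlite3


def get_document_for_user(conn, doc_id, user_id):
    """Return the document row only if it belongs to user_id, else None."""
    # The ownership condition sits in the same WHERE clause as the ID lookup,
    # so a row owned by someone else looks exactly like a missing row.
    cur = conn.execute(
        "SELECT id, owner_id, title FROM documents WHERE id = ? AND owner_id = ?",
        (doc_id, user_id),
    )
    return cur.fetchone()


def make_demo_db():
    """Build a hypothetical in-memory table with two users' documents."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE documents (id INTEGER PRIMARY KEY, owner_id INTEGER, title TEXT)"
    )
    conn.executemany(
        "INSERT INTO documents VALUES (?, ?, ?)",
        [(1, 10, "Alice notes"), (2, 20, "Bob notes")],
    )
    return conn
```

With this shape, user_id must come from the authenticated session or token, never from the request itself. The ORM calls listed above express the same WHERE clause in each framework's query syntax.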
Option 2: Use authorization decorator/dependency (Best for reusability)
- Create reusable authorization logic applied across multiple endpoints
- Flask: Custom decorator that checks ownership before endpoint execution
- Django: Mixins like UserPassesTestMixin with test_func() checking ownership
- FastAPI: Dependency injection that validates ownership and returns authorized resource
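A reusable check of the kind Option 2 describes might look like the following framework-neutral sketch. The decorator name authorize_document, the NotFound exception, and the in-memory DOCUMENTS store are all hypothetical stand-ins; in a real Flask application the current user would come from Flask-Login and the failure would be an HTTP error response.

```python
from functools import wraps


class NotFound(Exception):
    """Stand-in for a framework 404 response (hypothetical)."""


# Hypothetical in-memory store: doc_id -> document record
DOCUMENTS = {
    1: {"id": 1, "owner_id": 10, "title": "Alice notes"},
    2: {"id": 2, "owner_id": 20, "title": "Bob notes"},
}


def authorize_document(view):
    """Load the document and verify ownership before the view body runs."""
    @wraps(view)
    def wrapper(current_user_id, doc_id, *args, **kwargs):
        doc = DOCUMENTS.get(doc_id)
        # Missing and foreign documents fail the same way.
        if doc is None or doc["owner_id"] != current_user_id:
            raise NotFound(f"document {doc_id} not found")
        # The view receives the already-authorized document, not the raw ID.
        return view(current_user_id, doc, *args, **kwargs)
    return wrapper


@authorize_document
def view_document(current_user_id, doc):
    return {"id": doc["id"], "title": doc["title"]}
```

Passing the loaded document into the view, rather than the ID, means the view has no reason to repeat the lookup without the check.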
Option 3: Explicit ownership check after retrieval (Most transparent)
- Fetch resource, then verify current user is the owner
- Clearest to read but requires two logical steps
- Return 403 Forbidden if ownership check fails
- Useful when complex authorization logic is needed
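The two-step approach in Option 3 can be sketched as follows. The Document dataclass, the STORE dictionary, and the Forbidden and NotFound exceptions are assumptions for illustration; a real application would map them to its ORM model and HTTP error responses.

```python
from dataclasses import dataclass


class NotFound(Exception):
    status_code = 404


class Forbidden(Exception):
    status_code = 403


@dataclass
class Document:
    id: int
    owner_id: int
    title: str


# Hypothetical storage standing in for the database
STORE = {
    1: Document(1, 10, "Alice notes"),
    2: Document(2, 20, "Bob notes"),
}


def load_owned_document(doc_id, current_user_id):
    """Fetch by ID, then check ownership as a separate, visible step."""
    doc = STORE.get(doc_id)                 # Step 1: retrieve the resource
    if doc is None:
        raise NotFound()
    if doc.owner_id != current_user_id:     # Step 2: explicit ownership check
        raise Forbidden()
    return doc
```

This sketch returns 403 as Option 3 states. Where leaking the existence of a resource is a concern, raising NotFound in both branches follows the Response Security checklist item later in this guide.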
Option 4: Framework-specific permissions (When using Django/DRF)
- Leverage Django's built-in permission system
- Use permission classes like IsAuthenticated, custom IsOwner permissions
- Override get_queryset() to scope to current user
- Utilize Django REST Framework's object-level permissions
See the Secure Patterns section for detailed implementation examples of each approach.
Verify the Fix
Test the remediation thoroughly:
- Test authorized access: Verify legitimate users can access their own resources
- Test unauthorized access: Confirm users cannot access others' resources (IDOR)
- Test with malicious inputs:
  - Sequential IDs: /api/document/1, /api/document/2, /api/document/3
  - SQL injection attempts: /api/document/1' OR '1'='1
  - Path traversal: /api/document/../../../etc/passwd
- Run automated tests: Execute unit tests covering authorization
- Rescan with security tool: Verify the finding is resolved
- Check business logic: Ensure legitimate functionality still works
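The verification steps above can be expressed as automated tests. The sketch below uses the standard-library unittest module against a hypothetical handler, get_document_status, that returns HTTP-style status codes; the handler and the DOCS fixture are assumptions, and a real suite would call the application through its framework's test client instead.

```python
import unittest

# Hypothetical fixture: document 1 belongs to user 10, document 2 to user 20.
DOCS = {1: {"owner_id": 10}, 2: {"owner_id": 20}}


def get_document_status(doc_id, user_id):
    """Hypothetical handler under test; returns an HTTP-style status code."""
    if user_id is None:
        return 401  # unauthenticated
    doc = DOCS.get(doc_id)
    if doc is None or doc["owner_id"] != user_id:
        return 404  # missing and foreign documents look the same
    return 200


class IdorTests(unittest.TestCase):
    def test_owner_can_read(self):
        self.assertEqual(get_document_status(1, 10), 200)

    def test_other_user_is_denied(self):
        self.assertEqual(get_document_status(2, 10), 404)

    def test_sequential_enumeration_is_blocked(self):
        # User 10 walks IDs 1..20 and must only ever see their own document.
        visible = [i for i in range(1, 21) if get_document_status(i, 10) == 200]
        self.assertEqual(visible, [1])

    def test_unauthenticated_is_rejected(self):
        self.assertEqual(get_document_status(1, None), 401)

    def test_malformed_ids_do_not_crash(self):
        for bad in ("1' OR '1'='1", "../../../etc/passwd"):
            self.assertEqual(get_document_status(bad, 10), 404)
```

The important property is that each test authenticates as one user and requests another user's resource; tests that only exercise the owner's path will pass against vulnerable code.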
Check for Similar Issues
Search the codebase for related vulnerabilities:
# Find direct Model.query.get() calls
grep -r "\.query\.get(" --include="*.py"
# Find Django objects.get() without owner filter
grep -r "objects\.get(id=" --include="*.py"
# Find route parameters that might be IDs
grep -r "@app\.route.*<.*id>" --include="*.py"
# Find FastAPI path parameters
grep -r "Path.*id.*int" --include="*.py"
Review all endpoints that:
- Accept user-controlled IDs in URLs or request bodies
- Perform database lookups based on those IDs
- Access files, documents, or other user-owned resources
- Perform update or delete operations on resources
Security Checklist
Use this checklist to verify your authorization implementation is secure:
Authorization Implementation
- All resource access endpoints verify ownership or permissions before returning data
- Database queries include ownership filters (e.g., filter_by(id=x, owner_id=current_user.id))
- No direct lookups without authorization (avoid bare Model.query.get(id) or Model.objects.get(id=x))
- Authorization checks use server-side authenticated user, never client-provided user IDs
- Bulk operations verify ownership for each item individually
- File operations verify user owns the file before serving/deleting
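For the bulk-operation item in the checklist above, one way to get per-item authorization is to put the owner condition into the bulk statement itself, so that IDs belonging to other users are silently skipped. This is a minimal sketch using the standard-library sqlite3 module; the documents table and the helper name delete_owned_documents are assumptions.

```python
import sqlite3


def delete_owned_documents(conn, doc_ids, user_id):
    """Delete only the requested documents owned by user_id; return the count."""
    if not doc_ids:
        return 0
    # One placeholder per ID keeps the statement fully parameterized.
    placeholders = ",".join("?" for _ in doc_ids)
    cur = conn.execute(
        f"DELETE FROM documents WHERE owner_id = ? AND id IN ({placeholders})",
        (user_id, *doc_ids),
    )
    conn.commit()
    return cur.rowcount
```

Comparing the returned count with len(doc_ids) lets the caller detect, and log, requests that included IDs the user does not own.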
Query Security
- Flask: Use filter_by(id=x, owner_id=current_user.id) instead of get(id)
- Django: Use filter(id=x, owner=request.user) or get_object_or_404(Model, id=x, owner=request.user)
- FastAPI: Repository methods include user_id in WHERE clause
- All list endpoints scoped to current user (no global queries)
Authentication & Authorization
- @login_required or equivalent on all resource endpoints
- Django: request.user extracted from session, not request parameters
- Flask: current_user from Flask-Login, not from request args
- FastAPI: User extracted from JWT/OAuth2 token via dependencies
- Shared access permissions checked if applicable (read/write/delete levels)
Response Security
- Return 404 for both non-existent and unauthorized resources (don't leak existence)
- No sensitive data in error messages that could aid enumeration
- Consistent response structure for authorized and unauthorized requests
Testing Recommendations
- Test that users can access their own resources
- Test that users CANNOT access other users' resources (IDOR prevention)
- Test sequential ID enumeration is blocked
- Test unauthenticated requests are rejected (401)
- Test bulk operations only affect owned items
- Test with malformed IDs (don't return 500 errors)
- Test update/delete operations require ownership
- Test shared access permissions work correctly (if applicable)
Defense in Depth (Optional but Recommended)
- Use UUIDs instead of sequential IDs (makes enumeration harder)
- Rate limiting on resource access endpoints
- Logging/monitoring for authorization failures
- Security headers configured (CORS, CSP, etc.)
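Two of the defense-in-depth items above, non-sequential identifiers and logging of authorization failures, can be sketched with the standard library alone. The function names and the "authz" logger name are assumptions for illustration.

```python
import logging
import uuid

logger = logging.getLogger("authz")  # hypothetical logger name


def new_public_id():
    """Random UUIDv4 string to expose in URLs instead of a sequential integer."""
    return str(uuid.uuid4())


def log_authz_failure(user_id, resource_id):
    """Record a denied access so that enumeration attempts show up in monitoring."""
    logger.warning(
        "authorization failure: user=%s resource=%s", user_id, resource_id
    )
```

Random identifiers only make guessing harder. They do not replace the ownership checks described earlier, since an ID can still leak through links, logs, or referrers.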
Code Review Focus Areas
- Search codebase for query.get(, objects.get(id=, direct ID usage
- Review all @app.route, @api_view, @app.get decorators with ID parameters
- Check all DELETE, PUT, PATCH endpoints for authorization
- Verify decorators like @authorize_resource are consistently applied