CWE-918: Server-Side Request Forgery (SSRF) - Python
Overview
Server-Side Request Forgery (SSRF) allows attackers to make the server perform HTTP requests to arbitrary destinations, accessing internal services, cloud metadata endpoints, or bypassing security controls. Always validate URLs against an allowlist, block private IP ranges, and restrict protocols.
Primary Defence: Validate URLs against an allowlist of permitted domains, block private/reserved IP ranges using the ipaddress module, and restrict protocols to https:// only.
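As a quick orientation before the detailed patterns below, here is a minimal sketch of that primary defence; the fetch_public_resource() helper and the ALLOWED_HOSTS set are illustrative assumptions, and the Secure Patterns section expands each step with production-grade handling.
# Sketch - minimal form of the primary defence (illustrative, not exhaustive)
import ipaddress
import socket
from urllib.parse import urlparse
import requests

ALLOWED_HOSTS = {'api.example.com'}  # assumption: the domains your app may call

def fetch_public_resource(url: str) -> bytes:
    parsed = urlparse(url)
    if parsed.scheme != 'https':
        raise ValueError("Only https:// is allowed")
    host = (parsed.hostname or '').lower()
    if host not in ALLOWED_HOSTS:
        raise ValueError(f"Host not allowed: {host}")
    # Block anything that resolves to a private, loopback, or link-local address
    for info in socket.getaddrinfo(host, None):
        ip = ipaddress.ip_address(info[4][0])
        if ip.is_private or ip.is_loopback or ip.is_link_local:
            raise ValueError("Host resolves to a private address")
    return requests.get(url, timeout=10, allow_redirects=False).content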
Common Vulnerable Patterns
Direct URL Usage from User Input
# VULNERABLE - No validation on user-provided URL
import requests
def fetch_image(image_url):
    # No validation - SSRF vulnerability!
    response = requests.get(image_url)
    return response.content
# Attack examples:
# http://localhost:8080/admin
# http://169.254.169.254/latest/meta-data/iam/security-credentials/
# file:///etc/passwd
Unvalidated urllib Requests
# VULNERABLE - urllib without URL validation
from urllib.request import urlopen
def download_file(url):
    # No validation - SSRF vulnerability!
    with urlopen(url) as response:
        return response.read()
# Attack: url = "http://internal-api.local/sensitive-endpoint"
Flask/Django Without Validation
# VULNERABLE - Flask endpoint without URL validation
from flask import Flask, request
import requests
app = Flask(__name__)
@app.route('/proxy')
def proxy():
    url = request.args.get('url')
    # No validation - SSRF vulnerability!
    response = requests.get(url)
    return response.text
# Attack: /proxy?url=http://169.254.169.254/latest/meta-data/
Webhook Handler Without Validation
# VULNERABLE - Webhook without URL validation
import requests
def send_webhook(webhook_url, data):
    # No validation - SSRF vulnerability!
    response = requests.post(webhook_url, json=data)
    return response.status_code
# Attack: webhook_url = "http://localhost:6379/SET/key/value" # Redis attack
Secure Patterns
URL Allowlist Validation
# SECURE - Validate URLs against allowlist
import requests
from urllib.parse import urlparse
import ipaddress
import socket
class SafeImageFetcher:
    ALLOWED_HOSTS = {
        'api.example.com',
        'cdn.example.com',
        'images.example.com'
    }
    ALLOWED_SCHEMES = {'https'}

    def fetch_image(self, image_url: str) -> bytes:
        validated_url = self._validate_url(image_url)
        response = requests.get(validated_url, timeout=10)
        response.raise_for_status()
        return response.content

    def _validate_url(self, url: str) -> str:
        try:
            parsed = urlparse(url)
        except Exception:
            raise SecurityError("Invalid URL")
        # Validate scheme
        if parsed.scheme not in self.ALLOWED_SCHEMES:
            raise SecurityError(f"Invalid URL scheme: {parsed.scheme}")
        # Validate host
        host = parsed.hostname
        if not host or host.lower() not in self.ALLOWED_HOSTS:
            raise SecurityError(f"Host not allowed: {host}")
        # Block private IP ranges
        if self._is_private_ip(host):
            raise SecurityError("Private IP addresses not allowed")
        return url

    def _is_private_ip(self, host: str) -> bool:
        try:
            # Resolve hostname to IP
            ip_addresses = socket.getaddrinfo(host, None)
            for ip_info in ip_addresses:
                ip_str = ip_info[4][0]
                ip = ipaddress.ip_address(ip_str)
                # Check for private IP ranges
                if ip.is_private or ip.is_loopback or ip.is_link_local:
                    return True
                # Check for AWS metadata IP
                if str(ip) == '169.254.169.254':
                    return True
            return False
        except Exception:
            # If DNS fails, block it
            return True

class SecurityError(Exception):
    pass
Why this works:
- Host allowlist: pre-approved domains (api.example.com, cdn.example.com, images.example.com) prevent arbitrary target selection
- Scheme validation: the https-only scheme check rejects dangerous protocols (file://, ftp://, jar://) that bypass HTTP-level controls
- Comprehensive DNS resolution: socket.getaddrinfo resolves every IP (IPv4 and IPv6), and each address is checked with ipaddress.ip_address for is_private (RFC 1918: 10.x, 172.16-31.x, 192.168.x), is_loopback (127.x, ::1), and is_link_local (169.254.x, fe80::)
- Explicit AWS metadata protection: the direct check for 169.254.169.254 protects IAM credentials even if is_link_local misses an edge case
- Fail-closed behavior: any DNS resolution error blocks the request instead of letting an unverifiable host through
- Defense-in-depth: multi-layer validation (scheme → allowlist → DNS → IP) prevents access to internal services (e.g. Redis on localhost:6379), cloud metadata, and local files even under DNS rebinding
- All-IP validation: every resolved address is checked, not just the first, defeating attacks where getaddrinfo returns a mix of public and private addresses (a usage sketch follows below)
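A short usage sketch of SafeImageFetcher, as referenced above; the allowed URL assumes cdn.example.com exists and resolves to a public address, and the blocked URLs mirror the attack examples from the vulnerable pattern.
# Sketch - exercising SafeImageFetcher (illustrative only)
fetcher = SafeImageFetcher()

# Allowed: on the allowlist and HTTPS (assumption: host exists and resolves publicly)
image = fetcher.fetch_image('https://cdn.example.com/images/logo.png')

# Blocked: each raises SecurityError before any request is sent
for attack in (
    'http://localhost:8080/admin',                  # non-HTTPS scheme, localhost
    'https://169.254.169.254/latest/meta-data/',    # host not on allowlist
    'file:///etc/passwd',                           # non-HTTP protocol
):
    try:
        fetcher.fetch_image(attack)
    except SecurityError as e:
        print(f"Rejected {attack}: {e}")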
Requests with Validation and Timeout
# SECURE - requests library with comprehensive validation
import requests
from urllib.parse import urlparse
import re
import ipaddress
import socket
class SecureWebhookHandler:
    # Only allow specific domain pattern
    ALLOWED_URL_PATTERN = re.compile(r'^https://([a-z0-9-]+\.)*example\.com/.*$')

    def __init__(self):
        self.session = requests.Session()
        self.session.max_redirects = 0  # Disable redirects

    def send_webhook(self, webhook_url: str, data: dict) -> int:
        validated_url = self._validate_webhook_url(webhook_url)
        try:
            response = self.session.post(
                validated_url,
                json=data,
                timeout=10,
                allow_redirects=False
            )
            return response.status_code
        except requests.exceptions.RequestException as e:
            raise SecurityError(f"Request failed: {str(e)}")

    def _validate_webhook_url(self, url: str) -> str:
        if not url:
            raise SecurityError("URL cannot be empty")
        # Check against allowlist pattern
        if not self.ALLOWED_URL_PATTERN.match(url):
            raise SecurityError(f"URL not allowed: {url}")
        parsed = urlparse(url)
        # Only HTTPS
        if parsed.scheme != 'https':
            raise SecurityError("Only HTTPS allowed")
        # Block private IPs
        if self._is_private_address(parsed.hostname):
            raise SecurityError("Private IP addresses not allowed")
        return url

    def _is_private_address(self, host: str) -> bool:
        try:
            # Resolve DNS
            ip_addresses = socket.getaddrinfo(host, None)
            for ip_info in ip_addresses:
                ip_str = ip_info[4][0]
                ip = ipaddress.ip_address(ip_str)
                if ip.is_private or ip.is_loopback or ip.is_link_local:
                    return True
            return False
        except Exception:
            return True
Why this works:
- Redirect blocking: max_redirects = 0 on the session prevents redirect-based SSRF (e.g. example.com/redirect?to=localhost:6379)
- Strict domain validation: the regex ^https://([a-z0-9-]+\.)*example\.com/.*$ rejects lookalike domains (examp1e.com), typosquats, and null-byte tricks (example.com%00.attacker.com)
- Protocol restriction: HTTPS-only blocks file://, ftp://, gopher://, and jar:// URLs
- DNS rebinding defense: socket.getaddrinfo plus private-IP checks at validation time stop an attacker from pointing example.com at 127.0.0.1 between validation and the request
- Defense-in-depth: session-level configuration keeps settings consistent; the 10-second timeout limits resource exhaustion; allow_redirects=False is enforced at both the session and request level (a usage sketch follows below)
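A brief usage sketch of SecureWebhookHandler, as referenced above; the webhook URLs are illustrative, and the rejected examples correspond to the bypass techniques just listed.
# Sketch - exercising SecureWebhookHandler (illustrative only)
handler = SecureWebhookHandler()

# Allowed by the pattern (assumption: this endpoint exists and resolves publicly)
status = handler.send_webhook('https://hooks.example.com/deploy', {'event': 'build.finished'})

# Rejected by the allowlist regex or scheme check before any network traffic
for bad_url in (
    'https://examp1e.com/hook',                   # lookalike domain
    'https://example.com%00.attacker.com/hook',   # null-byte trick
    'http://example.com/hook',                    # plain HTTP
    'https://attacker.com/redirect-to-internal',  # host not allowed
):
    try:
        handler.send_webhook(bad_url, {})
    except SecurityError as e:
        print(f"Blocked {bad_url}: {e}")

# Even if an allowed endpoint returns a 302 pointing at http://localhost:6379,
# max_redirects = 0 and allow_redirects=False mean the redirect is returned
# as a status code, never followed.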
URL Validator Class
# SECURE - Reusable URL validator
from urllib.parse import urlparse
import ipaddress
import socket
from typing import Set
class UrlValidator:
    def __init__(
        self,
        allowed_schemes: Set[str],
        allowed_hosts: Set[str],
        block_private_ips: bool = True
    ):
        self.allowed_schemes = {s.lower() for s in allowed_schemes}
        self.allowed_hosts = {h.lower() for h in allowed_hosts}
        self.block_private_ips = block_private_ips

    def validate(self, url: str) -> str:
        try:
            parsed = urlparse(url)
        except Exception:
            raise ValueError("Invalid URL")
        # Validate scheme
        if parsed.scheme.lower() not in self.allowed_schemes:
            raise ValueError(f"Scheme not allowed: {parsed.scheme}")
        # Validate host
        host = parsed.hostname
        if not host:
            raise ValueError("No host in URL")
        host_lower = host.lower()
        if not self._is_host_allowed(host_lower):
            raise ValueError(f"Host not allowed: {host}")
        # Block private IPs
        if self.block_private_ips and self._is_private_ip(host):
            raise ValueError("Private IP addresses not allowed")
        # Block localhost variants
        if self._is_localhost(host_lower):
            raise ValueError("Localhost not allowed")
        return url

    def _is_host_allowed(self, host: str) -> bool:
        # Exact match
        if host in self.allowed_hosts:
            return True
        # Wildcard subdomain match (*.example.com)
        for allowed_host in self.allowed_hosts:
            if allowed_host.startswith('*.'):
                if host.endswith(allowed_host[1:]):
                    return True
        return False

    def _is_private_ip(self, host: str) -> bool:
        try:
            # Resolve hostname
            ip_addresses = socket.getaddrinfo(host, None)
            for ip_info in ip_addresses:
                ip_str = ip_info[4][0]
                ip = ipaddress.ip_address(ip_str)
                # Check for private/loopback/link-local
                if ip.is_private or ip.is_loopback or ip.is_link_local:
                    return True
                # AWS metadata endpoint
                if self._is_aws_metadata(ip):
                    return True
                # Docker internal network
                if self._is_docker_internal(ip):
                    return True
            return False
        except Exception:
            return True

    def _is_aws_metadata(self, ip) -> bool:
        # ip may be an IPv4Address or IPv6Address
        return str(ip) == '169.254.169.254'

    def _is_docker_internal(self, ip) -> bool:
        # Docker default bridge: 172.17.0.0/16
        return str(ip).startswith('172.17.')

    def _is_localhost(self, host: str) -> bool:
        return host in ('localhost', '127.0.0.1', '::1', '0.0.0.0')

# Usage
validator = UrlValidator(
    allowed_schemes={'https'},
    allowed_hosts={'api.example.com', '*.cdn.example.com'},
    block_private_ips=True
)
safe_url = validator.validate(user_input)  # user_input: untrusted URL supplied by the client
Why this works:
- Centralized, reusable class: constructor-based configuration (allowed schemes, hosts, private-IP blocking) gives consistent behaviour across requests, urllib, httpx, and async clients
- Flexible domain matching: wildcard subdomains (*.cdn.example.com) with case normalization (host.lower()) prevent mixed-case bypasses
- Comprehensive private-IP detection: catches localhost variants (127.0.0.1, ::1, 0.0.0.0), the AWS metadata address (169.254.169.254), and the Docker default bridge network (172.17.x)
- DNS rebinding defense: hostnames are resolved with socket.getaddrinfo() as part of validation, and DNS errors fail closed so an unresolvable host is never requested
- OOP benefits: enables unit testing with mocked DNS, per-environment configuration, and framework integration (Django/Flask/FastAPI) without duplicating logic (a test sketch follows below)
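Because the class takes its policy through the constructor and performs DNS lookups via socket.getaddrinfo, it is easy to unit-test with mocked resolution. A minimal pytest sketch follows; the import path, fake IP addresses, and test names are assumptions.
# Sketch - pytest tests for UrlValidator with mocked DNS (illustrative only)
import socket
import pytest
from myproject.security.validators import UrlValidator  # assumption: your module path

def fake_getaddrinfo(ip: str):
    # Shaped like socket.getaddrinfo results: the IP lives at index [4][0]
    return lambda host, port: [(2, 1, 6, '', (ip, 0))]

def make_validator():
    return UrlValidator(
        allowed_schemes={'https'},
        allowed_hosts={'api.example.com', '*.cdn.example.com'},
        block_private_ips=True
    )

def test_allowed_host_with_public_ip(monkeypatch):
    monkeypatch.setattr(socket, 'getaddrinfo', fake_getaddrinfo('93.184.216.34'))
    assert make_validator().validate('https://api.example.com/data')

def test_allowed_host_resolving_to_private_ip_is_blocked(monkeypatch):
    # Simulates DNS rebinding: an allowlisted name now points at 10.0.0.5
    monkeypatch.setattr(socket, 'getaddrinfo', fake_getaddrinfo('10.0.0.5'))
    with pytest.raises(ValueError):
        make_validator().validate('https://api.example.com/data')

def test_scheme_and_host_rejections():
    validator = make_validator()
    with pytest.raises(ValueError):
        validator.validate('http://api.example.com/data')    # wrong scheme
    with pytest.raises(ValueError):
        validator.validate('https://evil.example.net/data')  # host not allowlisted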
DNS Pinning Attack Prevention
A DNS pinning (DNS rebinding) attack is a bypass technique where attackers:
- Create a domain that initially resolves to a legitimate IP
- Application validates the IP (passes allowlist)
- Attacker changes DNS to point to internal IP (127.0.0.1, 192.168.x.x)
- Application makes request using cached DNS or re-resolves
- Request goes to internal service
Protection against DNS pinning:
# Validate BOTH before DNS resolution AND after
from urllib.parse import urlparse
import socket
import ipaddress
import logging
import requests

logger = logging.getLogger(__name__)

def validate_before_and_after_dns(url: str, allowed_domains: set) -> str:
    """Validate URL and prevent DNS pinning attacks"""
    parsed = urlparse(url)
    hostname = parsed.hostname
    # Step 1: Validate hostname against allowlist
    if hostname not in allowed_domains:
        raise SecurityException("Domain not allowed")
    # Step 2: Resolve DNS and validate ALL resolved IPs
    try:
        ip_addresses = socket.getaddrinfo(hostname, None)
        for ip_info in ip_addresses:
            ip_str = ip_info[4][0]
            ip = ipaddress.ip_address(ip_str)
            # Check for private/loopback/link-local
            if ip.is_private or ip.is_loopback or ip.is_link_local:
                raise SecurityException(f"Domain resolves to private IP: {ip_str}")
            # Check for AWS metadata endpoint
            if str(ip) == '169.254.169.254':
                raise SecurityException("Access to AWS metadata endpoint blocked")
    except socket.gaierror:
        raise SecurityException("Cannot resolve hostname")
    # Step 3: Re-validate just before making request
    # (implement with minimal TTL caching)
    return url

class SecurityException(Exception):
    pass

# Example usage:
ALLOWED_DOMAINS = {'api.example.com', 'cdn.example.com'}
try:
    safe_url = validate_before_and_after_dns(
        user_provided_url,
        ALLOWED_DOMAINS
    )
    # Make request with validated URL
    response = requests.get(safe_url, timeout=10, allow_redirects=False)
except SecurityException as e:
    logger.error(f"SSRF protection triggered: {e}")
    raise
Why this works:
- DNS pinning defense: two-phase validation (hostname allowlist, then DNS resolution with IP checks) defeats an attacker who controls DNS: the initial public IP passes, but a record switched to 127.0.0.1 before the HTTP request fails the IP check
- All IPs validated: socket.getaddrinfo returns every A/AAAA record, so an attacker cannot hide a private IP in a secondary record; resolving at validation time rather than trusting a long-lived cache forces re-validation
- Comprehensive private-IP detection: is_private, is_loopback, and is_link_local catch RFC 1918, loopback, and link-local ranges; the explicit AWS metadata check (169.254.169.254) protects IAM credentials
- Fail-closed DNS errors: handling socket.gaierror blocks the request when resolution fails, rather than silently skipping validation and letting the HTTP request proceed
- Long-running service protection: essential for services whose DNS records change during uptime and for multi-tenant environments with attacker-controlled subdomains (a TTL-bounded re-validation sketch follows below)
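Step 3 in the code above is left as a comment. One hedged way to implement it is a thin wrapper that re-runs the full validation whenever the last check is older than a short TTL, so a stale result is never trusted; the 5-second TTL and the cache layout are assumptions, not a prescribed design. Fully closing the race requires pinning the connection to the validated IP (for example via a custom transport adapter), which is beyond this sketch.
# Sketch - TTL-bounded re-validation just before each request (illustrative only)
import time
import requests

_VALIDATION_TTL_SECONDS = 5          # assumption: small window limits rebinding exposure
_last_validated: dict[str, float] = {}

def fetch_with_revalidation(url: str, allowed_domains: set) -> requests.Response:
    now = time.monotonic()
    checked_at = _last_validated.get(url)
    # Re-run the hostname + DNS validation if the previous check is stale
    if checked_at is None or now - checked_at > _VALIDATION_TTL_SECONDS:
        validate_before_and_after_dns(url, allowed_domains)
        _last_validated[url] = now
    # The request follows immediately, keeping the gap between
    # "IP checked" and "connection opened" as small as possible
    return requests.get(url, timeout=10, allow_redirects=False)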
Framework-Specific Guidance
Django
# SECURE - Django view with URL validation
from django.http import JsonResponse, HttpResponseBadRequest
from django.views.decorators.http import require_http_methods
import requests
# Create validator instance (UrlValidator is the reusable class defined above)
url_validator = UrlValidator(
    allowed_schemes={'https'},
    allowed_hosts={'api.example.com'},
    block_private_ips=True
)

@require_http_methods(["GET"])
def proxy_view(request):
    url = request.GET.get('url')
    if not url:
        return HttpResponseBadRequest("URL parameter required")
    try:
        # Validate URL
        validated_url = url_validator.validate(url)
        # Make request with timeout and no redirects
        response = requests.get(
            validated_url,
            timeout=10,
            allow_redirects=False
        )
        return JsonResponse({
            'status': response.status_code,
            'content': response.text
        })
    except ValueError as e:
        return HttpResponseBadRequest(f"Invalid URL: {str(e)}")
    except requests.exceptions.RequestException as e:
        return HttpResponseBadRequest(f"Request failed: {str(e)}")

# settings.py - Configure allowed hosts
# (see the sketch below for wiring this setting into UrlValidator)
SSRF_ALLOWED_HOSTS = [
    'api.example.com',
    '*.cdn.example.com'
]
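The SSRF_ALLOWED_HOSTS setting above is not consumed by the view as written; a hedged sketch of one way to wire it up, assuming UrlValidator is importable from a module you control (the import path is an assumption):
# Sketch - building the validator from Django settings (illustrative only)
from django.conf import settings
from myproject.security.validators import UrlValidator  # assumption: your module path

def build_ssrf_validator() -> UrlValidator:
    # A missing setting falls back to an empty allowlist, i.e. deny everything
    allowed_hosts = set(getattr(settings, 'SSRF_ALLOWED_HOSTS', []))
    return UrlValidator(
        allowed_schemes={'https'},
        allowed_hosts=allowed_hosts,
        block_private_ips=True
    )

url_validator = build_ssrf_validator()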
Flask
# SECURE - Flask with URL validation
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
# Initialize validator
url_validator = UrlValidator(
    allowed_schemes={'https'},
    allowed_hosts={'api.example.com', 'public-api.example.org'},
    block_private_ips=True
)

@app.route('/proxy')
def proxy():
    url = request.args.get('url')
    if not url:
        return jsonify({'error': 'URL parameter required'}), 400
    try:
        # Validate URL
        validated_url = url_validator.validate(url)
        # Make request
        response = requests.get(
            validated_url,
            timeout=10,
            allow_redirects=False
        )
        return jsonify({
            'status': response.status_code,
            'content': response.text
        })
    except ValueError as e:
        return jsonify({'error': f'Invalid URL: {str(e)}'}), 400
    except requests.exceptions.RequestException as e:
        return jsonify({'error': f'Request failed: {str(e)}'}), 500
FastAPI
# SECURE - FastAPI with URL validation
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
import requests
app = FastAPI()
# Initialize validator
url_validator = UrlValidator(
    allowed_schemes={'https'},
    allowed_hosts={'api.example.com'},
    block_private_ips=True
)

class ProxyResponse(BaseModel):
    status: int
    content: str

@app.get("/proxy", response_model=ProxyResponse)
async def proxy(url: str = Query(..., description="URL to fetch")):
    try:
        # Validate URL
        validated_url = url_validator.validate(url)
        # Make request
        # Note: requests is synchronous and blocks the event loop;
        # an async client such as httpx.AsyncClient avoids this.
        response = requests.get(
            validated_url,
            timeout=10,
            allow_redirects=False
        )
        return ProxyResponse(
            status=response.status_code,
            content=response.text
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=f"Invalid URL: {str(e)}")
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"Request failed: {str(e)}")
aiohttp (Async)
# SECURE - aiohttp with URL validation
import aiohttp
from aiohttp import web
# Initialize validator once at module level
url_validator = UrlValidator(
    allowed_schemes={'https'},
    allowed_hosts={'api.example.com'},
    block_private_ips=True
)

async def proxy_handler(request):
    url = request.query.get('url')
    if not url:
        return web.Response(text='URL parameter required', status=400)
    try:
        # Validate URL
        validated_url = url_validator.validate(url)
        # Make async request
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(validated_url, allow_redirects=False) as response:
                content = await response.text()
                return web.json_response({
                    'status': response.status,
                    'content': content
                })
    except ValueError as e:
        return web.Response(text=f'Invalid URL: {str(e)}', status=400)
    except aiohttp.ClientError as e:
        return web.Response(text=f'Request failed: {str(e)}', status=500)

app = web.Application()
app.router.add_get('/proxy', proxy_handler)
Protecting Cloud Metadata Endpoints
# SECURE - Block AWS/Azure/GCP metadata endpoints
import ipaddress
import socket
from urllib.parse import urlparse

class MetadataProtection:
    BLOCKED_HOSTS = {
        '169.254.169.254',            # AWS/Azure metadata
        'metadata.google.internal',   # GCP metadata
        'metadata',
        'metadata.azure.com'
    }
    BLOCKED_PATHS = {
        '/latest/meta-data',
        '/latest/user-data',
        '/latest/dynamic',
        '/computeMetadata/v1',
        '/metadata/instance'
    }

    def validate_not_metadata(self, url: str):
        parsed = urlparse(url)
        host = parsed.hostname.lower() if parsed.hostname else ''
        path = parsed.path
        # Block metadata service hostnames
        if host in self.BLOCKED_HOSTS:
            raise ValueError("Access to metadata service blocked")
        # Block metadata paths
        for blocked_path in self.BLOCKED_PATHS:
            if path.startswith(blocked_path):
                raise ValueError("Access to metadata endpoint blocked")
        # Block link-local addresses (169.254.x.x) after DNS resolution.
        # Resolve in its own try/except so a blocked-address error is not
        # swallowed and re-reported as a DNS failure.
        try:
            ip_addresses = socket.getaddrinfo(host, None)
        except (socket.gaierror, OSError):
            raise ValueError("DNS resolution failed")
        for ip_info in ip_addresses:
            ip = ipaddress.ip_address(ip_info[4][0])
            if ip.is_link_local:
                raise ValueError("Link-local addresses blocked")
Testing and Validation
SSRF vulnerabilities should be identified through:
- Static Analysis Tools: Use tools like Bandit, Semgrep, SonarQube, or Snyk to identify potential SSRF sinks
- Dynamic Application Security Testing (DAST): Tools like OWASP ZAP, Burp Suite, or Acunetix can test for SSRF by manipulating URL parameters
- Manual Penetration Testing: Test with internal IP addresses (127.0.0.1, 192.168.x.x), cloud metadata endpoints (169.254.169.254), and file:// protocols; the same payloads can be automated as regression tests (see the sketch after this list)
- Code Review: Ensure all HTTP client usage includes URL validation against an allowlist and blocks private IP ranges
- Network Monitoring: Monitor outbound requests to detect unexpected internal network access
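The manual payloads above translate directly into regression tests. A minimal pytest sketch against the UrlValidator from earlier follows; the payload list, import path, and configuration are assumptions.
# Sketch - regression tests using common SSRF payloads (illustrative only)
import socket
import pytest
from myproject.security.validators import UrlValidator  # assumption: your module path

SSRF_PAYLOADS = [
    'http://127.0.0.1/admin',
    'http://localhost:6379/',
    'http://192.168.1.10/internal',
    'http://169.254.169.254/latest/meta-data/',
    'file:///etc/passwd',
    'gopher://127.0.0.1:6379/_SET%20key%20value',
]

@pytest.fixture
def validator():
    return UrlValidator(
        allowed_schemes={'https'},
        allowed_hosts={'api.example.com'},
        block_private_ips=True
    )

@pytest.mark.parametrize('payload', SSRF_PAYLOADS)
def test_ssrf_payloads_are_rejected(validator, payload):
    # Every payload uses a non-https scheme, so each is rejected before DNS resolution
    with pytest.raises(ValueError):
        validator.validate(payload)

def test_allowlisted_https_url_is_accepted(validator, monkeypatch):
    # Stub DNS so the allowlisted host resolves to a public address
    monkeypatch.setattr(socket, 'getaddrinfo',
                        lambda host, port: [(2, 1, 6, '', ('93.184.216.34', 0))])
    assert validator.validate('https://api.example.com/v1/items')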