Skip to content

CWE-918: Server-Side Request Forgery (SSRF) - Python

Overview

Server-Side Request Forgery (SSRF) allows attackers to make the server perform HTTP requests to arbitrary destinations, accessing internal services, cloud metadata endpoints, or bypassing security controls. Always validate URLs against an allowlist, block private IP ranges, and restrict protocols.

Primary Defence: Validate URLs against an allowlist of permitted domains, block private/reserved IP ranges using ipaddress module, and restrict protocols to https:// only.

Common Vulnerable Patterns

Direct URL Usage from User Input

# VULNERABLE - No validation on user-provided URL

import requests

def fetch_image(image_url):
    # No validation - SSRF vulnerability!
    response = requests.get(image_url)
    return response.content

# Attack examples:

# http://localhost:8080/admin

# http://169.254.169.254/latest/meta-data/iam/security-credentials/

# file:///etc/passwd

Unvalidated urllib Requests

# VULNERABLE - urllib without URL validation

from urllib.request import urlopen

def download_file(url):
    # No validation - SSRF vulnerability!
    with urlopen(url) as response:
        return response.read()

# Attack: url = "http://internal-api.local/sensitive-endpoint"

Flask/Django Without Validation

# VULNERABLE - Flask endpoint without URL validation

from flask import Flask, request
import requests

app = Flask(__name__)

@app.route('/proxy')
def proxy():
    url = request.args.get('url')
    # No validation - SSRF vulnerability!
    response = requests.get(url)
    return response.text

# Attack: /proxy?url=http://169.254.169.254/latest/meta-data/

Webhook Handler Without Validation

# VULNERABLE - Webhook without URL validation

import requests

def send_webhook(webhook_url, data):
    # No validation - SSRF vulnerability!
    response = requests.post(webhook_url, json=data)
    return response.status_code

# Attack: webhook_url = "http://localhost:6379/SET/key/value"  # Redis attack

Secure Patterns

URL Allowlist Validation

# SECURE - Validate URLs against allowlist

import requests
from urllib.parse import urlparse
import ipaddress
import socket

class SafeImageFetcher:
    ALLOWED_HOSTS = {
        'api.example.com',
        'cdn.example.com',
        'images.example.com'
    }

    ALLOWED_SCHEMES = {'https'}

    def fetch_image(self, image_url: str) -> bytes:
        validated_url = self._validate_url(image_url)

        response = requests.get(validated_url, timeout=10)
        response.raise_for_status()

        return response.content

    def _validate_url(self, url: str) -> str:
        try:
            parsed = urlparse(url)
        except Exception:
            raise SecurityError("Invalid URL")

        # Validate scheme
        if parsed.scheme not in self.ALLOWED_SCHEMES:
            raise SecurityError(f"Invalid URL scheme: {parsed.scheme}")

        # Validate host
        host = parsed.hostname
        if not host or host.lower() not in self.ALLOWED_HOSTS:
            raise SecurityError(f"Host not allowed: {host}")

        # Block private IP ranges
        if self._is_private_ip(host):
            raise SecurityError("Private IP addresses not allowed")

        return url

    def _is_private_ip(self, host: str) -> bool:
        try:
            # Resolve hostname to IP
            ip_addresses = socket.getaddrinfo(host, None)

            for ip_info in ip_addresses:
                ip_str = ip_info[4][0]
                ip = ipaddress.ip_address(ip_str)

                # Check for private IP ranges
                if ip.is_private or ip.is_loopback or ip.is_link_local:
                    return True

                # Check for AWS metadata IP
                if str(ip) == '169.254.169.254':
                    return True

            return False
        except Exception:
            # If DNS fails, block it
            return True

class SecurityError(Exception):
    pass

Why this works:

  • Host allowlist: Pre-approved domains (api.example.com, cdn.example.com, images.example.com) prevent arbitrary target selection
  • Scheme validation: urlparse blocks dangerous protocols (file://, jar://, ftp://) that bypass HTTP validation
  • Comprehensive DNS resolution: socket.getaddrinfo resolves to all IPs (IPv4/IPv6), validated with ipaddress.ip_address checks for is_private (RFC 1918: 10.x, 172.16-31.x, 192.168.x), is_loopback (127.x, ::1), is_link_local (169.254.x, fe80::)
  • Explicit AWS metadata protection: Direct check for 169.254.169.254 protects IAM credentials even if is_link_local misses edge cases
  • Fail-closed behavior: Exception handling on DNS errors prevents TOCTOU races where validation passes but resolution later points to private IP
  • Defense-in-depth: Multi-layer validation (allowlist → scheme → DNS → IP) prevents access to internal services (Redis on localhost:6379), cloud metadata, local files even with DNS rebinding
  • All-IP validation: Checks all resolved IPs (not just first), defeating attacks where getaddrinfo returns both public and private addresses

Requests with Validation and Timeout

# SECURE - requests library with comprehensive validation

import requests
from urllib.parse import urlparse
import re
import ipaddress
import socket

class SecureWebhookHandler:
    # Only allow specific domain pattern
    ALLOWED_URL_PATTERN = re.compile(r'^https://([a-z0-9-]+\.)*example\.com/.*$')

    def __init__(self):
        self.session = requests.Session()
        self.session.max_redirects = 0  # Disable redirects

    def send_webhook(self, webhook_url: str, data: dict) -> int:
        validated_url = self._validate_webhook_url(webhook_url)

        try:
            response = self.session.post(
                validated_url,
                json=data,
                timeout=10,
                allow_redirects=False
            )
            return response.status_code
        except requests.exceptions.RequestException as e:
            raise SecurityError(f"Request failed: {str(e)}")

    def _validate_webhook_url(self, url: str) -> str:
        if not url:
            raise SecurityError("URL cannot be empty")

        # Check against allowlist pattern
        if not self.ALLOWED_URL_PATTERN.match(url):
            raise SecurityError(f"URL not allowed: {url}")

        parsed = urlparse(url)

        # Only HTTPS
        if parsed.scheme != 'https':
            raise SecurityError("Only HTTPS allowed")

        # Block private IPs
        if self._is_private_address(parsed.hostname):
            raise SecurityError("Private IP addresses not allowed")

        return url

    def _is_private_address(self, host: str) -> bool:
        try:
            # Resolve DNS
            ip_addresses = socket.getaddrinfo(host, None)

            for ip_info in ip_addresses:
                ip_str = ip_info[4][0]
                ip = ipaddress.ip_address(ip_str)

                if ip.is_private or ip.is_loopback or ip.is_link_local:
                    return True

            return False
        except Exception:
            return True

Why this works:

  • Redirect blocking: max_redirects = 0 globally prevents redirect-based SSRF (e.g., example.com/redirect?to=localhost:6379)
  • Strict domain validation: Regex ^https://([a-z0-9-]+\.)*example\.com/.*$ prevents lookalikes (examp1e.com), typosquatting, null byte injection (example.com%00.attacker.com)
  • Protocol restriction: HTTPS-only blocks file://, ftp://, gopher://, jar:// protocols
  • DNS rebinding defense: socket.getaddrinfo + private IP checks after resolution prevent attackers pointing example.com to 127.0.0.1 mid-request
  • Defense-in-depth: Session-based config ensures consistent settings; 10s timeout prevents DoS; allow_redirects=False redundancy at both session and request levels

URL Validator Class

# SECURE - Reusable URL validator

from urllib.parse import urlparse
import ipaddress
import socket
from typing import Set

class UrlValidator:
    def __init__(
        self,
        allowed_schemes: Set[str],
        allowed_hosts: Set[str],
        block_private_ips: bool = True
    ):
        self.allowed_schemes = {s.lower() for s in allowed_schemes}
        self.allowed_hosts = {h.lower() for h in allowed_hosts}
        self.block_private_ips = block_private_ips

    def validate(self, url: str) -> str:
        try:
            parsed = urlparse(url)
        except Exception:
            raise ValueError("Invalid URL")

        # Validate scheme
        if parsed.scheme.lower() not in self.allowed_schemes:
            raise ValueError(f"Scheme not allowed: {parsed.scheme}")

        # Validate host
        host = parsed.hostname
        if not host:
            raise ValueError("No host in URL")

        host_lower = host.lower()

        if not self._is_host_allowed(host_lower):
            raise ValueError(f"Host not allowed: {host}")

        # Block private IPs
        if self.block_private_ips and self._is_private_ip(host):
            raise ValueError("Private IP addresses not allowed")

        # Block localhost variants
        if self._is_localhost(host_lower):
            raise ValueError("Localhost not allowed")

        return url

    def _is_host_allowed(self, host: str) -> bool:
        # Exact match
        if host in self.allowed_hosts:
            return True

        # Wildcard subdomain match (*.example.com)
        for allowed_host in self.allowed_hosts:
            if allowed_host.startswith('*.'):
                if host.endswith(allowed_host[1:]):
                    return True

        return False

    def _is_private_ip(self, host: str) -> bool:
        try:
            # Resolve hostname
            ip_addresses = socket.getaddrinfo(host, None)

            for ip_info in ip_addresses:
                ip_str = ip_info[4][0]
                ip = ipaddress.ip_address(ip_str)

                # Check for private/loopback/link-local
                if ip.is_private or ip.is_loopback or ip.is_link_local:
                    return True

                # AWS metadata endpoint
                if self._is_aws_metadata(ip):
                    return True

                # Docker internal network
                if self._is_docker_internal(ip):
                    return True

            return False
        except Exception:
            return True

    def _is_aws_metadata(self, ip: ipaddress.IPv4Address) -> bool:
        return str(ip) == '169.254.169.254'

    def _is_docker_internal(self, ip: ipaddress.IPv4Address) -> bool:
        # Docker default bridge: 172.17.0.0/16
        return str(ip).startswith('172.17.')

    def _is_localhost(self, host: str) -> bool:
        return host in ('localhost', '127.0.0.1', '::1', '0.0.0.0')

# Usage

validator = UrlValidator(
    allowed_schemes={'https'},
    allowed_hosts={'api.example.com', '*.cdn.example.com'},
    block_private_ips=True
)

safe_url = validator.validate(user_input)

Why this works:

  • Centralized reusable class: Constructor-based configuration (allowed schemes, hosts, private IP blocking) enables consistent security across requests, urllib, httpx, async clients
  • Flexible domain matching: Wildcard subdomains (*.cdn.example.com) with case-normalization (host.lower()) prevents bypass via mixed-case
  • Comprehensive private IP detection: Catches localhost variants (127.0.0.1, ::1, 0.0.0.0), AWS metadata (169.254.169.254), Docker networks (172.17.x)
  • DNS rebinding defense: Pre-resolves hostnames with socket.getaddrinfo() before validation; fail-closed on DNS errors prevents TOCTOU races
  • OOP benefits: Enables unit testing (mock DNS), per-environment config, framework integration (Django/Flask/FastAPI) without duplicating logic

DNS Pinning Attack Prevention

DNS pinning is a bypass technique where attackers:

  1. Create a domain that initially resolves to a legitimate IP
  2. Application validates the IP (passes allowlist)
  3. Attacker changes DNS to point to internal IP (127.0.0.1, 192.168.x.x)
  4. Application makes request using cached DNS or re-resolves
  5. Request goes to internal service

Protection against DNS pinning:

# Validate BOTH before DNS resolution AND after

from urllib.parse import urlparse
import socket
import ipaddress

def validate_before_and_after_dns(url: str, allowed_domains: set) -> str:
    """Validate URL and prevent DNS pinning attacks"""
    parsed = urlparse(url)
    hostname = parsed.hostname

    # Step 1: Validate hostname against allowlist
    if hostname not in allowed_domains:
        raise SecurityException("Domain not allowed")

    # Step 2: Resolve DNS and validate ALL resolved IPs
    try:
        ip_addresses = socket.getaddrinfo(hostname, None)
        for ip_info in ip_addresses:
            ip_str = ip_info[4][0]
            ip = ipaddress.ip_address(ip_str)

            # Check for private/loopback/link-local
            if ip.is_private or ip.is_loopback or ip.is_link_local:
                raise SecurityException(f"Domain resolves to private IP: {ip_str}")

            # Check for AWS metadata endpoint
            if str(ip) == '169.254.169.254':
                raise SecurityException("Access to AWS metadata endpoint blocked")
    except socket.gaierror:
        raise SecurityException("Cannot resolve hostname")

    # Step 3: Re-validate just before making request
    # (implement with minimal TTL caching)
    return url

class SecurityException(Exception):
    pass

# Example usage:

ALLOWED_DOMAINS = {'api.example.com', 'cdn.example.com'}

try:
    safe_url = validate_before_and_after_dns(
        user_provided_url,
        ALLOWED_DOMAINS
    )
    # Make request with validated URL
    response = requests.get(safe_url, timeout=10, allow_redirects=False)
except SecurityException as e:
    logger.error(f"SSRF protection triggered: {e}")
    raise

Why this works:

  • DNS pinning defense: Two-phase validation (hostname allowlist, then DNS with IP checks) defeats attackers controlling DNS: initial public IP passes, but changing to 127.0.0.1 before HTTP request fails
  • All IPs validated: socket.getaddrinfo returns all A/AAAA records - attackers can't hide private IP in secondary records; real-time resolution (no long cache) forces re-validation
  • Comprehensive private IP detection: is_private/is_loopback/is_link_local catch RFC 1918, loopback, link-local; explicit AWS metadata check (169.254.169.254) protects IAM credentials
  • Fail-closed DNS errors: Exception handling on socket.gaierror blocks requests when DNS fails, preventing TOCTOU where validation skips but HTTP succeeds later
  • Long-running service protection: Critical for services where DNS records change during uptime and multi-tenant environments with attacker-controlled subdomains

Framework-Specific Guidance

Django

# SECURE - Django view with URL validation

from django.http import JsonResponse, HttpResponseBadRequest
from django.views.decorators.http import require_http_methods
import requests

# Create validator instance

url_validator = UrlValidator(
    allowed_schemes={'https'},
    allowed_hosts={'api.example.com'},
    block_private_ips=True
)

@require_http_methods(["GET"])
def proxy_view(request):
    url = request.GET.get('url')

    if not url:
        return HttpResponseBadRequest("URL parameter required")

    try:
        # Validate URL
        validated_url = url_validator.validate(url)

        # Make request with timeout and no redirects
        response = requests.get(
            validated_url,
            timeout=10,
            allow_redirects=False
        )

        return JsonResponse({
            'status': response.status_code,
            'content': response.text
        })

    except ValueError as e:
        return HttpResponseBadRequest(f"Invalid URL: {str(e)}")
    except requests.exceptions.RequestException as e:
        return HttpResponseBadRequest(f"Request failed: {str(e)}")

# settings.py - Configure allowed hosts

SSRF_ALLOWED_HOSTS = [
    'api.example.com',
    '*.cdn.example.com'
]

Flask

# SECURE - Flask with URL validation

from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

# Initialize validator

url_validator = UrlValidator(
    allowed_schemes={'https'},
    allowed_hosts={'api.example.com', 'public-api.example.org'},
    block_private_ips=True
)

@app.route('/proxy')
def proxy():
    url = request.args.get('url')

    if not url:
        return jsonify({'error': 'URL parameter required'}), 400

    try:
        # Validate URL
        validated_url = url_validator.validate(url)

        # Make request
        response = requests.get(
            validated_url,
            timeout=10,
            allow_redirects=False
        )

        return jsonify({
            'status': response.status_code,
            'content': response.text
        })

    except ValueError as e:
        return jsonify({'error': f'Invalid URL: {str(e)}'}), 400
    except requests.exceptions.RequestException as e:
        return jsonify({'error': f'Request failed: {str(e)}'}), 500

FastAPI

# SECURE - FastAPI with URL validation

from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
import requests

app = FastAPI()

# Initialize validator

url_validator = UrlValidator(
    allowed_schemes={'https'},
    allowed_hosts={'api.example.com'},
    block_private_ips=True
)

class ProxyResponse(BaseModel):
    status: int
    content: str

@app.get("/proxy", response_model=ProxyResponse)
async def proxy(url: str = Query(..., description="URL to fetch")):
    try:
        # Validate URL
        validated_url = url_validator.validate(url)

        # Make request
        response = requests.get(
            validated_url,
            timeout=10,
            allow_redirects=False
        )

        return ProxyResponse(
            status=response.status_code,
            content=response.text
        )

    except ValueError as e:
        raise HTTPException(status_code=400, detail=f"Invalid URL: {str(e)}")
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"Request failed: {str(e)}")

aiohttp (Async)

# SECURE - aiohttp with URL validation

import aiohttp
from aiohttp import web

async def proxy_handler(request):
    url = request.query.get('url')

    if not url:
        return web.Response(text='URL parameter required', status=400)

    try:
        # Validate URL
        url_validator = UrlValidator(
            allowed_schemes={'https'},
            allowed_hosts={'api.example.com'},
            block_private_ips=True
        )
        validated_url = url_validator.validate(url)

        # Make async request
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(validated_url, allow_redirects=False) as response:
                content = await response.text()

                return web.json_response({
                    'status': response.status,
                    'content': content
                })

    except ValueError as e:
        return web.Response(text=f'Invalid URL: {str(e)}', status=400)
    except aiohttp.ClientError as e:
        return web.Response(text=f'Request failed: {str(e)}', status=500)

app = web.Application()
app.router.add_get('/proxy', proxy_handler)

Protecting Cloud Metadata Endpoints

# SECURE - Block AWS/Azure/GCP metadata endpoints

import ipaddress
from urllib.parse import urlparse

class MetadataProtection:
    BLOCKED_HOSTS = {
        '169.254.169.254',           # AWS/Azure metadata
        'metadata.google.internal',  # GCP metadata
        'metadata',
        'metadata.azure.com'
    }

    BLOCKED_PATHS = {
        '/latest/meta-data',
        '/latest/user-data',
        '/latest/dynamic',
        '/computeMetadata/v1',
        '/metadata/instance'
    }

    def validate_not_metadata(self, url: str):
        parsed = urlparse(url)

        host = parsed.hostname.lower() if parsed.hostname else ''
        path = parsed.path

        # Block metadata service hostnames
        if host in self.BLOCKED_HOSTS:
            raise ValueError("Access to metadata service blocked")

        # Block metadata paths
        for blocked_path in self.BLOCKED_PATHS:
            if path.startswith(blocked_path):
                raise ValueError("Access to metadata endpoint blocked")

        # Block link-local addresses (169.254.x.x)
        try:
            import socket
            ip_addresses = socket.getaddrinfo(host, None)

            for ip_info in ip_addresses:
                ip_str = ip_info[4][0]
                ip = ipaddress.ip_address(ip_str)

                if ip.is_link_local:
                    raise ValueError("Link-local addresses blocked")
        except Exception:
            raise ValueError("DNS resolution failed")

Testing and Validation

SSRF vulnerabilities should be identified through:

  • Static Analysis Tools: Use tools like Bandit, Semgrep, SonarQube, or Snyk to identify potential SSRF sinks
  • Dynamic Application Security Testing (DAST): Tools like OWASP ZAP, Burp Suite, or Acunetix can test for SSRF by manipulating URL parameters
  • Manual Penetration Testing: Test with internal IP addresses (127.0.0.1, 192.168.x.x), cloud metadata endpoints (169.254.169.254), and file:// protocols
  • Code Review: Ensure all HTTP client usage includes URL validation against an allowlist and blocks private IP ranges
  • Network Monitoring: Monitor outbound requests to detect unexpected internal network access

Additional Resources