Skip to content

CWE-918: Server-Side Request Forgery (SSRF) - Python

Overview

Server-Side Request Forgery (SSRF) allows attackers to make the server perform HTTP requests to arbitrary destinations, accessing internal services, cloud metadata endpoints, or bypassing security controls. Always validate URLs against an allowlist, block private IP ranges, and restrict protocols.

Primary Defence: Validate URLs against an allowlist of permitted domains, block private/reserved IP ranges using the ipaddress module, and restrict protocols to https:// only.

Common Vulnerable Patterns

Direct URL Usage from User Input

# VULNERABLE - No validation on user-provided URL

import requests

def fetch_image(image_url):
    """Download ``image_url`` and return the raw response body.

    Deliberately vulnerable example: the caller-supplied URL is handed to
    ``requests.get`` exactly as received.
    """
    # No validation - SSRF vulnerability!
    return requests.get(image_url).content

# Attack examples:

# http://localhost:8080/admin

# http://169.254.169.254/latest/meta-data/iam/security-credentials/

# file:///etc/passwd  # non-HTTP schemes should be rejected before the client runs

Unvalidated urllib Requests

# VULNERABLE - urllib without URL validation

from urllib.request import urlopen

def download_file(url):
    """Open ``url`` with ``urlopen`` and return everything it yields.

    Deliberately vulnerable example: no scheme, host, or address check is
    applied before the request is made.
    """
    # No validation - SSRF vulnerability!
    with urlopen(url) as remote:
        payload = remote.read()
    return payload

# Attack: url = "http://internal-api.local/sensitive-endpoint"

Flask/Django Without Validation

# VULNERABLE - Flask endpoint without URL validation

from flask import Flask, request
import requests

app = Flask(__name__)

@app.route('/proxy')
def proxy():
    """Fetch the address in the ``url`` query parameter and return its text.

    Deliberately vulnerable example: the query parameter is used as the
    request target without any checks.
    """
    target = request.args.get('url')
    # No validation - SSRF vulnerability!
    return requests.get(target).text

# Attack: /proxy?url=http://169.254.169.254/latest/meta-data/

Webhook Handler Without Validation

# VULNERABLE - Webhook without URL validation

import requests

def send_webhook(webhook_url, data):
    """POST ``data`` as JSON to ``webhook_url`` and return the status code.

    Deliberately vulnerable example: the destination is never validated.
    """
    # No validation - SSRF vulnerability!
    return requests.post(webhook_url, json=data).status_code

# Attack: webhook_url = "http://localhost:6379/SET/key/value"  # Redis attack

Secure Patterns

URL Allowlist Validation

# SECURE - Validate URLs against allowlist

import requests
from urllib.parse import urlparse
import ipaddress
import socket

class SecurityError(Exception):
    """Raised when a URL fails SSRF validation or a fetch is refused."""


class SafeImageFetcher:
    """Fetch images over HTTPS from a fixed allowlist of hosts.

    A URL is accepted only when its scheme is in ``ALLOWED_SCHEMES``, its host
    is in ``ALLOWED_HOSTS``, and every address the host resolves to is
    globally routable. Redirects are refused instead of followed, because a
    redirect target never passes through this validation.

    Validation and the later connection resolve DNS independently, so this
    class on its own does not close DNS-rebinding races; pin or revalidate
    the address at connection time when attacker-controlled DNS is in scope.
    """

    ALLOWED_HOSTS = {
        'api.example.com',
        'cdn.example.com',
        'images.example.com'
    }

    ALLOWED_SCHEMES = {'https'}

    def fetch_image(self, image_url: str) -> bytes:
        """Validate ``image_url``, fetch it, and return the response body.

        Raises:
            SecurityError: the URL fails validation or the server answers
                with a redirect.
            requests.HTTPError: the server answers with a 4xx/5xx status.
        """
        validated_url = self._validate_url(image_url)

        # Only validated_url has been checked. Following a Location header
        # would let an allowlisted host bounce the request to any target.
        response = requests.get(
            validated_url,
            timeout=10,
            allow_redirects=False,
        )
        # raise_for_status() ignores 3xx, so refuse redirects explicitly
        # rather than returning the redirect page as image bytes.
        if 300 <= response.status_code < 400:
            raise SecurityError("Redirects are not allowed")
        response.raise_for_status()

        return response.content

    def _validate_url(self, url: str) -> str:
        """Return ``url`` unchanged if it passes every check.

        Raises:
            SecurityError: unparsable URL, disallowed scheme, host outside
                the allowlist, or host resolving to a non-global address.
        """
        try:
            parsed = urlparse(url)
        except Exception as exc:
            raise SecurityError("Invalid URL") from exc

        # Validate scheme
        if parsed.scheme not in self.ALLOWED_SCHEMES:
            raise SecurityError(f"Invalid URL scheme: {parsed.scheme}")

        # Validate host
        host = parsed.hostname
        if not host or host.lower() not in self.ALLOWED_HOSTS:
            raise SecurityError(f"Host not allowed: {host}")

        # Block private IP ranges
        if self._is_private_ip(host):
            raise SecurityError("Private IP addresses not allowed")

        return url

    def _is_private_ip(self, host: str) -> bool:
        """Return True when ``host`` must be blocked.

        The host is blocked if any resolved address is not globally
        routable, if any is the cloud metadata address, or if resolution
        fails for any reason.
        """
        try:
            # Resolve hostname to every address it maps to (IPv4 and IPv6).
            resolved = [
                ipaddress.ip_address(info[4][0])
                for info in socket.getaddrinfo(host, None)
            ]
        except Exception:
            # Fail closed: an address that cannot be checked is not trusted.
            return True

        for ip in resolved:
            # Require globally reachable addresses; block private, loopback,
            # link-local, reserved, multicast, unspecified, and CGN ranges.
            if not ip.is_global:
                return True

            # Explicit check for the AWS metadata IP (also link-local).
            if str(ip) == '169.254.169.254':
                return True

        return False

Why this works:

  • Host allowlist: Pre-approved domains (api.example.com, cdn.example.com, images.example.com) prevent arbitrary target selection
  • Scheme validation: urlparse extracts the scheme so the ALLOWED_SCHEMES check can reject http://, file://, jar://, ftp://, and every other non-HTTPS scheme before the HTTP client runs
  • Comprehensive DNS resolution: socket.getaddrinfo resolves all IPs (IPv4/IPv6), then ipaddress.ip_address(...).is_global rejects private, loopback, link-local, reserved, multicast, unspecified, and shared-address ranges
  • Explicit AWS metadata protection: Direct check for 169.254.169.254 documents the cloud-metadata threat even though is_global also rejects link-local addresses
  • Fail-closed behavior: DNS errors block the request instead of falling back to a best-effort fetch
  • Defense-in-depth: Multi-layer validation (allowlist -> scheme -> DNS -> IP) blocks common SSRF targets, but production clients should also disable redirects and pin or revalidate the address at connection time to close DNS rebinding races
  • All-IP validation: Checks all resolved IPs (not just first), defeating attacks where getaddrinfo returns both public and private addresses

Requests with Validation and Timeout

# SECURE - requests library with comprehensive validation

import requests
from urllib.parse import urlparse
import re
import ipaddress
import socket

class SecureWebhookHandler:
    """Send webhooks only to HTTPS URLs under example.com.

    A URL must match ``ALLOWED_URL_PATTERN`` in full, use HTTPS, and resolve
    only to globally routable addresses. Requests are sent with redirects
    disabled and a 10 second timeout.
    """

    # Only allow specific domain pattern
    ALLOWED_URL_PATTERN = re.compile(r'^https://([a-z0-9-]+\.)*example\.com/.*$')

    def __init__(self):
        self.session = requests.Session()

    def send_webhook(self, webhook_url: str, data: dict) -> int:
        """POST ``data`` as JSON to ``webhook_url`` and return the status code.

        Raises:
            SecurityError: the URL fails validation or the request fails.
        """
        validated_url = self._validate_webhook_url(webhook_url)

        try:
            response = self.session.post(
                validated_url,
                json=data,
                timeout=10,
                allow_redirects=False
            )
        except requests.exceptions.RequestException as e:
            # Chain the cause so the transport error stays visible in logs.
            raise SecurityError(f"Request failed: {str(e)}") from e

        return response.status_code

    def _validate_webhook_url(self, url: str) -> str:
        """Return ``url`` unchanged if it passes every check.

        Raises:
            SecurityError: empty URL, pattern mismatch, non-HTTPS scheme, or
                a host that resolves to a non-global address.
        """
        if not url:
            raise SecurityError("URL cannot be empty")

        # Check against allowlist pattern. fullmatch() is used because
        # match() lets the trailing "$" succeed just before a final newline,
        # which would accept "https://example.com/x\n".
        if not self.ALLOWED_URL_PATTERN.fullmatch(url):
            raise SecurityError(f"URL not allowed: {url}")

        parsed = urlparse(url)

        # Only HTTPS
        if parsed.scheme != 'https':
            raise SecurityError("Only HTTPS allowed")

        # Block private IPs
        if self._is_private_address(parsed.hostname):
            raise SecurityError("Private IP addresses not allowed")

        return url

    def _is_private_address(self, host: str) -> bool:
        """Return True when ``host`` must be blocked.

        The host is blocked if any resolved address is not globally
        routable or if resolution fails.
        """
        try:
            # Resolve DNS and inspect every returned address.
            return any(
                not ipaddress.ip_address(info[4][0]).is_global
                for info in socket.getaddrinfo(host, None)
            )
        except Exception:
            # Fail closed: an address that cannot be checked is not trusted.
            return True

Why this works:

  • Redirect blocking: allow_redirects=False prevents redirect-based SSRF (e.g., example.com/redirect?to=localhost:6379)
  • Strict domain validation: Regex ^https://([a-z0-9-]+\.)*example\.com/.*$ prevents lookalikes (examp1e.com), typosquatting, null byte injection (example.com%00.attacker.com)
  • Protocol restriction: HTTPS-only blocks file://, ftp://, gopher://, jar:// protocols
  • DNS rebinding defense: socket.getaddrinfo + IP checks catch unsafe current resolutions; use connection pinning or client transport hooks when attacker-controlled DNS is in scope
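  • Defense-in-depth: A shared requests.Session keeps connection handling consistent across calls; the 10s timeout bounds how long a slow or unresponsive endpoint can tie up a worker; allow_redirects=False is passed explicitly on every request (the example configures nothing at session level, so each call must set it)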

URL Validator Class

# SECURE - Reusable URL validator

from urllib.parse import urlparse
import ipaddress
import socket
from typing import Set

class UrlValidator:
    """Reusable allowlist-based URL validator.

    Args:
        allowed_schemes: URL schemes to accept (compared case-insensitively).
        allowed_hosts: Exact hostnames, or ``*.domain`` entries that accept
            any non-empty subdomain of ``domain`` (but not ``domain`` itself).
        block_private_ips: When True, resolve the host and reject it if any
            address is not globally routable or if resolution fails.
    """

    # Docker default bridge: 172.17.0.0/16
    _DOCKER_BRIDGE = ipaddress.ip_network('172.17.0.0/16')

    def __init__(
        self,
        allowed_schemes: Set[str],
        allowed_hosts: Set[str],
        block_private_ips: bool = True
    ):
        self.allowed_schemes = {s.lower() for s in allowed_schemes}
        self.allowed_hosts = {h.lower() for h in allowed_hosts}
        self.block_private_ips = block_private_ips

    def validate(self, url: str) -> str:
        """Return ``url`` unchanged if it passes every configured check.

        Raises:
            ValueError: unparsable URL, disallowed scheme, missing or
                disallowed host, non-global address, or localhost literal.
        """
        try:
            parsed = urlparse(url)
        except Exception as exc:
            raise ValueError("Invalid URL") from exc

        # Validate scheme
        if parsed.scheme.lower() not in self.allowed_schemes:
            raise ValueError(f"Scheme not allowed: {parsed.scheme}")

        # Validate host
        host = parsed.hostname
        if not host:
            raise ValueError("No host in URL")

        host_lower = host.lower()

        if not self._is_host_allowed(host_lower):
            raise ValueError(f"Host not allowed: {host}")

        # Block private IPs
        if self.block_private_ips and self._is_private_ip(host):
            raise ValueError("Private IP addresses not allowed")

        # Block localhost variants
        if self._is_localhost(host_lower):
            raise ValueError("Localhost not allowed")

        return url

    def _is_host_allowed(self, host: str) -> bool:
        """Return True if ``host`` matches an exact or wildcard entry."""
        # Exact match
        if host in self.allowed_hosts:
            return True

        # Wildcard subdomain match (*.example.com)
        for allowed_host in self.allowed_hosts:
            if not allowed_host.startswith('*.'):
                continue
            suffix = allowed_host[1:]  # ".example.com"
            if not host.endswith(suffix):
                continue
            # Require a real subdomain in front of the suffix: every label
            # must be non-empty, so ".example.com" and "a..example.com" are
            # rejected instead of slipping through the suffix test.
            labels = host[:-len(suffix)].split('.')
            if all(labels):
                return True

        return False

    def _is_private_ip(self, host: str) -> bool:
        """Return True when ``host`` must be blocked.

        The host is blocked if any resolved address is not globally
        routable, is the metadata address, is on the Docker default bridge,
        or if resolution fails.
        """
        try:
            # Resolve hostname to every address it maps to.
            resolved = [
                ipaddress.ip_address(info[4][0])
                for info in socket.getaddrinfo(host, None)
            ]
        except Exception:
            # Fail closed: an address that cannot be checked is not trusted.
            return True

        return any(
            not ip.is_global
            or self._is_aws_metadata(ip)
            or self._is_docker_internal(ip)
            for ip in resolved
        )

    def _is_aws_metadata(
        self, ip: "ipaddress.IPv4Address | ipaddress.IPv6Address"
    ) -> bool:
        """Return True for the 169.254.169.254 metadata address."""
        return str(ip) == '169.254.169.254'

    def _is_docker_internal(
        self, ip: "ipaddress.IPv4Address | ipaddress.IPv6Address"
    ) -> bool:
        """Return True for addresses on Docker's default bridge network."""
        # Network membership instead of a string-prefix test; an IPv6
        # address is never a member of an IPv4 network.
        return ip in self._DOCKER_BRIDGE

    def _is_localhost(self, host: str) -> bool:
        """Return True for the literal localhost names and addresses."""
        return host in ('localhost', '127.0.0.1', '::1', '0.0.0.0')

# Usage

# One validator instance carries the whole policy and can be reused for
# every outbound request.
validator = UrlValidator(
    block_private_ips=True,
    allowed_hosts={'*.cdn.example.com', 'api.example.com'},
    allowed_schemes={'https'},
)

# validate() raises ValueError on any failed check and otherwise hands the
# same URL string back.
safe_url = validator.validate(user_input)

Why this works:

  • Centralized reusable class: Constructor-based configuration (allowed schemes, hosts, private IP blocking) enables consistent security across requests, urllib, httpx, async clients
  • Flexible domain matching: Wildcard subdomains (*.cdn.example.com) with case-normalization (host.lower()) prevents bypass via mixed-case
  • Comprehensive IP detection: Catches localhost variants (127.0.0.1, ::1, 0.0.0.0), AWS metadata (169.254.169.254), Docker networks (172.17.x), reserved, multicast, and shared-address ranges
  • DNS rebinding defense: Pre-resolves hostnames with socket.getaddrinfo() before validation; pair this with connection pinning or dial-time validation to avoid TOCTOU races
  • OOP benefits: Enables unit testing (mock DNS), per-environment config, framework integration (Django/Flask/FastAPI) without duplicating logic

DNS Rebinding Attack Prevention

DNS rebinding is a bypass technique where attackers:

  1. Create a domain that initially resolves to a legitimate IP
  2. Application validates the IP (passes allowlist)
  3. Attacker changes DNS to point to internal IP (127.0.0.1, 192.168.x.x)
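  4. Application makes the request, and the HTTP client resolves the hostname again because the validated answer was not pinned - this time receiving the internal IP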
  5. Request goes to internal service

Protection against DNS rebinding:

# Validate hostname, validate resolved addresses, then pin or revalidate at connection time

from urllib.parse import urlparse
import socket
import ipaddress

class SecurityException(Exception):
    """Raised when a URL is rejected by SSRF validation."""


def validate_before_and_after_dns(
    url: str,
    allowed_domains: set,
    *,
    allowed_schemes: frozenset = frozenset({'https'}),
) -> str:
    """Validate URL before a request; pair with connection pinning for rebinding defense.

    Args:
        url: Candidate URL from an untrusted source.
        allowed_domains: Hostnames the caller is willing to contact.
        allowed_schemes: Schemes to accept. Defaults to HTTPS only; pass a
            wider set explicitly to allow anything else.

    Returns:
        ``url`` unchanged when every check passes.

    Raises:
        SecurityException: unparsable URL, disallowed scheme or hostname,
            failed resolution, or any resolved address that is not globally
            routable.
    """
    try:
        parsed = urlparse(url)
        hostname = parsed.hostname
    except ValueError as exc:
        raise SecurityException("Invalid URL") from exc

    # Step 0: Validate scheme so http://, ftp://, file:// etc. never reach
    # the HTTP client just because the host is allowlisted.
    if parsed.scheme not in allowed_schemes:
        raise SecurityException("Scheme not allowed")

    # Step 1: Validate hostname against allowlist
    if hostname not in allowed_domains:
        raise SecurityException("Domain not allowed")

    # Step 2: Resolve DNS and validate ALL resolved IPs
    try:
        resolved = [
            (info[4][0], ipaddress.ip_address(info[4][0]))
            for info in socket.getaddrinfo(hostname, None)
        ]
    except (OSError, ValueError) as exc:
        # socket.gaierror is an OSError; encoding failures and unparsable
        # address strings are ValueErrors. All of them fail closed.
        raise SecurityException("Cannot resolve hostname") from exc

    for ip_str, ip in resolved:
        # Require globally reachable addresses
        if not ip.is_global:
            raise SecurityException(f"Domain resolves to private IP: {ip_str}")

        # Check for AWS metadata endpoint
        if str(ip) == '169.254.169.254':
            raise SecurityException("Access to AWS metadata endpoint blocked")

    # Step 3: Pin the request to a validated IP, or use a client transport
    # hook that revalidates the connection address.
    return url

# Example usage:

ALLOWED_DOMAINS = {'api.example.com', 'cdn.example.com'}

try:
    # Validation raises SecurityException before any request is attempted.
    safe_url = validate_before_and_after_dns(user_provided_url, ALLOWED_DOMAINS)
    # Make request with validated URL; redirects stay disabled so the
    # request cannot leave the validated target.
    response = requests.get(safe_url, allow_redirects=False, timeout=10)
except SecurityException as e:
    # Log the rejection, then let the caller handle it.
    logger.error(f"SSRF protection triggered: {e}")
    raise

Why this works:

  • DNS rebinding defense: Two-phase validation catches unsafe current DNS answers; it defeats rebinding only when paired with connection pinning or dial-time validation
  • All IPs validated: socket.getaddrinfo returns all A/AAAA records - attackers can't hide private IP in secondary records; real-time resolution (no long cache) forces re-validation
  • Comprehensive IP detection: is_global rejects private, loopback, link-local, reserved, multicast, unspecified, and shared-address ranges; explicit AWS metadata check (169.254.169.254) documents the high-value target
  • Fail-closed DNS errors: Exception handling on socket.gaierror blocks requests when DNS fails instead of falling back to an unvalidated fetch
  • Long-running service protection: Critical for services where DNS records change during uptime and multi-tenant environments with attacker-controlled subdomains

Framework-Specific Guidance

Django

# SECURE - Django view with URL validation

from django.http import JsonResponse, HttpResponseBadRequest
from django.views.decorators.http import require_http_methods
import requests

# Create validator instance

url_validator = UrlValidator(
    block_private_ips=True,
    allowed_hosts={'api.example.com'},
    allowed_schemes={'https'},
)

@require_http_methods(["GET"])
def proxy_view(request):
    """Fetch the validated ``url`` query parameter and report status and body.

    Responds with 400 when the parameter is missing, when validation raises
    ValueError, or when the outbound request raises RequestException.
    """
    target = request.GET.get('url')
    if not target:
        return HttpResponseBadRequest("URL parameter required")

    try:
        # Validate first, then fetch with a timeout and no redirects.
        upstream = requests.get(
            url_validator.validate(target),
            timeout=10,
            allow_redirects=False,
        )
        payload = {
            'status': upstream.status_code,
            'content': upstream.text,
        }
        return JsonResponse(payload)
    except ValueError as e:
        return HttpResponseBadRequest(f"Invalid URL: {str(e)}")
    except requests.exceptions.RequestException as e:
        return HttpResponseBadRequest(f"Request failed: {str(e)}")

# settings.py - Configure allowed hosts

# Hosts that outbound proxy requests may target; a "*." prefix marks a
# wildcard subdomain entry.
SSRF_ALLOWED_HOSTS = ['api.example.com', '*.cdn.example.com']

Flask

# SECURE - Flask with URL validation

from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

# Initialize validator

url_validator = UrlValidator(
    block_private_ips=True,
    allowed_hosts={'api.example.com', 'public-api.example.org'},
    allowed_schemes={'https'},
)

@app.route('/proxy')
def proxy():
    """Fetch the validated ``url`` query parameter and report status and body.

    Responds with 400 when the parameter is missing or validation raises
    ValueError, and with 500 when the outbound request raises
    RequestException.
    """
    target = request.args.get('url')
    if not target:
        return jsonify({'error': 'URL parameter required'}), 400

    try:
        # Validate first, then fetch with a timeout and no redirects.
        upstream = requests.get(
            url_validator.validate(target),
            timeout=10,
            allow_redirects=False,
        )
        payload = {
            'status': upstream.status_code,
            'content': upstream.text,
        }
        return jsonify(payload)
    except ValueError as e:
        return jsonify({'error': f'Invalid URL: {str(e)}'}), 400
    except requests.exceptions.RequestException as e:
        return jsonify({'error': f'Request failed: {str(e)}'}), 500

FastAPI

# SECURE - FastAPI with URL validation

from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
import requests

app = FastAPI()

# Initialize validator

url_validator = UrlValidator(
    allowed_schemes={'https'},
    allowed_hosts={'api.example.com'},
    block_private_ips=True
)

class ProxyResponse(BaseModel):
    """Response body for /proxy: upstream status code and body text."""

    status: int
    content: str

def _fetch_validated(url: str) -> ProxyResponse:
    """Validate ``url``, fetch it, and wrap the result (blocking).

    Both the validator's DNS lookup and ``requests.get`` block the calling
    thread, so this must run off the event loop.
    """
    # Validate URL
    validated_url = url_validator.validate(url)

    # Make request
    response = requests.get(
        validated_url,
        timeout=10,
        allow_redirects=False
    )

    return ProxyResponse(
        status=response.status_code,
        content=response.text
    )

@app.get("/proxy", response_model=ProxyResponse)
async def proxy(url: str = Query(..., description="URL to fetch")):
    """Fetch a validated URL and return its status code and body.

    Raises:
        HTTPException: 400 when a ValueError is raised (validation failure),
            500 when the outbound request raises RequestException.
    """
    # Local import: this snippet's import block does not include asyncio.
    import asyncio

    try:
        # Run the blocking validation and fetch on the default executor so
        # the event loop keeps serving other requests. Exceptions raised in
        # the worker propagate through the await to the handlers below.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _fetch_validated, url)

    except ValueError as e:
        raise HTTPException(status_code=400, detail=f"Invalid URL: {str(e)}")
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"Request failed: {str(e)}")

aiohttp (Async)

# SECURE - aiohttp with URL validation

import aiohttp
from aiohttp import web

async def proxy_handler(request):
    """Fetch the validated ``url`` query parameter and report status and body.

    Responds with 400 when the parameter is missing or validation raises
    ValueError, and with 500 when the outbound request raises
    ``aiohttp.ClientError``.
    """
    # Local import: this snippet's import block does not include asyncio.
    import asyncio

    url = request.query.get('url')

    if not url:
        return web.Response(text='URL parameter required', status=400)

    try:
        # Validate URL
        url_validator = UrlValidator(
            allowed_schemes={'https'},
            allowed_hosts={'api.example.com'},
            block_private_ips=True
        )
        # validate() resolves the hostname with a blocking
        # socket.getaddrinfo call; run it on the default executor so a slow
        # lookup does not stall every other request on the event loop.
        loop = asyncio.get_running_loop()
        validated_url = await loop.run_in_executor(
            None, url_validator.validate, url
        )

        # Make async request
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(validated_url, allow_redirects=False) as response:
                content = await response.text()

                return web.json_response({
                    'status': response.status,
                    'content': content
                })

    except ValueError as e:
        return web.Response(text=f'Invalid URL: {str(e)}', status=400)
    except aiohttp.ClientError as e:
        return web.Response(text=f'Request failed: {str(e)}', status=500)

app = web.Application()
app.router.add_get('/proxy', proxy_handler)

Protecting Cloud Metadata Endpoints

# SECURE - Block AWS/Azure/GCP metadata endpoints

import ipaddress
from urllib.parse import urlparse

class MetadataProtection:
    """Denylist check for cloud instance-metadata endpoints.

    Rejects URLs whose host is a known metadata name or address, whose path
    starts with a known metadata prefix, or whose host resolves to a
    link-local or blocklisted address. This is a supplementary check, not a
    replacement for allowlist validation.
    """

    BLOCKED_HOSTS = {
        '169.254.169.254',           # AWS/Azure metadata
        'fd00:ec2::254',             # AWS metadata over IPv6
        'metadata.google.internal',  # GCP metadata
        'metadata',
        'metadata.azure.com'
    }

    BLOCKED_PATHS = {
        '/latest/meta-data',
        '/latest/user-data',
        '/latest/dynamic',
        '/computeMetadata/v1',
        '/metadata/instance'
    }

    def validate_not_metadata(self, url: str):
        """Raise ValueError if ``url`` targets a metadata service.

        Returns None when no rule matches.

        Raises:
            ValueError: blocked host, blocked path prefix, failed DNS
                resolution, or a host resolving to a link-local or
                blocklisted address. The message names the rule that fired.
        """
        import socket

        parsed = urlparse(url)

        host = parsed.hostname.lower() if parsed.hostname else ''
        path = parsed.path

        # Block metadata service hostnames
        if host in self.BLOCKED_HOSTS:
            raise ValueError("Access to metadata service blocked")

        # Block metadata paths
        if any(path.startswith(prefix) for prefix in self.BLOCKED_PATHS):
            raise ValueError("Access to metadata endpoint blocked")

        # Only resolution and parsing sit inside the try. The policy checks
        # below stay outside it so their specific errors are not rewritten
        # as "DNS resolution failed".
        try:
            resolved = [
                ipaddress.ip_address(info[4][0])
                for info in socket.getaddrinfo(host, None)
            ]
        except Exception as exc:
            # Fail closed: an address that cannot be checked is not trusted.
            raise ValueError("DNS resolution failed") from exc

        for ip in resolved:
            # A hostname may resolve to a blocklisted address literal.
            if str(ip) in self.BLOCKED_HOSTS:
                raise ValueError("Access to metadata service blocked")

            # Block link-local addresses (169.254.x.x)
            if ip.is_link_local:
                raise ValueError("Link-local addresses blocked")

Additional Resources