CWE-114: Process Control - Python
Overview
Process control vulnerabilities in Python applications occur when untrusted user input is used to control process execution, lifecycle, or behavior. Attackers can exploit these vulnerabilities to terminate critical processes, spawn malicious processes, exhaust system resources, or escalate privileges by manipulating process management operations.
Key Security Issues:
- Unauthorized Process Termination: Killing critical system or application processes
- Resource Exhaustion: Spawning unlimited processes to cause denial of service
- Privilege Escalation: Manipulating process priorities or spawning privileged processes
- Command Injection: Injecting malicious commands through process control parameters
- Information Disclosure: Accessing process information to map system architecture
Primary Defence: Use subprocess.run() with explicit argument lists (not shell=True), implement authorization checks and allowlists for process operations, enforce resource limits using resource.setrlimit() or container limits, and validate all process control parameters before execution to prevent command injection and resource exhaustion.
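A minimal sketch of these defences, assuming a hypothetical ALLOWED_JOBS allowlist and fixed script paths under /opt/app/:
import resource
import subprocess

# Hypothetical allowlist: logical job names mapped to fixed, trusted script paths
ALLOWED_JOBS = {
    'report': '/opt/app/report.py',
    'cleanup': '/opt/app/cleanup.py',
}

def _limit_resources():
    # Cap CPU time and address space for the child process (Unix only)
    resource.setrlimit(resource.RLIMIT_CPU, (300, 300))
    resource.setrlimit(resource.RLIMIT_AS, (512 * 1024 * 1024, 512 * 1024 * 1024))

def run_job(job_name: str) -> subprocess.CompletedProcess:
    if job_name not in ALLOWED_JOBS:  # validate against the allowlist, never build paths from input
        raise ValueError(f"Job '{job_name}' is not permitted")
    return subprocess.run(
        ['python3', ALLOWED_JOBS[job_name]],  # explicit argument list, no shell=True
        preexec_fn=_limit_resources,          # resource limits applied to the child (Unix only)
        capture_output=True,
        timeout=600,
        check=False,
    )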
Common Python Scenarios:
- Web applications killing background worker processes based on user input
- Admin panels allowing process restart with user-supplied PIDs
- Job schedulers terminating jobs using unvalidated job IDs
- Container orchestration accepting process control commands
- Service managers with insufficient authorization checks
- Monitoring tools displaying process details without access control
Why This Matters in Python:
- os.kill(), subprocess, and signal modules provide powerful but dangerous process control
- Python's ease of use can lead to overlooking security implications
- Microservices and containerized apps often need process management
- Django/Flask admin panels frequently implement process control features
- Celery workers and task queues require process lifecycle management
Common Vulnerable Patterns
Unvalidated Process Termination
import os
import signal
from flask import Flask, request
app = Flask(__name__)
@app.route('/admin/kill-process', methods=['POST'])
def kill_process():
# DANGEROUS: User controls which process to kill
pid = int(request.form.get('pid'))
try:
os.kill(pid, signal.SIGKILL) # No validation or authorization
return f"Process {pid} terminated"
except ProcessLookupError:
return "Process not found", 404
Why this is vulnerable:
- No authorization check - any user can kill any process
- No validation of PID - can target system processes
- No ownership verification
- No audit logging of who killed what
Command Injection via Process Control
import subprocess
def set_process_priority(pid, priority):
# DANGEROUS: Command injection vulnerability
command = f"renice {priority} -p {pid}"
subprocess.run(command, shell=True) # shell=True is dangerous
Why this is vulnerable:
- Using shell=True enables command injection (a safe alternative is sketched after this list)
- String interpolation allows injection of shell metacharacters
- No validation of priority or PID parameters
- User input directly in shell command
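A possible fix, sketched here: coerce both values to integers, range-check them, and pass an explicit argument list so no shell is ever invoked:
import subprocess

def set_process_priority(pid: int, priority: int) -> None:
    # Coerce and range-check before touching the system
    pid = int(pid)
    priority = int(priority)
    if pid <= 0:
        raise ValueError(f"Invalid PID: {pid}")
    if not 0 <= priority <= 19:
        raise ValueError(f"Nice value out of range: {priority}")
    # Explicit argument list and no shell, so metacharacters in the input are inert
    subprocess.run(['renice', str(priority), '-p', str(pid)], check=True)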
Unrestricted Process Spawning
import subprocess
from flask import request
@app.route('/run-job', methods=['POST'])
def run_job():
# DANGEROUS: Unrestricted process spawning
job_type = request.form.get('job_type')
job_args = request.form.get('args', '').split()
# No validation or resource limits
subprocess.Popen([f'/opt/jobs/{job_type}.py'] + job_args)
return "Job started"
Why this is vulnerable:
- No rate limiting - can spawn unlimited processes
- Path traversal possible in job_type
- No resource limits on spawned processes
- Arguments not validated
Signal Handling Without Authorization
import os
import signal
def pause_process(pid):
# DANGEROUS: No authorization or validation
os.kill(pid, signal.SIGSTOP) # Can pause any process
def resume_process(pid):
os.kill(pid, signal.SIGCONT) # Can resume any process
Why this is vulnerable:
- No check if user owns the process
- Can affect system-critical processes
- No logging or audit trail
- Missing authorization
Process Information Disclosure
import psutil
from flask import request
@app.route('/process-info')
def process_info():
pid = int(request.args.get('pid'))
# DANGEROUS: Exposes all process information
proc = psutil.Process(pid)
return {
'name': proc.name(),
'cmdline': proc.cmdline(), # May contain secrets
'environ': proc.environ(), # Environment variables
'cwd': proc.cwd(),
'connections': [str(c) for c in proc.connections()]
}
Why this is vulnerable:
- No authorization - anyone can view any process
- Command line may contain passwords or API keys
- Environment variables often contain secrets
- Network connections reveal internal architecture
Race Condition in Process Management
import logging
import os
import signal
import psutil
logger = logging.getLogger(__name__)
def manage_worker(action, worker_id):
# DANGEROUS: Time-of-check-time-of-use vulnerability
pid = get_worker_pid(worker_id)
if pid and psutil.pid_exists(pid):
# Race condition: PID could change or be reused here
if action == 'kill':
os.kill(pid, signal.SIGKILL)
Why this is vulnerable:
- PID can be reused between check and use
- No atomic operation
- Could kill wrong process
- No verification that the process is still the expected one (a mitigation using psutil's create_time() is sketched below)
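One way to narrow this race is to record the worker's creation time when it is spawned and compare it with psutil's create_time() before signalling; get_worker_record below is a hypothetical lookup returning the values captured at spawn time:
import signal
import psutil

def kill_worker(worker_id: str) -> bool:
    record = get_worker_record(worker_id)  # hypothetical: returns (pid, create_time) saved at spawn
    if record is None:
        return False
    pid, recorded_create_time = record
    try:
        proc = psutil.Process(pid)
        # A reused PID belongs to a different process with a different creation time
        if abs(proc.create_time() - recorded_create_time) > 1e-3:
            return False
        proc.send_signal(signal.SIGTERM)  # prefer graceful termination over SIGKILL
        return True
    except psutil.NoSuchProcess:
        return False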
Insufficient Process Isolation
import subprocess
def start_user_job(username, script_path):
# DANGEROUS: Insufficient isolation
subprocess.Popen([
'python3',
script_path
], env={'USER': username}) # Only sets USER, inherits everything else
Why this is vulnerable:
- Inherits parent environment (PATH, secrets, etc.)
- No resource limits (CPU, memory, file descriptors)
- Runs with same user as parent process
- No sandboxing or containerization
Celery Task Control Without Authorization
from celery import current_app
from flask import request
@app.route('/admin/revoke-task', methods=['POST'])
def revoke_task():
# DANGEROUS: No authorization
task_id = request.form.get('task_id')
current_app.control.revoke(task_id, terminate=True)
return f"Task {task_id} revoked"
Why this is vulnerable:
- Any user can revoke any task
- No validation of task_id format
- No verification user owns the task
- Can disrupt critical background jobs
Secure Patterns
Process Termination with Authorization (CORRECT)
import os
import signal
import logging
from typing import Set, Dict
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ManagedProcess:
pid: int
name: str
owner: str
started_at: datetime
logger = logging.getLogger(__name__)
class SecureProcessManager:
def __init__(self):
self.managed_processes: Dict[str, ManagedProcess] = {}
self.authorized_users: Set[str] = set()
def register_process(self, process_id: str, process: ManagedProcess):
"""Register a process for management"""
self.managed_processes[process_id] = process
logger.info(f"Process registered: {process_id} (PID: {process.pid}) by {process.owner}")
def kill_process(self, process_id: str, current_user: str) -> bool:
"""Safely terminate a process with authorization"""
# Authorization check
if current_user not in self.authorized_users:
logger.warning(f"Unauthorized kill attempt by {current_user}")
raise PermissionError(f"User {current_user} not authorized for process control")
# Validate process ID
process = self.managed_processes.get(process_id)
if not process:
logger.warning(f"Attempted to kill unmanaged process: {process_id}")
raise ValueError(f"Process {process_id} not under management")
# Ownership verification
if process.owner != current_user and current_user != 'admin':
logger.warning(
f"User {current_user} attempted to kill process owned by {process.owner}"
)
raise PermissionError("Cannot kill process owned by another user")
try:
# Verify process still exists and is the same one
os.kill(process.pid, 0) # Signal 0 checks existence
# Send SIGTERM first (graceful shutdown)
os.kill(process.pid, signal.SIGTERM)
logger.info(f"SIGTERM sent to process {process_id} (PID: {process.pid}) by {current_user}")
# Could add timeout and SIGKILL fallback here
del self.managed_processes[process_id]
return True
except ProcessLookupError:
logger.warning(f"Process {process_id} (PID: {process.pid}) no longer exists")
del self.managed_processes[process_id]
return False
except PermissionError as e:
logger.error(f"Permission denied killing process {process_id}: {e}")
raise
Why this works:
- Strict authorization checks before any operation
- Process allowlist prevents arbitrary process control
- Ownership verification ensures users only control their own processes
- Comprehensive audit logging
- Graceful shutdown with SIGTERM before SIGKILL (a timeout-and-escalate sketch follows)
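The SIGKILL fallback mentioned in the comment above could look something like the following, using psutil's wait helpers; a sketch, not a drop-in replacement:
import psutil

def terminate_gracefully(pid: int, timeout: float = 10.0) -> None:
    """Send SIGTERM, wait up to `timeout` seconds, then escalate to SIGKILL."""
    try:
        proc = psutil.Process(pid)
    except psutil.NoSuchProcess:
        return  # already gone
    proc.terminate()  # SIGTERM: give the process a chance to clean up
    try:
        proc.wait(timeout=timeout)
    except psutil.TimeoutExpired:
        proc.kill()   # SIGKILL: forceful fallback after the grace period
        proc.wait(timeout=5)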
Safe Process Priority Management (CORRECT)
import logging
import psutil
logger = logging.getLogger(__name__)
class ProcessPriorityManager:
# Only allow reducing priority (higher nice values)
MIN_NICE = 0 # Normal priority
MAX_NICE = 19 # Lowest priority
def set_process_priority(
self,
process_id: str,
nice_value: int,
current_user: str,
process_manager: SecureProcessManager
):
"""Safely adjust process priority"""
# Validate nice value range
if not (self.MIN_NICE <= nice_value <= self.MAX_NICE):
raise ValueError(
f"Nice value must be between {self.MIN_NICE} and {self.MAX_NICE}"
)
# Get managed process
process = process_manager.managed_processes.get(process_id)
if not process:
raise ValueError(f"Process {process_id} not under management")
# Verify ownership
if process.owner != current_user and current_user != 'admin':
raise PermissionError("Cannot modify process owned by another user")
try:
# Use psutil for cross-platform compatibility
proc = psutil.Process(process.pid)
# Verify it's still the same process
if proc.name() != process.name:
raise ValueError("Process name mismatch - PID may have been reused")
# Set nice value (only allows reducing priority)
proc.nice(nice_value)
logger.info(
f"Process {process_id} priority set to {nice_value} by {current_user}"
)
except psutil.NoSuchProcess:
raise ProcessLookupError(f"Process {process_id} no longer exists")
Why this works:
- Only allows reducing priority (cannot increase above normal)
- Validates nice value range
- Uses psutil for cross-platform compatibility
- Verifies process identity to prevent PID reuse attacks
- Authorization and ownership checks
Restricted Process Spawning with Resource Limits (CORRECT)
import logging
import os
import resource
import subprocess
from pathlib import Path
from typing import List, Dict
logger = logging.getLogger(__name__)
class SecureProcessSpawner:
# Allowlist of permitted executables
ALLOWED_EXECUTABLES = {
'worker': '/opt/app/worker.py',
'processor': '/opt/app/processor.py',
'analyzer': '/opt/app/analyzer.py'
}
# Maximum concurrent processes per user
MAX_PROCESSES_PER_USER = 5
def __init__(self):
self.user_process_count: Dict[str, int] = {}
def spawn_process(
self,
job_type: str,
args: List[str],
current_user: str
) -> subprocess.Popen:
"""Spawn process with security controls"""
# Validate job type against allowlist
if job_type not in self.ALLOWED_EXECUTABLES:
logger.warning(f"Unauthorized job type requested: {job_type}")
raise ValueError(f"Job type '{job_type}' not permitted")
executable = self.ALLOWED_EXECUTABLES[job_type]
# Rate limiting per user
user_count = self.user_process_count.get(current_user, 0)
if user_count >= self.MAX_PROCESSES_PER_USER:
raise RuntimeError(
f"User {current_user} has reached process limit of {self.MAX_PROCESSES_PER_USER}"
)
# Validate arguments (no path traversal, no shell metacharacters)
validated_args = self._validate_arguments(args)
# Prepare secure environment
secure_env = {
'PATH': '/usr/bin:/bin',
'USER': current_user,
'HOME': f'/home/{current_user}',
'PYTHONDONTWRITEBYTECODE': '1',
'PYTHONUNBUFFERED': '1'
}
# Create process with resource limits
def set_limits():
# Limit CPU time to 1 hour
resource.setrlimit(resource.RLIMIT_CPU, (3600, 3600))
# Limit memory to 1GB
resource.setrlimit(resource.RLIMIT_AS, (1024*1024*1024, 1024*1024*1024))
# Limit number of file descriptors
resource.setrlimit(resource.RLIMIT_NOFILE, (1024, 1024))
# Prevent core dumps
resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
try:
process = subprocess.Popen(
['python3', executable] + validated_args,
env=secure_env,
preexec_fn=set_limits, # Unix only
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.DEVNULL, # No input
cwd='/tmp', # Safe working directory
# Additional security on Linux
start_new_session=True # Prevent signal propagation
)
self.user_process_count[current_user] = user_count + 1
logger.info(
f"Process spawned: {job_type} by {current_user} "
f"(PID: {process.pid})"
)
return process
except Exception as e:
logger.error(f"Failed to spawn process: {e}")
raise
def _validate_arguments(self, args: List[str]) -> List[str]:
"""Validate process arguments"""
validated = []
for arg in args:
# Reject path traversal attempts
if '..' in arg or arg.startswith('/'):
raise ValueError(f"Invalid argument: {arg}")
# Reject shell metacharacters
dangerous_chars = set(';&|`$(){}[]<>*?~')
if any(c in arg for c in dangerous_chars):
raise ValueError(f"Argument contains dangerous characters: {arg}")
# Limit argument length
if len(arg) > 255:
raise ValueError(f"Argument too long: {len(arg)} chars")
validated.append(arg)
return validated
def cleanup_finished(self, current_user: str):
"""Clean up process count when process finishes"""
if current_user in self.user_process_count:
self.user_process_count[current_user] -= 1
Why this works:
- Executable allowlist prevents arbitrary code execution
- Rate limiting per user prevents resource exhaustion
- Resource limits (CPU, memory, file descriptors) prevent DoS
- Arguments validated to prevent injection
- Clean environment prevents secret leakage
- Process isolation with new session (an example Flask integration follows)
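For illustration, one way the spawner might replace the earlier /run-job endpoint; get_current_user is a hypothetical helper that would come from your authentication layer:
from flask import Flask, request, jsonify

app = Flask(__name__)
spawner = SecureProcessSpawner()

@app.route('/run-job', methods=['POST'])
def run_job():
    current_user = get_current_user()  # hypothetical: resolved from the session or token
    job_type = request.form.get('job_type', '')
    args = request.form.get('args', '').split()
    try:
        process = spawner.spawn_process(job_type, args, current_user)
    except (ValueError, RuntimeError) as exc:
        return jsonify({'error': str(exc)}), 400
    return jsonify({'status': 'started', 'pid': process.pid}), 202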
Safe Signal Handling (CORRECT)
import logging
import os
import signal
import psutil
logger = logging.getLogger(__name__)
class SecureSignalManager:
# Only allow safe signals
ALLOWED_SIGNALS = {
'TERM': signal.SIGTERM, # Graceful termination
'HUP': signal.SIGHUP, # Reload configuration
'USR1': signal.SIGUSR1, # User-defined
'USR2': signal.SIGUSR2 # User-defined
}
def send_signal(
self,
process_id: str,
signal_name: str,
current_user: str,
process_manager: SecureProcessManager
):
"""Send signal to process with authorization"""
# Validate signal
if signal_name not in self.ALLOWED_SIGNALS:
raise ValueError(f"Signal '{signal_name}' not permitted")
sig = self.ALLOWED_SIGNALS[signal_name]
# Get managed process
process = process_manager.managed_processes.get(process_id)
if not process:
raise ValueError(f"Process {process_id} not under management")
# Authorization check
if process.owner != current_user and current_user != 'admin':
logger.warning(
f"User {current_user} attempted to signal process owned by {process.owner}"
)
raise PermissionError("Cannot signal process owned by another user")
try:
# Verify process identity
proc = psutil.Process(process.pid)
if proc.name() != process.name:
raise ValueError("Process identity mismatch")
# Send signal
os.kill(process.pid, sig)
logger.info(
f"Signal {signal_name} sent to process {process_id} "
f"(PID: {process.pid}) by {current_user}"
)
except psutil.NoSuchProcess:
raise ProcessLookupError(f"Process {process_id} no longer exists")
Why this works:
- Signal allowlist prevents dangerous signals (SIGKILL, SIGSTOP)
- Authorization and ownership verification
- Process identity verification prevents PID reuse
- Comprehensive logging
Secure Process Information Disclosure (CORRECT)
import psutil
from typing import Dict, Any
class SecureProcessInfo:
def get_process_info(
self,
process_id: str,
current_user: str,
process_manager: SecureProcessManager
) -> Dict[str, Any]:
"""Get process information with authorization"""
# Get managed process
process = process_manager.managed_processes.get(process_id)
if not process:
raise ValueError(f"Process {process_id} not under management")
# Authorization - only owner or admin
if process.owner != current_user and current_user != 'admin':
raise PermissionError("Cannot view process owned by another user")
try:
proc = psutil.Process(process.pid)
# Return only safe, sanitized information
return {
'id': process_id,
'pid': process.pid,
'name': process.name,
'status': proc.status(),
'cpu_percent': proc.cpu_percent(interval=0.1),
'memory_mb': proc.memory_info().rss / 1024 / 1024,
'started_at': process.started_at.isoformat(),
'owner': process.owner
# Do NOT include: cmdline, environ, connections, open files
}
except psutil.NoSuchProcess:
raise ProcessLookupError(f"Process {process_id} no longer exists")
Why this works:
- Authorization check prevents information disclosure
- Returns only safe, non-sensitive information
- Excludes command line arguments (may contain secrets)
- Excludes environment variables
- Excludes network connections and file handles
Celery Task Management with Authorization (CORRECT)
import logging
from celery import current_app
from flask import request, g
import re
logger = logging.getLogger(__name__)
class SecureCeleryManager:
# Valid task ID format (UUID)
TASK_ID_PATTERN = re.compile(
r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
)
def __init__(self):
self.task_owners = {} # task_id -> username mapping
def register_task(self, task_id: str, owner: str):
"""Register task ownership"""
self.task_owners[task_id] = owner
def revoke_task(self, task_id: str, current_user: str, terminate: bool = False):
"""Revoke Celery task with authorization"""
# Validate task ID format
if not self.TASK_ID_PATTERN.match(task_id):
raise ValueError(f"Invalid task ID format: {task_id}")
# Check ownership
owner = self.task_owners.get(task_id)
if not owner:
raise ValueError(f"Task {task_id} not found or not owned by you")
if owner != current_user and current_user != 'admin':
logger.warning(
f"User {current_user} attempted to revoke task owned by {owner}"
)
raise PermissionError("Cannot revoke task owned by another user")
# Revoke with appropriate signal
current_app.control.revoke(
task_id,
terminate=terminate, # Only if explicitly requested
signal='SIGTERM' if terminate else None # Graceful termination
)
logger.info(
f"Task {task_id} revoked by {current_user} "
f"(terminate={terminate})"
)
del self.task_owners[task_id]
Why this works:
- Task ID format validation prevents injection
- Ownership tracking and verification
- Authorization check before revocation
- Graceful termination by default
- Audit logging (an example Flask integration follows)
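A possible Flask integration for the earlier /admin/revoke-task endpoint; get_current_user is again a hypothetical authentication helper:
from flask import Flask, request, jsonify

app = Flask(__name__)
celery_manager = SecureCeleryManager()

@app.route('/admin/revoke-task', methods=['POST'])
def revoke_task():
    current_user = get_current_user()  # hypothetical: resolved from the session or token
    task_id = request.form.get('task_id', '')
    try:
        celery_manager.revoke_task(task_id, current_user, terminate=False)
    except (ValueError, PermissionError) as exc:
        return jsonify({'error': str(exc)}), 403
    return jsonify({'status': 'revoked', 'task_id': task_id})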
Docker Container Process Control (CORRECT)
import docker
import logging
import os
from typing import Dict
logger = logging.getLogger(__name__)
class SecureContainerManager:
def __init__(self):
self.client = docker.from_env()
self.managed_containers: Dict[str, str] = {} # container_id -> owner
def start_container(
self,
image: str,
command: str,
current_user: str
) -> str:
"""Start container with security controls"""
# Image allowlist
ALLOWED_IMAGES = {
'worker': 'myapp/worker:latest',
'analyzer': 'myapp/analyzer:latest'
}
if image not in ALLOWED_IMAGES:
raise ValueError(f"Image '{image}' not permitted")
# Start container with security options
container = self.client.containers.run(
ALLOWED_IMAGES[image],
command,
detach=True,
# Security options
read_only=True, # Read-only filesystem
mem_limit='512m', # Memory limit
cpu_quota=50000, # CPU limit (50% of one core)
pids_limit=100, # Limit number of processes
network_mode='bridge', # Network isolation
cap_drop=['ALL'], # Drop all capabilities
security_opt=['no-new-privileges'], # Prevent privilege escalation
# User namespace
userns_mode='host',
user=f'{os.getuid()}:{os.getgid()}' # Run as non-root
)
self.managed_containers[container.id] = current_user
logger.info(
f"Container {container.id} started by {current_user} "
f"(image: {image})"
)
return container.id
def stop_container(self, container_id: str, current_user: str):
"""Stop container with authorization"""
# Check ownership
owner = self.managed_containers.get(container_id)
if not owner:
raise ValueError(f"Container {container_id} not found")
if owner != current_user and current_user != 'admin':
raise PermissionError("Cannot stop container owned by another user")
container = self.client.containers.get(container_id)
container.stop(timeout=10) # Graceful stop with timeout
logger.info(f"Container {container_id} stopped by {current_user}")
del self.managed_containers[container_id]
Why this works:
- Image allowlist prevents arbitrary container execution
- Resource limits prevent DoS
- Read-only filesystem
- Dropped capabilities and security options
- Runs as non-root user
- Network isolation
Key Security Functions
Process Validator
import re
from typing import Optional
class ProcessValidator:
"""Validate process control parameters"""
# Valid process ID pattern (internal identifier)
PROCESS_ID_PATTERN = re.compile(r'^[a-z0-9_-]{1,64}$')
@staticmethod
def validate_process_id(process_id: str) -> bool:
"""Validate process identifier format"""
if not isinstance(process_id, str):
raise TypeError("Process ID must be a string")
if not ProcessValidator.PROCESS_ID_PATTERN.match(process_id):
raise ValueError(
f"Invalid process ID format: {process_id}. "
"Must be alphanumeric, dash, or underscore (1-64 chars)"
)
return True
@staticmethod
def validate_pid(pid: int) -> bool:
"""Validate system PID"""
if not isinstance(pid, int):
raise TypeError("PID must be an integer")
if pid <= 0:
raise ValueError(f"Invalid PID: {pid}")
# Prevent targeting low PIDs (system processes)
if pid < 100:
raise ValueError(f"Cannot target system process (PID < 100): {pid}")
return True
@staticmethod
def validate_signal_name(signal_name: str) -> bool:
"""Validate signal name"""
ALLOWED_SIGNALS = {'TERM', 'HUP', 'USR1', 'USR2'}
if signal_name not in ALLOWED_SIGNALS:
raise ValueError(
f"Signal '{signal_name}' not allowed. "
f"Permitted: {', '.join(ALLOWED_SIGNALS)}"
)
return True
Process Ownership Checker
import logging
import os
import psutil
logger = logging.getLogger(__name__)
def check_process_ownership(pid: int, expected_user: str) -> bool:
"""Verify process is owned by expected user"""
try:
proc = psutil.Process(pid)
process_user = proc.username()
# On Unix, compare usernames
if process_user != expected_user:
logger.warning(
f"Process {pid} ownership mismatch: "
f"expected {expected_user}, got {process_user}"
)
return False
return True
except psutil.NoSuchProcess:
return False
except psutil.AccessDenied:
logger.warning(f"Access denied checking ownership of PID {pid}")
return False
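These helpers are intended to be combined before any signal is sent; a minimal sketch:
import os
import signal

def safe_terminate(pid: int, requesting_user: str) -> bool:
    """Validate the PID and verify ownership before sending SIGTERM."""
    ProcessValidator.validate_pid(pid)
    if not check_process_ownership(pid, requesting_user):
        raise PermissionError(f"PID {pid} is not owned by {requesting_user}")
    try:
        os.kill(pid, signal.SIGTERM)
        return True
    except ProcessLookupError:
        return False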
Verification
After implementing the recommended secure patterns, verify the fix through multiple approaches:
- Manual testing: Submit malicious payloads relevant to this vulnerability and confirm they're handled safely without executing unintended operations
- Code review: Confirm all process control code follows the secure patterns above (explicit argument lists without shell=True, allowlists, authorization and ownership checks) with no string-built shell commands or unvalidated user input (a test sketch follows this list)
- Static analysis: Use security scanners to verify no new vulnerabilities exist and the original finding is resolved
- Regression testing: Ensure legitimate user inputs and application workflows continue to function correctly
- Edge case validation: Test with special characters, boundary conditions, and unusual inputs to verify proper handling
- Framework verification: If using a framework or library, confirm the recommended APIs are used correctly according to documentation
- Authentication/session testing: Verify security controls remain effective and cannot be bypassed (if applicable to the vulnerability type)
- Rescan: Run the security scanner again to confirm the finding is resolved and no new issues were introduced
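A hedged example of what such tests might look like for the classes defined above, using pytest; exact fixtures and wiring depend on your application:
import pytest

def test_unauthorized_user_cannot_kill():
    manager = SecureProcessManager()
    # An unauthorized user must be rejected before any signal is sent
    with pytest.raises(PermissionError):
        manager.kill_process('worker-1', current_user='mallory')

def test_system_pid_rejected():
    # Low PIDs (system processes) must be refused by the validator
    with pytest.raises(ValueError):
        ProcessValidator.validate_pid(1)

def test_disallowed_signal_rejected():
    # Only TERM/HUP/USR1/USR2 are on the signal allowlist
    with pytest.raises(ValueError):
        ProcessValidator.validate_signal_name('KILL')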
Security Checklist
- All process control operations require authentication and authorization
- Process allowlist implemented - only managed processes can be controlled
- PID validation prevents targeting system processes (PID < 100)
- Ownership verification ensures users only control their own processes
- Signal allowlist restricts to safe signals (no SIGKILL, SIGSTOP for users)
- Rate limiting prevents process spawning DoS
- Resource limits applied to all spawned processes (CPU, memory, file descriptors)
- No shell=True in subprocess calls with user input
- Arguments validated before passing to subprocess
- Environment sanitized for spawned processes (no secret leakage)
- Process information disclosure restricted to authorized users
- Comprehensive audit logging of all process control operations
- Error messages don't leak sensitive process information
- Celery/background task management includes authorization
- Container process control uses security options and resource limits