
CWE-114: Process Control - Python

Overview

Process control vulnerabilities in Python applications occur when untrusted user input is used to control process execution, lifecycle, or behavior. Attackers can exploit these vulnerabilities to terminate critical processes, spawn malicious processes, exhaust system resources, or escalate privileges by manipulating process management operations.

Key Security Issues:

  • Unauthorized Process Termination: Killing critical system or application processes
  • Resource Exhaustion: Spawning unlimited processes to cause denial of service
  • Privilege Escalation: Manipulating process priorities or spawning privileged processes
  • Command Injection: Injecting malicious commands through process control parameters
  • Information Disclosure: Accessing process information to map system architecture

Primary Defence: Use subprocess.run() with explicit argument lists (not shell=True), implement authorization checks and allowlists for process operations, enforce resource limits using resource.setrlimit() or container limits, and validate all process control parameters before execution to prevent command injection and resource exhaustion.

Common Python Scenarios:

  • Web applications killing background worker processes based on user input
  • Admin panels allowing process restart with user-supplied PIDs
  • Job schedulers terminating jobs using unvalidated job IDs
  • Container orchestration accepting process control commands
  • Service managers with insufficient authorization checks
  • Monitoring tools displaying process details without access control

Why This Matters in Python:

  • os.kill() and the subprocess and signal modules provide powerful but dangerous process control
  • Python's ease of use can lead to overlooking security implications
  • Microservices and containerized apps often need process management
  • Django/Flask admin panels frequently implement process control features
  • Celery workers and task queues require process lifecycle management

Common Vulnerable Patterns

Unvalidated Process Termination

import os
import signal
from flask import Flask, request

app = Flask(__name__)

@app.route('/admin/kill-process', methods=['POST'])
def kill_process():
    # DANGEROUS: User controls which process to kill
    pid = int(request.form.get('pid'))

    try:
        os.kill(pid, signal.SIGKILL)  # No validation or authorization
        return f"Process {pid} terminated"
    except ProcessLookupError:
        return "Process not found", 404

Why this is vulnerable:

  • No authorization check - any user can kill any process
  • No validation of PID - can target system processes
  • No ownership verification
  • No audit logging of who killed what
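
The raw PID value also needs care before it reaches os.kill(): on POSIX systems, 0 and negative values address process groups, and -1 addresses every process the caller is allowed to signal. The sketch below shows one way to parse the form field strictly. The helper name `parse_pid` and the 10-digit cap are assumptions for illustration, and the check complements the authorization and ownership controls rather than replacing them.

```python
def parse_pid(raw):
    """Parse a user-supplied PID string, accepting only plain positive integers."""
    # Reject missing values, signs, whitespace, and non-ASCII digits up front.
    if not isinstance(raw, str) or not raw.isascii() or not raw.isdigit():
        raise ValueError("pid must be a positive integer")
    if len(raw) > 10:  # assumed cap; real PIDs are far shorter
        raise ValueError("pid is too long")
    pid = int(raw)
    if pid == 0:
        # os.kill(0, sig) would signal the caller's whole process group.
        raise ValueError("pid must be a positive integer")
    return pid
```

Because "-1" fails the isdigit() check, the broadcast case is rejected before any signal is sent.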

Command Injection via Process Control

import subprocess

def set_process_priority(pid, priority):
    # DANGEROUS: Command injection vulnerability
    command = f"renice {priority} -p {pid}"
    subprocess.run(command, shell=True)  # shell=True is dangerous

Why this is vulnerable:

  • Using shell=True enables command injection
  • String interpolation allows injection of shell metacharacters
  • No validation of priority or PID parameters
  • User input directly in shell command
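
For comparison, a minimal sketch of the same operation without a shell. The helper name `build_renice_argv` and the 0 to 19 range are assumptions for illustration; the point is that each validated value becomes its own argument-list element, so no shell ever parses it.

```python
import subprocess

MIN_NICE, MAX_NICE = 0, 19  # assumed policy: callers may only lower priority


def build_renice_argv(priority, pid):
    """Validate both values and return an argument list for renice."""
    for name, value in (("priority", priority), ("pid", pid)):
        # bool is a subclass of int, so exclude it explicitly.
        if isinstance(value, bool) or not isinstance(value, int):
            raise ValueError(f"{name} must be an integer")
    if not MIN_NICE <= priority <= MAX_NICE:
        raise ValueError("priority out of range")
    if pid <= 0:
        raise ValueError("pid must be positive")
    # Same command form as the vulnerable example, but as separate arguments.
    return ["renice", str(priority), "-p", str(pid)]


def set_process_priority(priority, pid):
    # No shell=True: the list is handed directly to the renice program.
    return subprocess.run(build_renice_argv(priority, pid), check=True)
```

A payload such as "10; rm -rf /" is rejected by the type check, and even a string that slipped through would reach renice as a single literal argument rather than as shell syntax.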

Unrestricted Process Spawning

import subprocess
from flask import request

@app.route('/run-job', methods=['POST'])
def run_job():
    # DANGEROUS: Unrestricted process spawning
    job_type = request.form.get('job_type')
    job_args = request.form.get('args', '').split()

    # No validation or resource limits
    subprocess.Popen([f'/opt/jobs/{job_type}.py'] + job_args)
    return "Job started"

Why this is vulnerable:

  • No rate limiting - can spawn unlimited processes
  • Path traversal possible in job_type
  • No resource limits on spawned processes
  • Arguments not validated
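
The path traversal risk can be made concrete with a short sketch. It normalizes the path the vulnerable code would build, then contrasts it with an allowlist lookup. The job names in `ALLOWED_JOBS` and both helper names are hypothetical.

```python
import posixpath

JOBS_DIR = "/opt/jobs"

# Hypothetical allowlist: the user picks a key, never a path.
ALLOWED_JOBS = {
    "report": "/opt/jobs/report.py",
    "cleanup": "/opt/jobs/cleanup.py",
}


def naive_job_path(job_type):
    """Show where the interpolated path from the vulnerable example ends up."""
    return posixpath.normpath(f"{JOBS_DIR}/{job_type}.py")


def resolve_job(job_type):
    """Map a job name to a fixed path, rejecting anything not on the allowlist."""
    try:
        return ALLOWED_JOBS[job_type]
    except KeyError:
        raise ValueError(f"unknown job type: {job_type!r}") from None


print(naive_job_path("../../tmp/evil"))  # → /tmp/evil.py
```

With the allowlist, "../../tmp/evil" is simply an unknown key, so no attacker-influenced path is ever constructed.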

Signal Handling Without Authorization

import os
import signal

def pause_process(pid):
    # DANGEROUS: No authorization or validation
    os.kill(pid, signal.SIGSTOP)  # Can pause any process

def resume_process(pid):
    os.kill(pid, signal.SIGCONT)  # Can resume any process

Why this is vulnerable:

  • No check if user owns the process
  • Can affect system-critical processes
  • No logging or audit trail
  • Missing authorization

Process Information Disclosure

import psutil

@app.route('/process-info')
def process_info():
    pid = int(request.args.get('pid'))

    # DANGEROUS: Exposes all process information
    proc = psutil.Process(pid)
    return {
        'name': proc.name(),
        'cmdline': proc.cmdline(),  # May contain secrets
        'environ': proc.environ(),  # Environment variables
        'cwd': proc.cwd(),
        'connections': [str(c) for c in proc.connections()]
    }

Why this is vulnerable:

  • No authorization - anyone can view any process
  • Command line may contain passwords or API keys
  • Environment variables often contain secrets
  • Network connections reveal internal architecture

Race Condition in Process Management

import logging
import os
import signal
import psutil

logger = logging.getLogger(__name__)

def manage_worker(action, worker_id):
    # DANGEROUS: Time-of-check-time-of-use vulnerability
    # get_worker_pid() is an application-specific lookup (not shown)
    pid = get_worker_pid(worker_id)

    if pid and psutil.pid_exists(pid):
        # Race condition: PID could change or be reused here
        if action == 'kill':
            os.kill(pid, signal.SIGKILL)

Why this is vulnerable:

  • PID can be reused between check and use
  • No atomic operation
  • Could kill wrong process
  • No verification process is still the expected one
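
For workers that the application spawned itself, one way to sidestep the PID lookup is to keep the subprocess.Popen handle and act through it. On POSIX systems a child that has exited but has not yet been reaped keeps its PID reserved, so the handle cannot end up pointing at an unrelated process. The sketch below assumes this spawn-and-own model; the `WorkerRegistry` class is hypothetical and omits the authorization checks discussed elsewhere.

```python
import subprocess
from typing import Dict, List


class WorkerRegistry:
    """Track workers by Popen handle instead of by raw PID."""

    def __init__(self) -> None:
        self._workers: Dict[str, subprocess.Popen] = {}

    def start(self, worker_id: str, argv: List[str]) -> None:
        self._workers[worker_id] = subprocess.Popen(argv, stdin=subprocess.DEVNULL)

    def stop(self, worker_id: str, timeout: float = 5.0) -> bool:
        """Stop a worker; return False if it is unknown or already finished."""
        proc = self._workers.pop(worker_id, None)
        if proc is None:
            return False
        if proc.poll() is not None:
            return False  # already exited and reaped; nothing is signalled
        proc.terminate()  # graceful request first (SIGTERM on POSIX)
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()  # forceful fallback after the timeout
            proc.wait()
        return True
```

Processes that were not started by the application still need an identity check, for example comparing a recorded start time with the current one before signalling.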

Insufficient Process Isolation

import os
import subprocess

def start_user_job(username, script_path):
    # DANGEROUS: Insufficient isolation
    subprocess.Popen([
        'python3',
        script_path
    ], env={**os.environ, 'USER': username})  # Copies the entire parent environment, then sets USER

Why this is vulnerable:

  • Inherits parent environment (PATH, secrets, etc.)
  • No resource limits (CPU, memory, file descriptors)
  • Runs with same user as parent process
  • No sandboxing or containerization
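
Note that the `env=` argument of subprocess.Popen replaces the child's environment rather than merging into it, so an explicit, minimal dictionary is one way to keep parent secrets out of the child. The following is a sketch under the assumption of POSIX-style paths; the helper names and the chosen variables are illustrative only.

```python
import subprocess


def build_child_env(username):
    """Return a minimal environment containing only what the child needs."""
    return {
        "PATH": "/usr/bin:/bin",  # assumed POSIX layout
        "USER": username,
        "LANG": "C.UTF-8",
    }


def start_job(username, argv):
    # env= replaces the environment: nothing from os.environ is copied over.
    return subprocess.Popen(
        argv,
        env=build_child_env(username),
        stdin=subprocess.DEVNULL,
    )
```

This addresses only the environment; resource limits, a separate user account, and sandboxing still have to be applied separately.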

Celery Task Control Without Authorization

from celery import current_app

@app.route('/admin/revoke-task', methods=['POST'])
def revoke_task():
    # DANGEROUS: No authorization
    task_id = request.form.get('task_id')

    current_app.control.revoke(task_id, terminate=True)
    return f"Task {task_id} revoked"

Why this is vulnerable:

  • Any user can revoke any task
  • No validation of task_id format
  • No verification user owns the task
  • Can disrupt critical background jobs

Secure Patterns

Process Termination with Authorization (CORRECT)

import os
import signal
import logging
from typing import Set, Dict
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ManagedProcess:
    pid: int
    name: str
    owner: str
    started_at: datetime

logger = logging.getLogger(__name__)

class SecureProcessManager:
    def __init__(self):
        self.managed_processes: Dict[str, ManagedProcess] = {}
        self.authorized_users: Set[str] = set()

    def register_process(self, process_id: str, process: ManagedProcess):
        """Register a process for management"""
        self.managed_processes[process_id] = process
        logger.info(f"Process registered: {process_id} (PID: {process.pid}) by {process.owner}")

    def kill_process(self, process_id: str, current_user: str) -> bool:
        """Safely terminate a process with authorization"""
        # Authorization check
        if current_user not in self.authorized_users:
            logger.warning(f"Unauthorized kill attempt by {current_user}")
            raise PermissionError(f"User {current_user} not authorized for process control")

        # Validate process ID
        process = self.managed_processes.get(process_id)
        if not process:
            logger.warning(f"Attempted to kill unmanaged process: {process_id}")
            raise ValueError(f"Process {process_id} not under management")

        # Ownership verification
        if process.owner != current_user and current_user != 'admin':
            logger.warning(
                f"User {current_user} attempted to kill process owned by {process.owner}"
            )
            raise PermissionError("Cannot kill process owned by another user")

        try:
            # Verify the process still exists (signal 0 checks existence only, not identity)
            os.kill(process.pid, 0)

            # Send SIGTERM first (graceful shutdown)
            os.kill(process.pid, signal.SIGTERM)
            logger.info(f"SIGTERM sent to process {process_id} (PID: {process.pid}) by {current_user}")

            # Could add timeout and SIGKILL fallback here
            del self.managed_processes[process_id]
            return True

        except ProcessLookupError:
            logger.warning(f"Process {process_id} (PID: {process.pid}) no longer exists")
            del self.managed_processes[process_id]
            return False
        except PermissionError as e:
            logger.error(f"Permission denied killing process {process_id}: {e}")
            raise

Why this works:

  • Strict authorization checks before any operation
  • Process allowlist prevents arbitrary process control
  • Ownership verification ensures users only control their own processes
  • Comprehensive audit logging
  • Graceful shutdown with SIGTERM (a timeout with a SIGKILL fallback can be added)

Safe Process Priority Management (CORRECT)

import os
import psutil

class ProcessPriorityManager:
    # Only allow reducing priority (higher nice values)
    MIN_NICE = 0  # Normal priority
    MAX_NICE = 19  # Lowest priority

    def set_process_priority(
        self,
        process_id: str,
        nice_value: int,
        current_user: str,
        process_manager: SecureProcessManager
    ):
        """Safely adjust process priority"""
        # Validate nice value range
        if not (self.MIN_NICE <= nice_value <= self.MAX_NICE):
            raise ValueError(
                f"Nice value must be between {self.MIN_NICE} and {self.MAX_NICE}"
            )

        # Get managed process
        process = process_manager.managed_processes.get(process_id)
        if not process:
            raise ValueError(f"Process {process_id} not under management")

        # Verify ownership
        if process.owner != current_user and current_user != 'admin':
            raise PermissionError("Cannot modify process owned by another user")

        try:
            # Use psutil for cross-platform compatibility
            proc = psutil.Process(process.pid)

            # Verify it's still the same process
            if proc.name() != process.name:
                raise ValueError("Process name mismatch - PID may have been reused")

            # Set nice value (only allows reducing priority)
            proc.nice(nice_value)

            logger.info(
                f"Process {process_id} priority set to {nice_value} by {current_user}"
            )

        except psutil.NoSuchProcess:
            raise ProcessLookupError(f"Process {process_id} no longer exists")

Why this works:

  • Only allows reducing priority (cannot increase above normal)
  • Validates nice value range
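  • Uses psutil rather than a shell command (note that on Windows, psutil's nice() takes priority-class constants instead of nice values)
  • Compares the process name to detect PID reuse (a weak check, since unrelated processes can share a name)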
  • Authorization and ownership checks
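
A fixed lower bound of 0 still lets a caller request a move from, say, nice 10 back to 0, which raises priority. The operating system normally refuses that for unprivileged callers, but a privileged service would succeed. A stricter variant compares the request with the current nice value. The sketch below uses the standard library's os.getpriority() and os.setpriority() (Unix only); the function name `lower_priority_only` is hypothetical, and the authorization and identity checks from the class above are omitted.

```python
import os

MAX_NICE = 19  # lowest priority on typical Unix systems


def lower_priority_only(pid, nice_value):
    """Apply a nice value only if it does not raise the process's priority."""
    if isinstance(nice_value, bool) or not isinstance(nice_value, int):
        raise ValueError("nice value must be an integer")
    current = os.getpriority(os.PRIO_PROCESS, pid)
    # Anything below the current value would be a priority increase.
    if not current <= nice_value <= MAX_NICE:
        raise ValueError(f"nice value must be between {current} and {MAX_NICE}")
    os.setpriority(os.PRIO_PROCESS, pid, nice_value)
    return nice_value
```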

Restricted Process Spawning with Resource Limits (CORRECT)

import logging
import os
import resource
import subprocess
from pathlib import Path
from typing import List, Dict

logger = logging.getLogger(__name__)

class SecureProcessSpawner:
    # Allowlist of permitted executables
    ALLOWED_EXECUTABLES = {
        'worker': '/opt/app/worker.py',
        'processor': '/opt/app/processor.py',
        'analyzer': '/opt/app/analyzer.py'
    }

    # Maximum concurrent processes per user
    MAX_PROCESSES_PER_USER = 5

    def __init__(self):
        self.user_process_count: Dict[str, int] = {}

    def spawn_process(
        self,
        job_type: str,
        args: List[str],
        current_user: str
    ) -> subprocess.Popen:
        """Spawn process with security controls"""
        # Validate job type against allowlist
        if job_type not in self.ALLOWED_EXECUTABLES:
            logger.warning(f"Unauthorized job type requested: {job_type}")
            raise ValueError(f"Job type '{job_type}' not permitted")

        executable = self.ALLOWED_EXECUTABLES[job_type]

        # Rate limiting per user
        user_count = self.user_process_count.get(current_user, 0)
        if user_count >= self.MAX_PROCESSES_PER_USER:
            raise RuntimeError(
                f"User {current_user} has reached process limit of {self.MAX_PROCESSES_PER_USER}"
            )

        # Validate arguments (no path traversal, no shell metacharacters)
        validated_args = self._validate_arguments(args)

        # Prepare secure environment
        secure_env = {
            'PATH': '/usr/bin:/bin',
            'USER': current_user,
            'HOME': f'/home/{current_user}',
            'PYTHONDONTWRITEBYTECODE': '1',
            'PYTHONUNBUFFERED': '1'
        }

        # Create process with resource limits
        def set_limits():
            # Limit CPU time to 1 hour
            resource.setrlimit(resource.RLIMIT_CPU, (3600, 3600))
            # Limit memory to 1GB
            resource.setrlimit(resource.RLIMIT_AS, (1024*1024*1024, 1024*1024*1024))
            # Limit number of file descriptors
            resource.setrlimit(resource.RLIMIT_NOFILE, (1024, 1024))
            # Prevent core dumps
            resource.setrlimit(resource.RLIMIT_CORE, (0, 0))

        try:
            process = subprocess.Popen(
                ['python3', executable] + validated_args,
                env=secure_env,
                preexec_fn=set_limits,  # Unix only
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                stdin=subprocess.DEVNULL,  # No input
                cwd='/tmp',  # Neutral working directory (world-writable; prefer a dedicated per-job directory)
                # Additional security on Linux
                start_new_session=True  # Prevent signal propagation
            )

            self.user_process_count[current_user] = user_count + 1

            logger.info(
                f"Process spawned: {job_type} by {current_user} "
                f"(PID: {process.pid})"
            )

            return process

        except Exception as e:
            logger.error(f"Failed to spawn process: {e}")
            raise

    def _validate_arguments(self, args: List[str]) -> List[str]:
        """Validate process arguments"""
        validated = []

        for arg in args:
            # Reject path traversal attempts
            if '..' in arg or arg.startswith('/'):
                raise ValueError(f"Invalid argument: {arg}")

            # Reject shell metacharacters
            dangerous_chars = set(';&|`$(){}[]<>*?~')
            if any(c in arg for c in dangerous_chars):
                raise ValueError(f"Argument contains dangerous characters: {arg}")

            # Limit argument length
            if len(arg) > 255:
                raise ValueError(f"Argument too long: {len(arg)} chars")

            validated.append(arg)

        return validated

    def cleanup_finished(self, current_user: str):
        """Decrement the user's process count when one of their processes finishes"""
        count = self.user_process_count.get(current_user, 0)
        if count > 0:  # Never let the counter go negative
            self.user_process_count[current_user] = count - 1

Why this works:

  • Executable allowlist prevents arbitrary code execution
  • Rate limiting per user prevents resource exhaustion
  • Resource limits (CPU, memory, file descriptors) prevent DoS
  • Arguments validated to prevent injection
  • Clean environment prevents secret leakage
  • Process isolation with new session

Safe Signal Handling (CORRECT)

import logging
import os
import signal
import psutil

logger = logging.getLogger(__name__)

class SecureSignalManager:
    # Only allow safe signals
    ALLOWED_SIGNALS = {
        'TERM': signal.SIGTERM,  # Graceful termination
        'HUP': signal.SIGHUP,    # Reload configuration
        'USR1': signal.SIGUSR1,  # User-defined
        'USR2': signal.SIGUSR2   # User-defined
    }

    def send_signal(
        self,
        process_id: str,
        signal_name: str,
        current_user: str,
        process_manager: SecureProcessManager
    ):
        """Send signal to process with authorization"""
        # Validate signal
        if signal_name not in self.ALLOWED_SIGNALS:
            raise ValueError(f"Signal '{signal_name}' not permitted")

        sig = self.ALLOWED_SIGNALS[signal_name]

        # Get managed process
        process = process_manager.managed_processes.get(process_id)
        if not process:
            raise ValueError(f"Process {process_id} not under management")

        # Authorization check
        if process.owner != current_user and current_user != 'admin':
            logger.warning(
                f"User {current_user} attempted to signal process owned by {process.owner}"
            )
            raise PermissionError("Cannot signal process owned by another user")

        try:
            # Verify process identity
            proc = psutil.Process(process.pid)
            if proc.name() != process.name:
                raise ValueError("Process identity mismatch")

            # Send signal
            os.kill(process.pid, sig)

            logger.info(
                f"Signal {signal_name} sent to process {process_id} "
                f"(PID: {process.pid}) by {current_user}"
            )

        except psutil.NoSuchProcess:
            raise ProcessLookupError(f"Process {process_id} no longer exists")

Why this works:

  • Signal allowlist prevents dangerous signals (SIGKILL, SIGSTOP)
  • Authorization and ownership verification
  • Process identity verification prevents PID reuse
  • Comprehensive logging

Secure Process Information Disclosure (CORRECT)

import psutil
from typing import Dict, Any

class SecureProcessInfo:
    def get_process_info(
        self,
        process_id: str,
        current_user: str,
        process_manager: SecureProcessManager
    ) -> Dict[str, Any]:
        """Get process information with authorization"""
        # Get managed process
        process = process_manager.managed_processes.get(process_id)
        if not process:
            raise ValueError(f"Process {process_id} not under management")

        # Authorization - only owner or admin
        if process.owner != current_user and current_user != 'admin':
            raise PermissionError("Cannot view process owned by another user")

        try:
            proc = psutil.Process(process.pid)

            # Return only safe, sanitized information
            return {
                'id': process_id,
                'pid': process.pid,
                'name': process.name,
                'status': proc.status(),
                'cpu_percent': proc.cpu_percent(interval=0.1),
                'memory_mb': proc.memory_info().rss / 1024 / 1024,
                'started_at': process.started_at.isoformat(),
                'owner': process.owner
                # Do NOT include: cmdline, environ, connections, open files
            }

        except psutil.NoSuchProcess:
            raise ProcessLookupError(f"Process {process_id} no longer exists")

Why this works:

  • Authorization check prevents information disclosure
  • Returns only safe, non-sensitive information
  • Excludes command line arguments (may contain secrets)
  • Excludes environment variables
  • Excludes network connections and file handles

Celery Task Management with Authorization (CORRECT)

import logging
from celery import current_app
from flask import request, g
import re

logger = logging.getLogger(__name__)

class SecureCeleryManager:
    # Valid task ID format (UUID)
    TASK_ID_PATTERN = re.compile(
        r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
    )

    def __init__(self):
        self.task_owners = {}  # task_id -> username mapping

    def register_task(self, task_id: str, owner: str):
        """Register task ownership"""
        self.task_owners[task_id] = owner

    def revoke_task(self, task_id: str, current_user: str, terminate: bool = False):
        """Revoke Celery task with authorization"""
        # Validate task ID format (fullmatch also rejects a trailing newline)
        if not self.TASK_ID_PATTERN.fullmatch(task_id):
            raise ValueError(f"Invalid task ID format: {task_id}")

        # Check ownership
        owner = self.task_owners.get(task_id)
        if not owner:
            raise ValueError(f"Task {task_id} not found or not owned by you")

        if owner != current_user and current_user != 'admin':
            logger.warning(
                f"User {current_user} attempted to revoke task owned by {owner}"
            )
            raise PermissionError("Cannot revoke task owned by another user")

        # Revoke with appropriate signal
        current_app.control.revoke(
            task_id,
            terminate=terminate,  # Only if explicitly requested
            signal='SIGTERM' if terminate else None  # Graceful termination
        )

        logger.info(
            f"Task {task_id} revoked by {current_user} "
            f"(terminate={terminate})"
        )

        del self.task_owners[task_id]

Why this works:

  • Task ID format validation prevents injection
  • Ownership tracking and verification
  • Authorization check before revocation
  • Running tasks are left alone unless terminate=True is passed, and then SIGTERM is used for a graceful stop
  • Audit logging
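
One detail of format validation is worth spelling out: in Python, a pattern anchored with ^ and $ still accepts a single trailing newline when used with match(), because $ also matches just before a final newline. Using fullmatch() closes that gap. A minimal sketch with the same UUID pattern follows; the helper name `is_valid_task_id` and the sample ID are illustrative.

```python
import re

TASK_ID_RE = re.compile(
    r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
)


def is_valid_task_id(task_id):
    """Accept only an exact lowercase UUID string, with nothing before or after it."""
    return isinstance(task_id, str) and TASK_ID_RE.fullmatch(task_id) is not None


sample = "123e4567-e89b-12d3-a456-426614174000"
print(TASK_ID_RE.match(sample + "\n") is not None)  # → True (newline slips through)
print(is_valid_task_id(sample + "\n"))              # → False
```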

Docker Container Process Control (CORRECT)

import docker
import logging
import os
from typing import Dict

logger = logging.getLogger(__name__)

class SecureContainerManager:
    def __init__(self):
        self.client = docker.from_env()
        self.managed_containers: Dict[str, str] = {}  # container_id -> owner

    def start_container(
        self,
        image: str,
        command: str,
        current_user: str
    ) -> str:
        """Start container with security controls"""
        # Image allowlist
        ALLOWED_IMAGES = {
            'worker': 'myapp/worker:latest',
            'analyzer': 'myapp/analyzer:latest'
        }

        if image not in ALLOWED_IMAGES:
            raise ValueError(f"Image '{image}' not permitted")

        # Start container with security options
        container = self.client.containers.run(
            ALLOWED_IMAGES[image],
            command,
            detach=True,
            # Security options
            read_only=True,  # Read-only filesystem
            mem_limit='512m',  # Memory limit
            cpu_quota=50000,  # CPU limit (50% of one core)
            pids_limit=100,  # Limit number of processes
            network_mode='bridge',  # Network isolation
            cap_drop=['ALL'],  # Drop all capabilities
            security_opt=['no-new-privileges'],  # Prevent privilege escalation
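            # Run as this service's UID/GID (non-root only if the service itself is not root).
            # userns_mode='host' is deliberately not set: it disables
            # user-namespace remapping for the container.
            user=f'{os.getuid()}:{os.getgid()}'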
        )

        self.managed_containers[container.id] = current_user

        logger.info(
            f"Container {container.id} started by {current_user} "
            f"(image: {image})"
        )

        return container.id

    def stop_container(self, container_id: str, current_user: str):
        """Stop container with authorization"""
        # Check ownership
        owner = self.managed_containers.get(container_id)
        if not owner:
            raise ValueError(f"Container {container_id} not found")

        if owner != current_user and current_user != 'admin':
            raise PermissionError("Cannot stop container owned by another user")

        container = self.client.containers.get(container_id)
        container.stop(timeout=10)  # Graceful stop with timeout

        logger.info(f"Container {container_id} stopped by {current_user}")

        del self.managed_containers[container_id]

Why this works:

  • Image allowlist prevents arbitrary container execution
  • Resource limits prevent DoS
  • Read-only filesystem
  • Dropped capabilities and security options
  • Runs as the service's own UID/GID, which is non-root provided the service itself does not run as root
  • Network isolation

Key Security Functions

Process Validator

import re
from typing import Optional

class ProcessValidator:
    """Validate process control parameters"""

    # Valid process ID pattern (internal identifier)
    PROCESS_ID_PATTERN = re.compile(r'^[a-z0-9_-]{1,64}$')

    @staticmethod
    def validate_process_id(process_id: str) -> bool:
        """Validate process identifier format"""
        if not isinstance(process_id, str):
            raise TypeError("Process ID must be a string")

        if not ProcessValidator.PROCESS_ID_PATTERN.fullmatch(process_id):
            raise ValueError(
                f"Invalid process ID format: {process_id}. "
                "Must be alphanumeric, dash, or underscore (1-64 chars)"
            )

        return True

    @staticmethod
    def validate_pid(pid: int) -> bool:
        """Validate system PID"""
        if not isinstance(pid, int):
            raise TypeError("PID must be an integer")

        if pid <= 0:
            raise ValueError(f"Invalid PID: {pid}")

        # Coarse heuristic: low PIDs are often (but not only) system processes.
        # The managed-process allowlist remains the primary control.
        if pid < 100:
            raise ValueError(f"Cannot target system process (PID < 100): {pid}")

        return True

    @staticmethod
    def validate_signal_name(signal_name: str) -> bool:
        """Validate signal name"""
        ALLOWED_SIGNALS = {'TERM', 'HUP', 'USR1', 'USR2'}

        if signal_name not in ALLOWED_SIGNALS:
            raise ValueError(
                f"Signal '{signal_name}' not allowed. "
                f"Permitted: {', '.join(sorted(ALLOWED_SIGNALS))}"
            )

        return True

Process Ownership Checker

import logging
import os
import psutil

logger = logging.getLogger(__name__)

def check_process_ownership(pid: int, expected_user: str) -> bool:
    """Verify process is owned by expected user"""
    try:
        proc = psutil.Process(pid)
        process_user = proc.username()

        # On Unix, compare usernames
        if process_user != expected_user:
            logger.warning(
                f"Process {pid} ownership mismatch: "
                f"expected {expected_user}, got {process_user}"
            )
            return False

        return True

    except psutil.NoSuchProcess:
        return False
    except psutil.AccessDenied:
        logger.warning(f"Access denied checking ownership of PID {pid}")
        return False

Verification

After implementing the recommended secure patterns, verify the fix through multiple approaches:

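  • Manual testing: Submit malicious inputs such as arbitrary or negative PIDs, unmanaged process IDs, and arguments containing shell metacharacters or path traversal sequences, and confirm they are rejected without executing unintended operations
  • Code review: Confirm every process control call site uses the secure pattern (argument lists rather than shell=True, allowlisted executables and signals, managed process IDs instead of raw PIDs) with no user input interpolated into command strings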
  • Static analysis: Use security scanners to verify no new vulnerabilities exist and the original finding is resolved
  • Regression testing: Ensure legitimate user inputs and application workflows continue to function correctly
  • Edge case validation: Test with special characters, boundary conditions, and unusual inputs to verify proper handling
  • Framework verification: If using a framework or library, confirm the recommended APIs are used correctly according to documentation
  • Authentication/session testing: Verify that unauthenticated users and non-owners cannot kill, signal, inspect, or spawn processes, and that these controls cannot be bypassed
  • Rescan: Run the security scanner again to confirm the finding is resolved and no new issues were introduced
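
Some of these checks can be automated as regression tests. As one example, the sketch below spawns a child with a resource limit applied through preexec_fn and asks the child to report the limit it actually sees, which confirms the limit reaches spawned processes. It is Unix only, and the function names are hypothetical.

```python
import resource
import subprocess
import sys


def _limit_core():
    # Runs in the child just before exec: disable core dumps.
    resource.setrlimit(resource.RLIMIT_CORE, (0, 0))


def child_core_limit():
    """Spawn a child with the limit applied and return what the child observes."""
    result = subprocess.run(
        [sys.executable, "-c",
         "import resource; print(resource.getrlimit(resource.RLIMIT_CORE))"],
        preexec_fn=_limit_core,  # Unix only
        capture_output=True,
        text=True,
        check=True,
        timeout=30,
    )
    return result.stdout.strip()
```

A test can then assert that `child_core_limit()` returns "(0, 0)". The same approach extends to the CPU, memory, and file descriptor limits.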

Security Checklist

  • All process control operations require authentication and authorization
  • Process allowlist implemented - only managed processes can be controlled
  • PID validation rejects non-positive and low PIDs (PID < 100 is a coarse heuristic; the managed-process allowlist is the primary control)
  • Ownership verification ensures users only control their own processes
  • Signal allowlist restricts to safe signals (no SIGKILL, SIGSTOP for users)
  • Rate limiting prevents process spawning DoS
  • Resource limits applied to all spawned processes (CPU, memory, file descriptors)
  • No shell=True in subprocess calls with user input
  • Arguments validated before passing to subprocess
  • Environment sanitized for spawned processes (no secret leakage)
  • Process information disclosure restricted to authorized users
  • Comprehensive audit logging of all process control operations
  • Error messages don't leak sensitive process information
  • Celery/background task management includes authorization
  • Container process control uses security options and resource limits

Additional Resources