CWE-943: NoSQL Injection - Python
Overview
NoSQL Injection in Python applications occurs when untrusted input is used to construct NoSQL database queries (MongoDB, Redis, CouchDB, DynamoDB, etc.) without proper validation or sanitization. Untrusted input can originate from HTTP requests, external APIs, databases, files, message queues, or any source outside the application's control. Attackers can exploit this to bypass authentication, extract sensitive data, modify database contents, or execute unauthorized operations.
Primary Defence: Never build query dictionaries directly from user input. Use MongoEngine's keyword query API, or PyMongo queries whose structure is fixed in code (the closest NoSQL analogue of parameterized queries). Validate and type-check every value before it reaches a query, reject input whose keys begin with $ or contain . (MongoDB operator and field-path syntax), and allowlist the fields and operators a caller may use.
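As a rough illustration of rejecting operator prefixes, the sketch below walks a decoded JSON value and refuses any dictionary key that starts with $ or contains a dot. The helper name reject_operator_keys and the depth cap of 10 are assumptions made for this example; they are not part of PyMongo or MongoEngine.

```python
from typing import Any


def reject_operator_keys(value: Any, _depth: int = 0) -> Any:
    """Raise ValueError if any dict key looks like a MongoDB operator or path.

    Hypothetical helper: keys starting with '$' or containing '.' are refused.
    """
    if _depth > 10:  # arbitrary cap to bound recursion on hostile input
        raise ValueError("Input nested too deeply")
    if isinstance(value, dict):
        for key, nested in value.items():
            if not isinstance(key, str) or key.startswith("$") or "." in key:
                raise ValueError(f"Disallowed key: {key!r}")
            reject_operator_keys(nested, _depth + 1)
    elif isinstance(value, list):
        for item in value:
            reject_operator_keys(item, _depth + 1)
    return value
```

A check like this is best treated as a backstop. Type validation and field allowlists remain the primary control, because they also reject well-formed but unexpected input.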
Common Python NoSQL Vulnerabilities:
- MongoDB query injection via operator injection ($ne, $gt, $where, $regex)
- MongoDB aggregation pipeline injection
- Redis command injection via unsanitized keys/values
- CouchDB view query manipulation
- DynamoDB expression injection
Popular Python NoSQL Libraries:
- PyMongo: Official MongoDB driver
- Motor: Async MongoDB driver for Python
- mongoengine: MongoDB ODM (Object-Document Mapper)
- redis-py: Redis client
- boto3: AWS DynamoDB client
Common Vulnerable Patterns
MongoDB Operator Injection
# VULNERABLE - Direct untrusted input in MongoDB query
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['app_database']
def authenticate_user(username, password):
    # VULNERABLE - Untrusted input directly in query
    user = db.users.find_one({
        'username': username,
        'password': password
    })
    return user is not None
# Attack: username = {"$ne": null}, password = {"$ne": null}
# Query becomes: {'username': {'$ne': None}, 'password': {'$ne': None}}
# Returns first user (authentication bypass!)
Why this is vulnerable:
- MongoDB operators ($ne, $gt, $regex) accepted in queries
- JSON/dict injection from request parameters
- No type validation
- Authentication bypass possible
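The points above can be seen without a database. The sketch below decodes a hypothetical login request body with the standard json module and shows that the values arrive as dictionaries rather than strings, which is what lets operators reach the filter. The request body and the is_plain_string helper are illustrative assumptions.

```python
import json

# Hypothetical request body an attacker might send to a JSON login endpoint.
raw_body = '{"username": {"$ne": null}, "password": {"$ne": null}}'
payload = json.loads(raw_body)

# The values are dicts, not strings, so they pass straight into the filter.
query = {"username": payload["username"], "password": payload["password"]}


def is_plain_string(value) -> bool:
    """Return True only for str values; dicts and lists can carry operators."""
    return isinstance(value, str)
```

Calling is_plain_string() on each credential before building the filter is enough to stop this particular payload.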
Flask API with JSON Injection
# VULNERABLE - Accepting arbitrary JSON in queries
from flask import Flask, request, jsonify
from pymongo import MongoClient
app = Flask(__name__)
client = MongoClient('mongodb://localhost:27017/')
db = client['shop']
@app.route('/api/products', methods=['POST'])
def search_products():
    # VULNERABLE - Arbitrary query object from untrusted source
    query = request.get_json()
    # No validation on query structure
    products = list(db.products.find(query))
    return jsonify(products)
# Attack POST body: {"admin_only": true, "price": {"$gt": 0}}
# The caller controls the whole filter, so admin-only products are returned
Why this is vulnerable:
- Accepts arbitrary query operators
- No field allowlist
- Can access hidden/admin fields
- Data exfiltration possible
MongoDB $where Operator Injection
# VULNERABLE - JavaScript code injection via $where
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['app']
def find_users_by_age(age):
    # VULNERABLE - String concatenation in $where
    query = {
        '$where': f'this.age > {age}'
    }
    users = list(db.users.find(query))
    return users
# Attack: age = "0; return true; //"
# Executes arbitrary JavaScript: this.age > 0; return true; //
# Returns all users regardless of age
Why this is vulnerable:
- $where executes JavaScript on the MongoDB server
- String concatenation allows code injection
- Denial of service (infinite loops)
- Data exfiltration
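To make the concatenation problem concrete, the sketch below reproduces only the string-building step and compares a benign value with a hostile one. The hostile input "0 || true" is an illustrative assumption chosen because it is a valid JavaScript expression. The parse_age helper is likewise hypothetical and shows that integer coercion rejects such input before any query exists.

```python
def build_where_clause(age) -> str:
    """Mirror the vulnerable f-string so its output can be inspected."""
    return f"this.age > {age}"


def parse_age(age) -> int:
    """Coerce to int; anything that is not a plain integer raises ValueError."""
    try:
        return int(age)
    except (TypeError, ValueError):
        raise ValueError("Invalid age value")


benign = build_where_clause(30)            # "this.age > 30"
hostile = build_where_clause("0 || true")  # "this.age > 0 || true"
```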
Flask with mongoengine Injection
# VULNERABLE - MongoEngine with raw queries
from mongoengine import Document, StringField, connect
from flask import request
connect('mydb')
class User(Document):
    username = StringField()
    email = StringField()
    role = StringField()
def get_user_profile(username):
    # VULNERABLE - Using __raw__ with untrusted input
    query = {'username': username}
    user = User.objects(__raw__=query).first()
    return user
# Attack: username = {"$regex": "^admin"}
# Query becomes {'username': {'$regex': '^admin'}}
# Returns the first user whose name starts with "admin" instead of the requested user
Why this is vulnerable:
- __raw__ accepts MongoDB operators
- No validation on query structure
- Privilege escalation
- ODM bypassed
Redis Command Injection
# VULNERABLE - Redis key injection
import redis
from flask import Flask, request
app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379)
@app.route('/cache/<key>')
def get_cache(key):
    # VULNERABLE - Untrusted input in Redis key
    value = r.get(key)
    return value or 'Not found'
@app.route('/set_cache')
def set_cache():
    key = request.args.get('key')
    value = request.args.get('value')
    # VULNERABLE - Untrusted key and value written without validation
    r.set(key, value)
    return 'OK'
# Attack: key = "session:<victim-id>" reads or overwrites another user's entry
# Attack: key = "test\r\nFLUSHDB\r\n" injects a second command when a client
# assembles commands as raw strings. redis-py sends length-prefixed arguments,
# so there the payload is stored as a literal key instead.
Why this is vulnerable:
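- Caller controls the full key, so any key in the database can be read or overwritten
- CRLF injection in the Redis protocol when commands are assembled as raw strings
- In that case arbitrary Redis commands can run and the database can be wiped
- Data exfiltration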
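The difference between raw-string commands and length-prefixed arguments can be sketched with two hand-written encoders. These functions are illustrative only and are not redis-py's implementation. encode_inline imitates a client that formats commands as text, and encode_resp follows the array-of-bulk-strings layout of the Redis serialization protocol.

```python
def encode_inline(command: str, key: str) -> bytes:
    """Naive inline command built by string formatting (the unsafe shape)."""
    return f"{command} {key}\r\n".encode("utf-8")


def encode_resp(*args: str) -> bytes:
    """Encode a command as a RESP array of length-prefixed bulk strings."""
    parts = [f"*{len(args)}\r\n".encode("ascii")]
    for arg in args:
        data = arg.encode("utf-8")
        parts.append(b"$" + str(len(data)).encode("ascii") + b"\r\n" + data + b"\r\n")
    return b"".join(parts)


hostile_key = "test\r\nFLUSHDB"

# Two lines on the wire: "GET test" followed by "FLUSHDB".
inline_bytes = encode_inline("GET", hostile_key)

# One command: the 13-byte key is data, including its embedded CRLF.
resp_bytes = encode_resp("GET", hostile_key)
```

In the length-prefixed form the server reads exactly 13 bytes for the key, so the embedded CRLF never acts as a command separator.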
MongoDB Aggregation Injection
# VULNERABLE - Aggregation pipeline with untrusted input
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['analytics']
def get_user_stats(user_id, sort_field):
    # VULNERABLE - Untrusted input in aggregation pipeline
    pipeline = [
        {'$match': {'user_id': user_id}},
        {'$sort': {sort_field: -1}},
        {'$limit': 10}
    ]
    results = list(db.events.aggregate(pipeline))
    return results
# Attack: user_id = {"$ne": None}
# $match becomes {'user_id': {'$ne': None}} and returns every user's events
# sort_field is also unchecked, so the caller can sort on any field, indexed or not
Why this is vulnerable:
- Aggregation operators injectable
- No field validation
- DoS via expensive injected operators (for example $regex) or unindexed sorts
- Data pipeline manipulation
MongoDB Regex Injection
# VULNERABLE - Regex injection in queries
from pymongo import MongoClient
import re
client = MongoClient('mongodb://localhost:27017/')
db = client['app']
def search_users(search_term):
    # VULNERABLE - Untrusted input in regex without escaping
    query = {
        'username': {'$regex': search_term, '$options': 'i'}
    }
    users = list(db.users.find(query))
    return users
# Attack: search_term = ".*"
# Returns ALL users (DoS, data exfiltration)
# Attack: search_term = "^admin.*$"
# Discovers admin usernames
Why this is vulnerable:
- Regex patterns from untrusted sources
- ReDoS (Regular Expression Denial of Service)
- Information disclosure
- No escaping or limits
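One possible mitigation, sketched below, is to treat the search term as literal text: cap its length, escape regex metacharacters with re.escape, and anchor the pattern so it behaves as a prefix search. The function name build_username_search and the 50-character limit are assumptions for this example. It also assumes that Python's backslash escaping of punctuation is interpreted the same way by MongoDB's regex engine.

```python
import re

MAX_SEARCH_LENGTH = 50  # assumed limit; tune for the application


def build_username_search(search_term: str) -> dict:
    """Return a MongoDB filter that matches search_term as a literal prefix."""
    if not isinstance(search_term, str):
        raise ValueError("Search term must be a string")
    if not 1 <= len(search_term) <= MAX_SEARCH_LENGTH:
        raise ValueError("Search term length out of range")
    # re.escape backslash-escapes regex metacharacters so they match literally
    pattern = "^" + re.escape(search_term)
    return {"username": {"$regex": pattern, "$options": "i"}}
```

The returned dictionary would be passed to find() together with a limit() call, so that even a short prefix cannot return the whole collection.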
DynamoDB Expression Injection
# VULNERABLE - DynamoDB filter expression injection
import boto3
from boto3.dynamodb.conditions import Attr
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Users')
def search_users(attribute_name, value):
    # VULNERABLE - Untrusted attribute name
    response = table.scan(
        FilterExpression=Attr(attribute_name).eq(value)
    )
    return response['Items']
# Attack: attribute_name = "admin", value = True
# Bypasses intended search, finds admin users
Why this is vulnerable:
- Attribute names from untrusted sources
- Can access hidden attributes
- No field allowlist
- Authorization bypass
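A field allowlist addresses these points. In the sketch below, the attribute names in SEARCHABLE_ATTRIBUTES are hypothetical, and the table is passed in as a parameter so the validation logic can be exercised without AWS credentials. The boto3 import is deferred to the function that actually scans.

```python
SEARCHABLE_ATTRIBUTES = {"username": str, "city": str}  # hypothetical attributes


def validate_filter(attribute_name, value):
    """Return (attribute_name, value) if both pass the allowlist, else raise."""
    if not isinstance(attribute_name, str):
        raise ValueError("Attribute name must be a string")
    expected_type = SEARCHABLE_ATTRIBUTES.get(attribute_name)
    if expected_type is None:
        raise ValueError(f"Attribute not searchable: {attribute_name!r}")
    if not isinstance(value, expected_type):
        raise ValueError("Unexpected value type")
    return attribute_name, value


def search_users(table, attribute_name, value):
    """Scan `table` (a boto3 Table resource) with a validated equality filter."""
    from boto3.dynamodb.conditions import Attr  # lazy import; requires boto3

    name, clean_value = validate_filter(attribute_name, value)
    response = table.scan(FilterExpression=Attr(name).eq(clean_value))
    return response["Items"]
```

With this in place, a request for attribute_name = "admin" fails validation before any expression is built.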
Secure Patterns
MongoDB with Type Validation
# SECURE - Strict type validation for MongoDB queries
from pymongo import MongoClient
from typing import Optional
client = MongoClient('mongodb://localhost:27017/')
db = client['app_database']
def validate_string(value: str, max_length: int = 100) -> str:
    """Validate string input."""
    if not isinstance(value, str):
        raise ValueError("Expected string value")
    if len(value) > max_length:
        raise ValueError(f"Value exceeds max length {max_length}")
    return value
def authenticate_user(username: str, password: str) -> bool:
    """Secure user authentication with type validation."""
    # SECURE - Validate input types
    clean_username = validate_string(username, max_length=50)
    clean_password = validate_string(password, max_length=100)
    # SECURE - Only string values allowed, no operators
    user = db.users.find_one({
        'username': clean_username,
        'password': clean_password  # In production, use hashed passwords!
    })
    return user is not None
# Attack attempts with dicts/operators will fail type validation
Why this works: The runtime isinstance(value, str) check in validate_string() rejects dictionaries such as {"$ne": None}, which operator injection depends on. Type hints alone would not do this, because Python does not enforce them at runtime. The length limits (50 characters for the username, 100 for the password) bound the size of each input. The query then contains only validated strings in equality comparisons such as {"username": clean_username}, which MongoDB matches literally. Even if an attacker sends JSON containing operators through the API, validation fails before the query is built.
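The code comment above notes that production systems should store hashed passwords. One way to do that, sketched here under stated assumptions, is to look the user up by username only and verify the password in application code, which also keeps the password out of the query entirely. The field names salt and password_hash, the PBKDF2 iteration count, and the users parameter (assumed to be a PyMongo collection) are all illustrative choices.

```python
import hashlib
import hmac
import os

PBKDF2_ITERATIONS = 600_000  # assumed work factor; choose per current guidance


def new_salt() -> bytes:
    """Generate a random per-user salt."""
    return os.urandom(16)


def hash_password(password: str, salt: bytes) -> bytes:
    """Derive a password hash with PBKDF2-HMAC-SHA256."""
    return hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, PBKDF2_ITERATIONS
    )


def verify_password(password: str, salt: bytes, expected_hash: bytes) -> bool:
    """Compare in constant time to avoid leaking how many bytes matched."""
    return hmac.compare_digest(hash_password(password, salt), expected_hash)


def authenticate_user(users, username, password) -> bool:
    """Look up by username only, then verify the hash outside the query.

    `users` is assumed to be a PyMongo collection whose documents carry
    hypothetical 'salt' and 'password_hash' bytes fields.
    """
    if not isinstance(username, str) or not isinstance(password, str):
        return False
    user = users.find_one({"username": username})
    if user is None:
        return False
    return verify_password(password, user["salt"], user["password_hash"])
```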
Flask with Query Allowlist
# SECURE - Field allowlist and validation
from flask import Flask, request, jsonify
from pymongo import MongoClient
app = Flask(__name__)
client = MongoClient('mongodb://localhost:27017/')
db = client['shop']
# SECURE - Define allowed query fields and the type each one is converted to
ALLOWED_FIELDS = {
    'name': str,
    'category': str,
    'price_min': float,
    'price_max': float
}
def build_safe_query(params: dict) -> dict:
    """Build safe MongoDB query from parameters."""
    query = {}
    for field, value in params.items():
        # SECURE - Only allow allowlisted fields
        if field not in ALLOWED_FIELDS:
            continue
        # SECURE - Query-string values arrive as str; skip anything else
        if not isinstance(value, str):
            continue
        # SECURE - Convert to the expected type, skipping values that do not parse
        try:
            value = ALLOWED_FIELDS[field](value)
        except ValueError:
            continue
        # SECURE - Build safe query conditions
        if field == 'price_min':
            query.setdefault('price', {})['$gte'] = value
        elif field == 'price_max':
            query.setdefault('price', {})['$lte'] = value
        else:
            query[field] = value
    return query
@app.route('/api/products', methods=['GET'])
def search_products():
    params = request.args.to_dict()
    # SECURE - Build validated query
    safe_query = build_safe_query(params)
    # Exclude _id because jsonify cannot serialize ObjectId values
    products = list(db.products.find(safe_query, {'_id': 0}).limit(100))
    return jsonify(products)
Why this works: The field allowlist approach prevents arbitrary field queries and operator injection. The ALLOWED_FIELDS dictionary acts as an allow-list with expected types, rejecting any fields not explicitly permitted (like admin_only or _internal_flags). Type validation ensures each field receives the correct type - strings get strings, numbers get numbers - preventing type confusion attacks. Controlled operator usage with $gte and $lte is safe because the code constructs them programmatically with validated values, not user-supplied operators. The limit(100) prevents resource exhaustion. This pattern gives users query flexibility (searching by name, category, price range) without exposing the application to injection, because the query structure is controlled by code, not user input.
No $where, Use Safe Operators
# SECURE - Avoid $where, use safe operators
from pymongo import MongoClient
from typing import Union
client = MongoClient('mongodb://localhost:27017/')
db = client['app']
def validate_age(age: Union[int, str]) -> int:
    """Validate and convert age to integer."""
    try:
        age_int = int(age)
    except (ValueError, TypeError):
        raise ValueError("Invalid age value")
    # Range check sits outside the try block so its message is not overwritten
    if age_int < 0 or age_int > 150:
        raise ValueError("Age out of valid range")
    return age_int
def find_users_by_age(min_age: Union[int, str]) -> list:
    """Find users by minimum age using safe operators."""
    # SECURE - Validate input
    clean_age = validate_age(min_age)
    # SECURE - Use safe $gte operator instead of $where
    query = {
        'age': {'$gte': clean_age}
    }
    users = list(db.users.find(query).limit(100))
    return users
Why this works: The $where operator executes JavaScript on the MongoDB server, which makes it a direct code-injection vector. Avoiding $where entirely and using a comparison operator such as $gte removes that risk. The validate_age() function converts the input to an integer and checks that it lies between 0 and 150, so arbitrary strings, dictionaries, and out-of-range values are rejected. The resulting query {"age": {"$gte": clean_age}} is a numeric comparison with no code execution, and the operator is chosen by the code rather than the caller. The limit(100) call caps the result size. Most queries can be expressed with operators like these, which makes $where unnecessary and removes a major injection vector.
MongoEngine with Field Validation
# SECURE - MongoEngine with proper field access
from mongoengine import Document, StringField, EmailField, connect
from flask import request, abort
connect('mydb')
class User(Document):
    username = StringField(required=True, max_length=50)
    email = EmailField(required=True)
    role = StringField(choices=['user', 'admin'])
def validate_username(username: str) -> str:
    """Validate username format."""
    if not isinstance(username, str):
        raise ValueError("Username must be a string")
    if not username.isalnum():
        raise ValueError("Username must be alphanumeric")
    if len(username) > 50:
        raise ValueError("Username too long")
    return username
def get_user_profile(username: str):
    """Get user profile securely."""
    # SECURE - Validate input
    clean_username = validate_username(username)
    # SECURE - Use ODM fields, not __raw__
    user = User.objects(username=clean_username).first()
    if not user:
        abort(404)
    return user
Why this works: MongoEngine's ODM (Object-Document Mapper) adds protection through schema validation and a keyword-based query API. The User class defines the schema with field types (StringField, EmailField) and constraints (required=True, max_length=50), which MongoEngine enforces when documents are saved. The validate_username() function adds a further layer: it requires a string of at most 50 characters that passes str.isalnum(), so dictionaries and punctuation such as $ or . are rejected. Note that str.isalnum() also accepts non-ASCII letters and digits, so use an explicit pattern if usernames must be ASCII-only. User.objects(username=clean_username) builds the query through MongoEngine's query builder rather than from a raw dictionary, and __raw__ is never used with untrusted input, so the caller cannot supply operators.
Redis with Input Sanitization
# SECURE - Redis with key validation
import redis
from flask import Flask, request, abort
import re
app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379)
def validate_redis_key(key: str) -> str:
    """Validate Redis key format."""
    if not isinstance(key, str):
        raise ValueError("Key must be a string")
    # SECURE - Only allow alphanumeric, dash, underscore
    # fullmatch is used because '$' with re.match also matches before a trailing newline
    if not re.fullmatch(r'[a-zA-Z0-9_-]{1,100}', key):
        raise ValueError("Invalid key format")
    return key
def validate_redis_value(value: str) -> str:
    """Validate Redis value."""
    if not isinstance(value, str):
        raise ValueError("Value must be a string")
    # SECURE - Remove CRLF to prevent command injection
    clean_value = value.replace('\r', '').replace('\n', '')
    if len(clean_value) > 10000:
        raise ValueError("Value too large")
    return clean_value
@app.route('/cache/<key>')
def get_cache(key):
    try:
        # SECURE - Validate key
        clean_key = validate_redis_key(key)
        value = r.get(clean_key)
        return value.decode('utf-8') if value else 'Not found'
    except ValueError as e:
        abort(400, str(e))
@app.route('/set_cache', methods=['POST'])
def set_cache():
    try:
        key = request.form.get('key', '')
        value = request.form.get('value', '')
        # SECURE - Validate both key and value
        clean_key = validate_redis_key(key)
        clean_value = validate_redis_value(value)
        # SECURE - Use setex with expiration
        r.setex(clean_key, 3600, clean_value)
        return 'OK'
    except ValueError as e:
        abort(400, str(e))
Why this works: CRLF injection targets clients that assemble Redis commands as raw protocol strings, where \r\n ends one command and starts the next. The validate_redis_key() function restricts keys to 1 to 100 letters, digits, underscores, and hyphens, which excludes CR, LF, and whitespace. The validate_redis_value() function strips \r and \n from the value and rejects values longer than 10,000 characters. Validating both key and value before any Redis operation keeps user input from breaking out of the intended command to run others such as FLUSHDB or CONFIG. Using setex() gives every entry a one-hour expiry, so the cache cannot grow without bound.
Safe MongoDB Aggregation
# SECURE - MongoDB aggregation with field allowlist
from pymongo import MongoClient
client = MongoClient('mongodb://localhost:27017/')
db = client['analytics']
# SECURE - Define allowed sort fields
ALLOWED_SORT_FIELDS = ['timestamp', 'event_type', 'user_id']
def validate_user_id(user_id: str) -> str:
    """Validate user ID format."""
    if not isinstance(user_id, str):
        raise ValueError("User ID must be a string")
    if not user_id.isalnum():
        raise ValueError("User ID must be alphanumeric")
    if len(user_id) > 50:
        raise ValueError("User ID too long")
    return user_id
def get_user_stats(user_id: str, sort_field: str) -> list:
    """Get user statistics with safe aggregation."""
    # SECURE - Validate user ID
    clean_user_id = validate_user_id(user_id)
    # SECURE - Validate sort field against allowlist
    if sort_field not in ALLOWED_SORT_FIELDS:
        raise ValueError(f"Invalid sort field. Allowed: {ALLOWED_SORT_FIELDS}")
    # SECURE - Build pipeline with validated values
    pipeline = [
        {'$match': {'user_id': clean_user_id}},
        {'$sort': {sort_field: -1}},
        {'$limit': 100}
    ]
    results = list(db.events.aggregate(pipeline))
    return results
Why this works: MongoDB aggregation pipelines are powerful but can be exploited if stages or operators come from untrusted sources. The ALLOWED_SORT_FIELDS allowlist means sort_field can only be one of three known field names, so it cannot carry an operator or an unexpected field. The validate_user_id() function ensures the $match criterion is a short alphanumeric string, preventing operator injection at the pipeline entry point. Because the pipeline is constructed in code with validated inputs in fixed positions, rather than built from user JSON, attackers cannot add stages such as $lookup (access to other collections) or operators such as $function (server-side JavaScript). The $limit stage caps the result at 100 documents. This pattern enables complex queries while keeping the pipeline structure under the code's control.
Verification
After implementing the recommended secure patterns, verify the fix through multiple approaches:
- Manual testing: Submit malicious payloads relevant to this vulnerability and confirm they're handled safely without executing unintended operations
- Code review: Confirm all instances use the secure pattern (parameterized queries, safe APIs, proper encoding) with no string concatenation or unsafe operations
- Static analysis: Use security scanners to verify no new vulnerabilities exist and the original finding is resolved
- Regression testing: Ensure legitimate user inputs and application workflows continue to function correctly
- Edge case validation: Test with special characters, boundary conditions, and unusual inputs to verify proper handling
- Framework verification: If using a framework or library, confirm the recommended APIs are used correctly according to documentation
- Authentication/session testing: Verify security controls remain effective and cannot be bypassed (if applicable to the vulnerability type)
- Rescan: Run the security scanner again to confirm the finding is resolved and no new issues were introduced
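The manual and regression checks above can be partly automated. The sketch below uses the standard unittest module to confirm that a validator of the same shape as validate_string() rejects a small set of hostile payloads and still accepts legitimate input. The payload list is illustrative, not exhaustive, and the validator is redefined here only so the example runs on its own.

```python
import unittest


def validate_string(value, max_length: int = 100) -> str:
    """Same shape as the validator in the type-validation pattern."""
    if not isinstance(value, str):
        raise ValueError("Expected string value")
    if len(value) > max_length:
        raise ValueError(f"Value exceeds max length {max_length}")
    return value


# Illustrative payloads; extend with ones relevant to the application.
HOSTILE_PAYLOADS = [
    {"$ne": None},
    {"$gt": ""},
    {"$regex": ".*"},
    ["admin"],
    None,
    "x" * 101,
]


class ValidateStringTests(unittest.TestCase):
    def test_hostile_payloads_rejected(self):
        for payload in HOSTILE_PAYLOADS:
            with self.subTest(payload=payload):
                with self.assertRaises(ValueError):
                    validate_string(payload)

    def test_legitimate_input_accepted(self):
        self.assertEqual(validate_string("alice"), "alice")
```

In a real project the test module would import the application's own validators, so that a later change which weakens them fails the suite.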