CWE-943: NoSQL Injection - Python

Overview

NoSQL Injection in Python applications occurs when untrusted input is used to construct NoSQL database queries (MongoDB, Redis, CouchDB, DynamoDB, etc.) without proper validation or sanitization. Untrusted input can originate from HTTP requests, external APIs, databases, files, message queues, or any source outside the application's control. Attackers can exploit this to bypass authentication, extract sensitive data, modify database contents, or execute unauthorized operations.

Primary Defence: Build queries in code from validated values instead of passing user-supplied dictionaries to the database. Prefer MongoEngine's keyword query API or fixed-shape PyMongo filters over raw query dictionaries, type-check every input before it reaches a query, reject keys that start with $ or contain ., and allowlist the fields and operators a caller may use.

Common Python NoSQL Vulnerabilities:

  • MongoDB query injection via operator injection ($ne, $gt, $where, $regex)
  • MongoDB aggregation pipeline injection
  • Redis command injection via unsanitized keys/values
  • CouchDB view query manipulation
  • DynamoDB expression injection

Popular Python NoSQL Libraries:

  • PyMongo: Official MongoDB driver
  • Motor: Async MongoDB driver for Python
  • mongoengine: MongoDB ODM (Object-Document Mapper)
  • redis-py: Redis client
  • boto3: AWS DynamoDB client

Common Vulnerable Patterns

MongoDB Operator Injection

# VULNERABLE - Direct untrusted input in MongoDB query
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['app_database']

def authenticate_user(username, password):
    # VULNERABLE - Untrusted input directly in query
    user = db.users.find_one({
        'username': username,
        'password': password
    })

    return user is not None

# Attack: username = {"$ne": null}, password = {"$ne": null}
# Query becomes: {'username': {'$ne': None}, 'password': {'$ne': None}}
# Returns first user (authentication bypass!)

Why this is vulnerable:

  • MongoDB operators ($ne, $gt, $regex) accepted in queries
  • JSON/dict injection from request parameters
  • No type validation
  • Authentication bypass possible
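
The mechanism is easier to see without a database. The sketch below parses a request body the way a JSON endpoint would and shows that the "username" value arrives as a dict rather than a string. The raw_body string and the is_plain_string helper are illustrative assumptions, not part of the original example.

```python
import json

# Body an attacker might POST to a JSON login endpoint (illustrative).
raw_body = '{"username": {"$ne": null}, "password": {"$ne": null}}'
payload = json.loads(raw_body)

username = payload["username"]
password = payload["password"]

# Both values are dicts, so find_one() would treat them as operator
# expressions instead of literal strings to compare against.
print(type(username).__name__, username)


def is_plain_string(value) -> bool:
    """Return True only for str values (hypothetical helper)."""
    return isinstance(value, str)
```

A check like is_plain_string() placed before the query is the simplest way to stop this payload from reaching the filter.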

Flask API with JSON Injection

# VULNERABLE - Accepting arbitrary JSON in queries
from flask import Flask, request, jsonify
from pymongo import MongoClient

app = Flask(__name__)
client = MongoClient('mongodb://localhost:27017/')
db = client['shop']

@app.route('/api/products', methods=['POST'])
def search_products():
    # VULNERABLE - Arbitrary query object from untrusted source
    query = request.get_json()

    # No validation on query structure
    products = list(db.products.find(query))

    return jsonify(products)

# Attack POST body: {"admin_only": true}
# The caller controls the whole filter, so hidden products can be queried directly;
# an empty body {} returns the entire collection

Why this is vulnerable:

  • Accepts arbitrary query operators
  • No field allowlist
  • Can access hidden/admin fields
  • Data exfiltration possible
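
One way to detect operator payloads before they reach find() is to walk the parsed JSON and flag any key that starts with $ or contains a dot. This is a minimal sketch; the function name contains_operator_keys is an assumption, and a check like this complements a field allowlist rather than replacing it.

```python
from typing import Any


def contains_operator_keys(value: Any) -> bool:
    """Return True if any nested dict key starts with '$' or contains '.'."""
    if isinstance(value, dict):
        for key, nested in value.items():
            # Non-string keys are treated as suspicious as well.
            if not isinstance(key, str) or key.startswith('$') or '.' in key:
                return True
            if contains_operator_keys(nested):
                return True
        return False
    if isinstance(value, (list, tuple)):
        return any(contains_operator_keys(item) for item in value)
    # Scalars (str, int, float, bool, None) carry no keys.
    return False
```

A request handler could return HTTP 400 whenever this function returns True for the parsed body.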

MongoDB $where Operator Injection

# VULNERABLE - JavaScript code injection via $where
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['app']

def find_users_by_age(age):
    # VULNERABLE - String concatenation in $where
    query = {
        '$where': f'this.age > {age}'
    }

    users = list(db.users.find(query))
    return users

# Attack: age = "0; return true; //"
# Executes arbitrary JavaScript: this.age > 0; return true; //
# Returns all users regardless of age

Why this is vulnerable:

  • $where executes JavaScript on MongoDB server
  • String concatenation allows code injection
  • Denial of service (infinite loops)
  • Data exfiltration

MongoEngine __raw__ Injection

# VULNERABLE - MongoEngine with raw queries
from mongoengine import Document, StringField, connect
from flask import request

connect('mydb')

class User(Document):
    username = StringField()
    email = StringField()
    role = StringField()

def get_user_profile(username):
    # VULNERABLE - Using __raw__ with untrusted input
    query = {'username': username}
    user = User.objects(__raw__=query).first()

    return user

# Attack: username = {"$ne": None}
# Query becomes: {'username': {'$ne': None}}
# Returns the first user in the collection instead of a specific user

Why this is vulnerable:

  • __raw__ accepts MongoDB operators
  • No validation on query structure
  • Privilege escalation
  • ODM bypassed

Redis Command Injection

# VULNERABLE - Redis key injection
import redis
from flask import Flask, request

app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379)

@app.route('/cache/<key>')
def get_cache(key):
    # VULNERABLE - Untrusted input in Redis key
    value = r.get(key)
    return value or 'Not found'

@app.route('/set_cache')
def set_cache():
    key = request.args.get('key')
    value = request.args.get('value')

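    # VULNERABLE - Caller can write any key with any value
    r.set(key, value)
    return 'OK'

# Attack: key = "session:admin" overwrites a key the application relies on
# Attack: key = "test\r\nFLUSHDB\r\n" injects a command wherever commands are built
# as raw protocol text; redis-py itself sends arguments as length-prefixed strings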

Why this is vulnerable:

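  • Callers can read or overwrite any key, including keys owned by other features
  • CRLF injection is possible when commands are assembled as raw protocol strings
  • Injected commands such as FLUSHDB can wipe the database
  • Data exfiltration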

MongoDB Aggregation Injection

# VULNERABLE - Aggregation pipeline with untrusted input
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['analytics']

def get_user_stats(user_id, sort_field):
    # VULNERABLE - Untrusted input in aggregation pipeline
    pipeline = [
        {'$match': {'user_id': user_id}},
        {'$sort': {sort_field: -1}},
        {'$limit': 10}
    ]

    results = list(db.events.aggregate(pipeline))
    return results

# Attack: user_id = {"$ne": None}
# The $match stage then selects events for every user
# Attack: sort_field = any field name, including internal or unindexed fields

Why this is vulnerable:

  • Operator dictionaries accepted in the $match stage
  • No field validation for sort_field
  • Expensive sorts on arbitrary fields can degrade performance
  • Data pipeline manipulation

MongoDB Regex Injection

# VULNERABLE - Regex injection in queries
from pymongo import MongoClient
import re

client = MongoClient('mongodb://localhost:27017/')
db = client['app']

def search_users(search_term):
    # VULNERABLE - Untrusted input in regex without escaping
    query = {
        'username': {'$regex': search_term, '$options': 'i'}
    }

    users = list(db.users.find(query))
    return users

# Attack: search_term = ".*"
# Returns ALL users (DoS, data exfiltration)
# Attack: search_term = "^admin.*$"
# Discovers admin usernames

Why this is vulnerable:

  • Regex patterns from untrusted sources
  • ReDoS (Regular Expression Denial of Service)
  • Information disclosure
  • No escaping or limits
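
A common mitigation is to escape the search term so it is matched literally, and to cap its length. The sketch below uses Python's re.escape(). The name build_username_search and the 50-character limit are assumptions, and the escaping follows Python's rules, which are assumed here to be acceptable to MongoDB's regex engine for typical input.

```python
import re

MAX_SEARCH_LENGTH = 50  # assumed limit, adjust to the application's needs


def build_username_search(search_term: str) -> dict:
    """Build a case-insensitive literal substring filter for username."""
    if not isinstance(search_term, str):
        raise ValueError("Search term must be a string")
    if not 1 <= len(search_term) <= MAX_SEARCH_LENGTH:
        raise ValueError("Search term length out of range")

    # re.escape() backslash-escapes regex metacharacters, so ".*" is
    # searched for as the two literal characters "." and "*".
    return {'username': {'$regex': re.escape(search_term), '$options': 'i'}}
```

The returned dictionary can be passed to db.users.find(), ideally with a limit() on the cursor.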

DynamoDB Expression Injection

# VULNERABLE - DynamoDB filter expression injection
import boto3
from boto3.dynamodb.conditions import Attr

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Users')

def search_users(attribute_name, value):
    # VULNERABLE - Untrusted attribute name
    response = table.scan(
        FilterExpression=Attr(attribute_name).eq(value)
    )

    return response['Items']

# Attack: attribute_name = "admin", value = True
# Bypasses intended search, finds admin users

Why this is vulnerable:

  • Attribute names from untrusted sources
  • Can access hidden attributes
  • No field allowlist
  • Authorization bypass
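
An attribute allowlist addresses the points above. In this sketch the attribute names in ALLOWED_ATTRIBUTES and the helper checked_attribute are hypothetical; the scan call mirrors the one in the vulnerable example and takes the table as a parameter.

```python
# Hypothetical set of attributes that callers may search on.
ALLOWED_ATTRIBUTES = {'username', 'email', 'city'}


def checked_attribute(attribute_name: str) -> str:
    """Return the attribute name only if it is explicitly allowlisted."""
    if not isinstance(attribute_name, str) or attribute_name not in ALLOWED_ATTRIBUTES:
        raise ValueError("Attribute is not searchable")
    return attribute_name


def search_users(table, attribute_name: str, value: str) -> list:
    """Scan the table with a filter on an allowlisted attribute only."""
    # Imported here so the validator above can be used without boto3.
    from boto3.dynamodb.conditions import Attr

    name = checked_attribute(attribute_name)
    if not isinstance(value, str):
        raise ValueError("Value must be a string")

    response = table.scan(FilterExpression=Attr(name).eq(value))
    return response['Items']
```

With this in place, a request for attribute_name = "admin" raises ValueError before any scan is issued.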

Secure Patterns

MongoDB with Type Validation

# SECURE - Strict type validation for MongoDB queries
from pymongo import MongoClient
from typing import Optional

client = MongoClient('mongodb://localhost:27017/')
db = client['app_database']

def validate_string(value: str, max_length: int = 100) -> str:
    """Validate string input."""
    if not isinstance(value, str):
        raise ValueError("Expected string value")

    if len(value) > max_length:
        raise ValueError(f"Value exceeds max length {max_length}")

    return value

def authenticate_user(username: str, password: str) -> bool:
    """Secure user authentication with type validation."""
    # SECURE - Validate input types
    clean_username = validate_string(username, max_length=50)
    clean_password = validate_string(password, max_length=100)

    # SECURE - Only string values allowed, no operators
    user = db.users.find_one({
        'username': clean_username,
        'password': clean_password  # In production, use hashed passwords!
    })

    return user is not None

# Attack attempts with dicts/operators will fail type validation

Why this works: Runtime type checks, not the type hints themselves, stop the attack. The validate_string() function checks isinstance(value, str) and so rejects dictionaries such as {"$ne": None} that attackers use for operator injection. Length limits (50 characters for the username, 100 for the password) bound the size of each input. The query then contains only validated strings in simple equality comparisons, for example {"username": clean_username}, which MongoDB matches as literal values. Even if an attacker sends JSON containing operators through the API, validation fails before the query is built, so those operators never reach the database.

Flask with Query Allowlist

# SECURE - Field allowlist and validation
from flask import Flask, request, jsonify
from pymongo import MongoClient

app = Flask(__name__)
client = MongoClient('mongodb://localhost:27017/')
db = client['shop']

# SECURE - Define allowed query fields
ALLOWED_FIELDS = {
    'name': str,
    'category': str,
    'price_min': (int, float),
    'price_max': (int, float)
}

def build_safe_query(params: dict) -> dict:
    """Build safe MongoDB query from parameters."""
    query = {}

    for field, value in params.items():
        # SECURE - Only allow allowlisted fields
        if field not in ALLOWED_FIELDS:
            continue

        expected_types = ALLOWED_FIELDS[field]
        if not isinstance(expected_types, tuple):
            expected_types = (expected_types,)

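        # SECURE - Query-string values arrive as strings, so coerce numeric fields
        if str not in expected_types:
            try:
                value = float(value)
            except (TypeError, ValueError):
                continue

        # SECURE - Validate type
        if not isinstance(value, expected_types):
            continue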

        # SECURE - Build safe query conditions
        if field == 'price_min':
            query['price'] = query.get('price', {})
            query['price']['$gte'] = value
        elif field == 'price_max':
            query['price'] = query.get('price', {})
            query['price']['$lte'] = value
        else:
            query[field] = value

    return query

@app.route('/api/products', methods=['GET'])
def search_products():
    params = request.args.to_dict()

    # SECURE - Build validated query
    safe_query = build_safe_query(params)

    # Exclude _id because ObjectId values are not JSON serializable by jsonify
    products = list(db.products.find(safe_query, {'_id': 0}).limit(100))

    return jsonify(products)

Why this works: The field allowlist approach prevents arbitrary field queries and operator injection. The ALLOWED_FIELDS dictionary acts as an allow-list with expected types, rejecting any fields not explicitly permitted (like admin_only or _internal_flags). Type validation ensures each field receives the correct type - strings get strings, numbers get numbers - preventing type confusion attacks. Controlled operator usage with $gte and $lte is safe because the code constructs them programmatically with validated values, not user-supplied operators. The limit(100) prevents resource exhaustion. This pattern gives users query flexibility (searching by name, category, price range) without exposing the application to injection, because the query structure is controlled by code, not user input.

No $where, Use Safe Operators

# SECURE - Avoid $where, use safe operators
from pymongo import MongoClient
from typing import Union

client = MongoClient('mongodb://localhost:27017/')
db = client['app']

def validate_age(age: Union[int, str]) -> int:
    """Validate and convert age to integer."""
    try:
        age_int = int(age)
        if age_int < 0 or age_int > 150:
            raise ValueError("Age out of valid range")
        return age_int
    except (ValueError, TypeError):
        raise ValueError("Invalid age value")

def find_users_by_age(min_age: Union[int, str]) -> list:
    """Find users by minimum age using safe operators."""
    # SECURE - Validate input
    clean_age = validate_age(min_age)

    # SECURE - Use safe $gte operator instead of $where
    query = {
        'age': {'$gte': clean_age}
    }

    users = list(db.users.find(query).limit(100))
    return users

Why this works: The $where operator executes JavaScript on the MongoDB server, which makes it a dangerous injection target. Avoiding $where and using a comparison operator such as $gte removes the JavaScript injection risk. The validate_age() function converts the input to an integer and checks that it falls in the range 0-150, rejecting non-numeric and out-of-range values. The resulting query, {"age": {"$gte": clean_age}}, performs a numeric comparison with no code execution, so attackers cannot inject JavaScript or additional operators. The limit(100) call caps the result size. Most queries can be expressed with comparison operators like these, which makes $where unnecessary and removes a major injection vector.

MongoEngine with Field Validation

# SECURE - MongoEngine with proper field access
from mongoengine import Document, StringField, EmailField, connect
from flask import request, abort

connect('mydb')

class User(Document):
    username = StringField(required=True, max_length=50)
    email = EmailField(required=True)
    role = StringField(choices=['user', 'admin'])

def validate_username(username: str) -> str:
    """Validate username format."""
    if not isinstance(username, str):
        raise ValueError("Username must be a string")

    if not username.isalnum():
        raise ValueError("Username must be alphanumeric")

    if len(username) > 50:
        raise ValueError("Username too long")

    return username

def get_user_profile(username: str):
    """Get user profile securely."""
    # SECURE - Validate input
    clean_username = validate_username(username)

    # SECURE - Use ODM fields, not __raw__
    user = User.objects(username=clean_username).first()

    if not user:
        abort(404)

    return user

Why this works: MongoEngine's ODM (Object-Document Mapper) builds queries from declared fields rather than from raw dictionaries. The User class defines the schema with field types (StringField, EmailField) and constraints (required=True, max_length=50), which are enforced when documents are validated and saved. The validate_username() function adds a further layer: it requires a string, rejects anything that is not alphanumeric via str.isalnum(), and enforces a 50-character limit, so characters such as $, ., { and } never reach the query. User.objects(username=clean_username) goes through MongoEngine's query builder and compares the field against the validated string. Keeping untrusted input out of __raw__ queries means user-supplied operators have no path into the filter.
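
Note that str.isalnum() also accepts non-ASCII letters and digits. If an ASCII-only username policy is intended, a regex-based validator is one option. The sketch below is an assumption about such a policy: the pattern, the 50-character bound, and the name validate_username_ascii are illustrative. It uses fullmatch() so that a trailing newline cannot slip past the pattern.

```python
import re

# Assumed policy: ASCII letters, digits, underscore, dot, hyphen; 1-50 chars.
USERNAME_PATTERN = re.compile(r'[A-Za-z0-9_.-]{1,50}')


def validate_username_ascii(username: str) -> str:
    """Validate a username against an ASCII-only pattern."""
    if not isinstance(username, str):
        raise ValueError("Username must be a string")

    # fullmatch() requires the whole string to match the pattern.
    if USERNAME_PATTERN.fullmatch(username) is None:
        raise ValueError("Username contains invalid characters")

    return username
```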

Redis with Input Sanitization

# SECURE - Redis with key validation
import redis
from flask import Flask, request, abort
import re

app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379)

def validate_redis_key(key: str) -> str:
    """Validate Redis key format."""
    if not isinstance(key, str):
        raise ValueError("Key must be a string")

    # SECURE - Only allow alphanumeric, dash, underscore
    # fullmatch() is used because '$' would also match before a trailing newline
    if not re.fullmatch(r'[a-zA-Z0-9_-]{1,100}', key):
        raise ValueError("Invalid key format")

    return key

def validate_redis_value(value: str) -> str:
    """Validate Redis value."""
    if not isinstance(value, str):
        raise ValueError("Value must be a string")

    # SECURE - Remove CRLF to prevent command injection
    clean_value = value.replace('\r', '').replace('\n', '')

    if len(clean_value) > 10000:
        raise ValueError("Value too large")

    return clean_value

@app.route('/cache/<key>')
def get_cache(key):
    try:
        # SECURE - Validate key
        clean_key = validate_redis_key(key)
        value = r.get(clean_key)
        return value.decode('utf-8') if value else 'Not found'
    except ValueError as e:
        abort(400, str(e))

@app.route('/set_cache', methods=['POST'])
def set_cache():
    try:
        key = request.form.get('key', '')
        value = request.form.get('value', '')

        # SECURE - Validate both key and value
        clean_key = validate_redis_key(key)
        clean_value = validate_redis_value(value)

        # SECURE - Use setex with expiration
        r.setex(clean_key, 3600, clean_value)

        return 'OK'
    except ValueError as e:
        abort(400, str(e))

Why this works: Redis command injection relies on CRLF characters (\r\n) reaching a context where they are interpreted as protocol delimiters. The validate_redis_key() function allows only ASCII letters, digits, underscores, and hyphens, with a length of 1 to 100 characters, so CRLF and other delimiters never appear in a key. The validate_redis_value() function removes \r and \n from the value and rejects values longer than 10,000 characters. These length limits also bound memory use per request, and setex() gives every entry a one-hour expiry. By validating both keys and values before any Redis operation, the code keeps user input inside the intended command structure, so it cannot be used to run commands such as FLUSHDB or CONFIG.
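
Character validation can be combined with a fixed key namespace so that a request can only address keys set aside for the cache. This is a minimal sketch; the cache: prefix and the name namespaced_key are assumptions, not part of the original code.

```python
import re

KEY_PATTERN = re.compile(r'[a-zA-Z0-9_-]{1,100}')
CACHE_PREFIX = 'cache:'  # hypothetical namespace reserved for user-chosen keys


def namespaced_key(user_key: str) -> str:
    """Validate a user-supplied key and place it under a fixed prefix."""
    if not isinstance(user_key, str) or KEY_PATTERN.fullmatch(user_key) is None:
        raise ValueError("Invalid key format")

    # The colon is not allowed in user_key, so the caller cannot
    # construct a name that escapes into another namespace.
    return CACHE_PREFIX + user_key
```

The handlers would then call r.get(namespaced_key(key)) and r.setex(namespaced_key(key), 3600, clean_value) instead of using the raw key.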

Safe MongoDB Aggregation

# SECURE - MongoDB aggregation with field allowlist
from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
db = client['analytics']

# SECURE - Define allowed sort fields
ALLOWED_SORT_FIELDS = ['timestamp', 'event_type', 'user_id']

def validate_user_id(user_id: str) -> str:
    """Validate user ID format."""
    if not isinstance(user_id, str):
        raise ValueError("User ID must be a string")

    if not user_id.isalnum():
        raise ValueError("User ID must be alphanumeric")

    if len(user_id) > 50:
        raise ValueError("User ID too long")

    return user_id

def get_user_stats(user_id: str, sort_field: str) -> list:
    """Get user statistics with safe aggregation."""
    # SECURE - Validate user ID
    clean_user_id = validate_user_id(user_id)

    # SECURE - Validate sort field against allowlist
    if sort_field not in ALLOWED_SORT_FIELDS:
        raise ValueError(f"Invalid sort field. Allowed: {ALLOWED_SORT_FIELDS}")

    # SECURE - Build pipeline with validated values
    pipeline = [
        {'$match': {'user_id': clean_user_id}},
        {'$sort': {sort_field: -1}},
        {'$limit': 100}
    ]

    results = list(db.events.aggregate(pipeline))
    return results

Why this works: MongoDB aggregation pipelines are powerful but can be exploited if stages or operators come from untrusted sources. The ALLOWED_SORT_FIELDS allowlist means sort_field can only be one of three known field names, so it cannot carry operators or refer to unexpected fields. The validate_user_id() function ensures the $match stage receives a validated string, preventing operator injection at the pipeline entry point. Because the pipeline is assembled in code with validated inputs in fixed positions, rather than built from user JSON, attackers cannot add stages such as $lookup (access to other collections) or operators such as $function (arbitrary code). The $limit stage caps results at 100 documents. This pattern supports flexible queries while keeping the pipeline structure under the control of code, not user input.

Verification

After implementing the recommended secure patterns, verify the fix through multiple approaches:

  • Manual testing: Submit malicious payloads relevant to this vulnerability and confirm they're handled safely without executing unintended operations
  • Code review: Confirm all instances use the secure pattern (type-checked values, allowlisted fields, ODM query methods) with no raw query dictionaries or $where strings built from user input
  • Static analysis: Use security scanners to verify no new vulnerabilities exist and the original finding is resolved
  • Regression testing: Ensure legitimate user inputs and application workflows continue to function correctly
  • Edge case validation: Test with special characters, boundary conditions, and unusual inputs to verify proper handling
  • Framework verification: If using a framework or library, confirm the recommended APIs are used correctly according to documentation
  • Authentication/session testing: Verify security controls remain effective and cannot be bypassed (if applicable to the vulnerability type)
  • Rescan: Run the security scanner again to confirm the finding is resolved and no new issues were introduced
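
The manual and regression checks above can be captured as a small automated test. The sketch below is one possible shape: validate_string here is a stand-in for the validator used in the secure patterns, and the payload list and helper names are assumptions to adapt to the application under test.

```python
def validate_string(value, max_length: int = 100) -> str:
    """Stand-in for the application's string validator."""
    if not isinstance(value, str):
        raise ValueError("Expected string value")
    if len(value) > max_length:
        raise ValueError(f"Value exceeds max length {max_length}")
    return value


# Representative operator-injection and type-confusion payloads.
MALICIOUS_PAYLOADS = [
    {"$ne": None},
    {"$gt": ""},
    {"$regex": ".*"},
    ["admin"],
    None,
    42,
    "x" * 101,
]


def rejected(payload) -> bool:
    """Return True if the validator refuses the payload."""
    try:
        validate_string(payload)
    except ValueError:
        return True
    return False


def run_checks() -> None:
    """Fail loudly if any payload is accepted or a legitimate value is refused."""
    for payload in MALICIOUS_PAYLOADS:
        assert rejected(payload), f"payload accepted: {payload!r}"
    assert validate_string("alice") == "alice"
```

Calling run_checks() in a test suite gives a quick signal if a later change loosens the validation.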

Additional Resources