Skip to content

CWE-90: LDAP Injection - Python

Overview

LDAP Injection in Python applications occurs when untrusted user input is used to construct LDAP queries without proper validation or escaping. LDAP (Lightweight Directory Access Protocol) is commonly used for authentication, authorization, and directory services. Attackers can manipulate LDAP queries to bypass authentication, escalate privileges, extract sensitive directory data, or cause denial of service.

Primary Defence: Use safe filter construction with ldap3, escape LDAP special characters (*, (, ), \, NUL) using ldap3.utils.conv.escape_filter_chars() before including user input, validate all input against strict allowlists, never allow wildcards in authentication queries, use bind authentication instead of search-based authentication, and implement proper error handling to prevent information disclosure through LDAP injection attacks.

Common Python LDAP Vulnerability Scenarios:

  • Building LDAP search filters with string concatenation
  • Using user input directly in DN (Distinguished Names)
  • Accepting wildcards in authentication queries
  • Insufficient validation of special LDAP characters
  • Blind LDAP injection via error messages

Python LDAP Libraries:

  • ldap3: Modern, pure Python LDAP client (recommended)
  • python-ldap: Legacy LDAP library (C-based, requires escape functions)
  • django-auth-ldap: Django LDAP authentication backend
  • flask-ldap3-login: Flask LDAP authentication

LDAP Special Characters: RFC 4515 requires escaping: *, (, ), \, NUL (some codebases also escape / as defense in depth)

Common Vulnerable Patterns

String Concatenation in LDAP Filter

# VULNERABLE - Direct string concatenation
import ldap

def vulnerable_ldap_auth(username, password):
    conn = ldap.initialize('ldap://localhost:389')

    # VULNERABLE - User input directly in search filter
    search_filter = f"(uid={username})"

    try:
        # Search for user
        result = conn.search_s(
            'ou=users,dc=example,dc=com',
            ldap.SCOPE_SUBTREE,
            search_filter
        )

        if result:
            user_dn = result[0][0]
            # Try to bind with credentials
            conn.simple_bind_s(user_dn, password)
            return True
    except ldap.INVALID_CREDENTIALS:
        return False

    return False

# Attack: username = "admin)(&" bypasses authentication
# Resulting filter: (uid=admin)(&)
# This matches ANY user with uid=admin, ignoring password check

Why this is vulnerable:

  • No input validation or escaping
  • Special characters )( manipulate filter logic
  • Attackers can craft filters to bypass authentication
  • Wildcard attacks possible with *

Django LDAP Without Sanitization

# VULNERABLE - Django view with LDAP search
from django.http import JsonResponse
import ldap

def vulnerable_user_search(request):
    search_term = request.GET.get('name', '')

    # VULNERABLE - Unsanitized input in LDAP filter
    search_filter = f"(cn=*{search_term}*)"

    conn = ldap.initialize('ldap://ldap.example.com:389')
    conn.simple_bind_s('cn=admin,dc=example,dc=com', 'admin_password')

    try:
        results = conn.search_s(
            'ou=people,dc=example,dc=com',
            ldap.SCOPE_SUBTREE,
            search_filter,
            ['cn', 'mail', 'telephoneNumber']
        )

        users = [{
            'name': r[1].get('cn', [b''])[0].decode('utf-8'),
            'email': r[1].get('mail', [b''])[0].decode('utf-8')
        } for r in results]

        return JsonResponse({'users': users})
    except ldap.LDAPError as e:
        return JsonResponse({'error': str(e)}, status=500)

Why this is vulnerable:

  • Wildcard search accepts any input
  • Attackers can inject filter operators
  • Information disclosure via error messages
  • No character filtering

Flask LDAP Authentication

# VULNERABLE - Flask LDAP login without escaping
from flask import Flask, request, jsonify
import ldap

app = Flask(__name__)

@app.route('/login', methods=['POST'])
def vulnerable_login():
    username = request.json.get('username')
    password = request.json.get('password')

    conn = ldap.initialize('ldap://localhost:389')

    # VULNERABLE - String formatting in filter
    search_filter = f"(&(objectClass=person)(uid={username}))"

    try:
        # Bind as service account
        conn.simple_bind_s('cn=service,dc=example,dc=com', 'service_pass')

        # Search for user
        results = conn.search_s(
            'ou=users,dc=example,dc=com',
            ldap.SCOPE_SUBTREE,
            search_filter
        )

        if not results:
            return jsonify({'error': 'User not found'}), 401

        user_dn = results[0][0]

        # Authenticate user
        user_conn = ldap.initialize('ldap://localhost:389')
        user_conn.simple_bind_s(user_dn, password)

        return jsonify({'status': 'authenticated'})

    except ldap.INVALID_CREDENTIALS:
        return jsonify({'error': 'Invalid credentials'}), 401
    except ldap.LDAPError as e:
        return jsonify({'error': str(e)}), 500

Why this is vulnerable:

  • Username not escaped before filter construction
  • Attack: username = "*)(uid=*))(|(uid=*" exposes all users
  • Error messages leak information
  • No input validation

LDAP Attribute Injection

# VULNERABLE - User-controlled attribute names
import ldap

def vulnerable_attribute_search(username, attribute_name):
    conn = ldap.initialize('ldap://localhost:389')
    conn.simple_bind_s('cn=admin,dc=example,dc=com', 'password')

    # VULNERABLE - User controls search filter AND attributes
    search_filter = f"(uid={username})"

    try:
        results = conn.search_s(
            'ou=users,dc=example,dc=com',
            ldap.SCOPE_SUBTREE,
            search_filter,
            [attribute_name]  # VULNERABLE - User-controlled attribute
        )

        if results:
            return results[0][1].get(attribute_name, [b''])[0].decode('utf-8')
    except ldap.LDAPError:
        return None

Why this is vulnerable:

  • Attacker controls attribute names
  • Can extract sensitive attributes: userPassword, loginShell
  • Information disclosure
  • Privilege escalation via group attributes

DN Injection

# VULNERABLE - User input in Distinguished Name
import ldap

def vulnerable_dn_lookup(org_unit, user_id):
    conn = ldap.initialize('ldap://localhost:389')

    # VULNERABLE - User input in DN construction
    user_dn = f"uid={user_id},ou={org_unit},dc=example,dc=com"

    try:
        conn.simple_bind_s(user_dn, 'some_password')
        return True
    except ldap.INVALID_CREDENTIALS:
        return False

Why this is vulnerable:

  • DN components not validated
  • Attack: org_unit = "users,dc=example,dc=com)(uid=admin"
  • Allows authentication as different user
  • DN escape requirements different from filter escaping

Wildcard Search Injection

# VULNERABLE - Accepting wildcards from users
import ldap

def vulnerable_wildcard_search(search_pattern):
    conn = ldap.initialize('ldap://localhost:389')
    conn.simple_bind_s('cn=admin,dc=example,dc=com', 'password')

    # VULNERABLE - Wildcard search with user input
    search_filter = f"(cn={search_pattern})"

    results = conn.search_s(
        'ou=people,dc=example,dc=com',
        ldap.SCOPE_SUBTREE,
        search_filter
    )

    return [(r[1].get('cn', [b''])[0].decode('utf-8')) for r in results]

# Attack: search_pattern = "*" returns ALL entries
# Attack: search_pattern = "a*)(objectClass=*" performs OR injection

Why this is vulnerable:

  • Wildcards allow enumeration attacks
  • Can extract entire directory
  • Denial of service via resource exhaustion
  • Information leakage

ldap3 Without Escaping

# VULNERABLE - ldap3 library with string concatenation
from ldap3 import Server, Connection, ALL

def vulnerable_ldap3_search(username):
    server = Server('ldap://localhost:389', get_info=ALL)
    conn = Connection(server, 'cn=admin,dc=example,dc=com', 'password')
    conn.bind()

    # VULNERABLE - Even with ldap3, string concatenation is dangerous
    search_filter = f"(uid={username})"

    conn.search(
        'ou=users,dc=example,dc=com',
        search_filter,
        attributes=['cn', 'mail']
    )

    return conn.entries

Why this is vulnerable:

  • ldap3 doesn't automatically escape string concatenation
  • Must use explicit escaping functions
  • False sense of security with modern library
  • Still vulnerable to injection

Error-Based LDAP Injection

# VULNERABLE - Verbose error messages
import ldap
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/check-user', methods=['GET'])
def vulnerable_check_user():
    username = request.args.get('username')

    conn = ldap.initialize('ldap://localhost:389')
    search_filter = f"(uid={username})"

    try:
        results = conn.search_s(
            'ou=users,dc=example,dc=com',
            ldap.SCOPE_SUBTREE,
            search_filter
        )

        return jsonify({
            'exists': len(results) > 0,
            'filter': search_filter  # VULNERABLE - Exposes filter
        })
    except ldap.FILTER_ERROR as e:
        # VULNERABLE - Detailed error messages
        return jsonify({
            'error': 'Invalid filter',
            'details': str(e),
            'filter': search_filter
        }), 400

Why this is vulnerable:

  • Error messages reveal filter structure
  • Attackers can probe for valid injection points
  • Information disclosure
  • Blind injection possible

Secure Patterns

Using ldap3 with escape_filter_chars

# SECURE - ldap3 with proper escaping
from ldap3 import Server, Connection, ALL
from ldap3.utils.conv import escape_filter_chars

def secure_ldap_search(username):
    server = Server('ldap://localhost:389', get_info=ALL)
    conn = Connection(
        server,
        'cn=admin,dc=example,dc=com',
        'password',
        auto_bind=True
    )

    # SECURE - Escape special characters
    safe_username = escape_filter_chars(username)
    search_filter = f"(uid={safe_username})"

    conn.search(
        'ou=users,dc=example,dc=com',
        search_filter,
        attributes=['cn', 'mail', 'telephoneNumber']
    )

    users = [{
        'name': str(entry.cn),
        'email': str(entry.mail),
        'phone': str(entry.telephoneNumber)
    } for entry in conn.entries]

    conn.unbind()
    return users

Why this works:

  • The ldap3 library's escape_filter_chars() function escapes all LDAP filter special characters (*, (, ), \, NUL) by converting them to backslash-hex escape sequences following RFC 4515.
  • This prevents filter injection where attackers manipulate filter logic by injecting operators.
  • For example, without escaping, username = "*)(uid=*))(|(uid=*" creates multiple filter clauses that bypass authentication checks.
  • The ldap3 library is modern, pure Python, and actively maintained, making it the recommended choice for new LDAP applications.
  • Using escape_filter_chars() ensures user input is treated as literal search values, not filter syntax.
  • Combined with proper connection handling (auto_bind=True for automatic binding, unbind() for cleanup), this provides robust protection against LDAP injection attacks.

python-ldap with filter_escape

# SECURE - python-ldap with escaping
import ldap
from ldap.filter import escape_filter_chars

def secure_python_ldap_auth(username, password):
    conn = ldap.initialize('ldap://localhost:389')

    # SECURE - Escape filter characters
    safe_username = escape_filter_chars(username)
    search_filter = f"(&(objectClass=person)(uid={safe_username}))"

    try:
        # Bind as service account
        conn.simple_bind_s('cn=service,dc=example,dc=com', 'service_password')

        # Search for user
        results = conn.search_s(
            'ou=users,dc=example,dc=com',
            ldap.SCOPE_SUBTREE,
            search_filter,
            ['dn']
        )

        if not results:
            return False

        user_dn = results[0][0]

        # Authenticate as user
        user_conn = ldap.initialize('ldap://localhost:389')
        user_conn.simple_bind_s(user_dn, password)
        user_conn.unbind_s()

        conn.unbind_s()
        return True

    except ldap.INVALID_CREDENTIALS:
        return False
    except ldap.LDAPError:
        return False

Why this works:

  • The python-ldap library's escape_filter_chars() function (from ldap.filter module) provides RFC 4515-compliant escaping for LDAP filter values.
  • It converts special characters like * (wildcard), ( and ) (operators), \ (escape character), and NUL into backslash-hex escape sequences, preventing filter manipulation.
  • python-ldap is a mature, C-based library (binding to OpenLDAP's libldap) that's been widely used for decades.
  • While ldap3 is recommended for new projects (pure Python, better API), python-ldap remains common in legacy applications.
  • The escaping function ensures attackers cannot inject filter syntax through user input values.
  • Proper exception handling (except ldap.INVALID_CREDENTIALS for auth failures, except ldap.LDAPError for other errors) prevents information disclosure while logging errors appropriately.

Django with Input Validation

# SECURE - Django with validation and escaping
from django.http import JsonResponse
from django.core.validators import validate_slug
from django.core.exceptions import ValidationError
from ldap3 import Server, Connection
from ldap3.utils.conv import escape_filter_chars
import re

def secure_user_search(request):
    search_term = request.GET.get('name', '')

    # SECURE - Validate input format
    if not re.match(r'^[a-zA-Z0-9\s\-]+$', search_term):
        return JsonResponse({
            'error': 'Invalid search term'
        }, status=400)

    # SECURE - Additional length check
    if len(search_term) > 50:
        return JsonResponse({
            'error': 'Search term too long'
        }, status=400)

    # SECURE - Escape LDAP special characters
    safe_search = escape_filter_chars(search_term)
    search_filter = f"(cn=*{safe_search}*)"

    server = Server('ldap://ldap.example.com:389')
    conn = Connection(
        server,
        'cn=readonly,dc=example,dc=com',
        'readonly_password',
        auto_bind=True
    )

    try:
        conn.search(
            'ou=people,dc=example,dc=com',
            search_filter,
            attributes=['cn', 'mail']
        )

        users = [{
            'name': str(entry.cn),
            'email': str(entry.mail)
        } for entry in conn.entries[:100]]  # SECURE - Limit results

        return JsonResponse({'users': users})

    except Exception:
        # SECURE - Generic error message
        return JsonResponse({
            'error': 'Search failed'
        }, status=500)
    finally:
        conn.unbind()

Why this works:

  • Combining Django's validation framework with ldap3's escape_filter_chars() provides defense-in-depth against LDAP injection.
  • The regex allowlist (r'^[a-zA-Z0-9\s\-]+$') and length validation (len(search_term) > 50) reject obviously malicious input early, preventing it from reaching LDAP operations.
  • This catches injection attempts with special characters or overly long payloads that could cause DoS.
  • However, validation alone isn't sufficient - you must still escape because legitimate search terms might contain characters that need escaping in some contexts.
  • The result limit (conn.entries[:100]) prevents result flooding attacks.
  • Generic error messages ('Search failed') prevent information disclosure about directory structure or existence of entries.
  • Using a read-only LDAP account minimizes damage if compromised.
  • The finally block ensures connection cleanup even if exceptions occur.

Flask with Parameterized Filters

# SECURE - Flask with comprehensive security
from flask import Flask, request, jsonify
from ldap3 import Server, Connection, ALL
from ldap3.utils.conv import escape_filter_chars
import re

app = Flask(__name__)

# SECURE - Configuration
LDAP_SERVER = 'ldap://localhost:389'
LDAP_BASE_DN = 'ou=users,dc=example,dc=com'
LDAP_BIND_DN = 'cn=service,dc=example,dc=com'
LDAP_BIND_PASSWORD = 'service_password'

# SECURE - Allowlist of allowed attributes
ALLOWED_ATTRIBUTES = {'cn', 'mail', 'telephoneNumber', 'title'}

def validate_username(username):
    """Validate username format"""
    if not username or len(username) > 64:
        return False
    # SECURE - Alphanumeric, dot, underscore, hyphen only
    return bool(re.match(r'^[a-zA-Z0-9._-]+$', username))

@app.route('/login', methods=['POST'])
def secure_login():
    username = request.json.get('username', '')
    password = request.json.get('password', '')

    # SECURE - Validate input format
    if not validate_username(username):
        return jsonify({'error': 'Invalid username format'}), 400

    if not password or len(password) > 128:
        return jsonify({'error': 'Invalid password'}), 400

    server = Server(LDAP_SERVER, get_info=ALL)

    # SECURE - Service account for search
    search_conn = Connection(
        server,
        LDAP_BIND_DN,
        LDAP_BIND_PASSWORD,
        auto_bind=True
    )

    try:
        # SECURE - Escape username for filter
        safe_username = escape_filter_chars(username)
        search_filter = f"(&(objectClass=person)(uid={safe_username}))"

        # Search for user
        search_conn.search(
            LDAP_BASE_DN,
            search_filter,
            attributes=['dn']
        )

        if not search_conn.entries:
            return jsonify({'error': 'Authentication failed'}), 401

        user_dn = str(search_conn.entries[0].entry_dn)
        search_conn.unbind()

        # SECURE - Authenticate as the user
        user_conn = Connection(server, user_dn, password)

        if not user_conn.bind():
            return jsonify({'error': 'Authentication failed'}), 401

        user_conn.unbind()

        return jsonify({
            'status': 'authenticated',
            'username': username
        })

    except Exception as e:
        app.logger.error(f'LDAP error: {e}')
        # SECURE - Generic error message
        return jsonify({'error': 'Authentication failed'}), 401

@app.route('/search', methods=['GET'])
def secure_search():
    query = request.args.get('q', '')

    # SECURE - Strict validation
    if not query or len(query) > 50:
        return jsonify({'error': 'Invalid query'}), 400

    # SECURE - Alphanumeric and spaces only
    if not re.match(r'^[a-zA-Z0-9\s]+$', query):
        return jsonify({'error': 'Invalid characters in query'}), 400

    server = Server(LDAP_SERVER)
    conn = Connection(
        server,
        LDAP_BIND_DN,
        LDAP_BIND_PASSWORD,
        auto_bind=True
    )

    try:
        # SECURE - Escape and limit wildcards
        safe_query = escape_filter_chars(query)
        search_filter = f"(cn=*{safe_query}*)"

        conn.search(
            LDAP_BASE_DN,
            search_filter,
            attributes=list(ALLOWED_ATTRIBUTES),
            size_limit=100  # SECURE - Limit results
        )

        results = [{
            'name': str(entry.cn),
            'email': str(entry.mail) if hasattr(entry, 'mail') else None
        } for entry in conn.entries]

        return jsonify({'results': results})

    except Exception as e:
        app.logger.error(f'Search error: {e}')
        return jsonify({'error': 'Search failed'}), 500
    finally:
        conn.unbind()

if __name__ == '__main__':
    app.run(debug=False)

Why this works:

  • Flask combined with ldap3 provides comprehensive LDAP injection protection through multiple defensive layers.
  • The validate_username() function uses regex allowlists to restrict input format (alphanumeric, dot, underscore, hyphen only), rejecting injection attempts early.
  • Configuration constants (ALLOWED_ATTRIBUTES allowlist, LDAP_* settings) centralize security policies.
  • Using a service account for searches (LDAP_BIND_DN) rather than admin credentials limits exposure.
  • The two-step authentication (search for user, then bind as that user) prevents username enumeration - failed searches and failed binds both return generic 'Authentication failed' messages.
  • Escape_filter_chars() ensures special characters are neutralized.
  • Size limits (size_limit=100) prevent DoS via result flooding.
  • The attribute allowlist prevents attackers from requesting sensitive LDAP attributes they shouldn't access.
  • Proper logging (app.logger.error) aids debugging without exposing details to users.

DN Escaping

# SECURE - Escaping for Distinguished Names
from ldap3.utils.dn import escape_rdn

def secure_dn_construction(org_unit, user_id):
    # SECURE - Validate components
    if not re.match(r'^[a-zA-Z0-9]+$', org_unit):
        raise ValueError('Invalid org unit')

    if not re.match(r'^[a-zA-Z0-9._-]+$', user_id):
        raise ValueError('Invalid user ID')

    # SECURE - Escape DN components
    safe_user_id = escape_rdn(user_id)
    safe_org_unit = escape_rdn(org_unit)

    # Construct DN
    user_dn = f"uid={safe_user_id},ou={safe_org_unit},dc=example,dc=com"

    return user_dn

Why this works:

  • Distinguished Name escaping requires different handling than filter escaping because DNs have their own syntax with special characters like ,, +, ", \, <, >, ;, and = that separate or structure DN components.
  • The escape_rdn() function from ldap3's dn module escapes these DN-specific characters following RFC 4514, treating them as literal values in Relative Distinguished Names (RDNs).
  • For example, if a user ID contains a comma, escape_rdn() converts it to \, so it's treated as part of the value, not a DN separator.
  • However, this pattern should be used sparingly - constructing DNs from user input is riskier than using filter-based searches.
  • The validation before escaping (re.match() allowlists) provides defense-in-depth by rejecting obviously malicious input.
  • Best practice: search by attribute with escaped filters and use the DN returned by the directory, which is fully trusted.

Connection Pooling with Security

# SECURE - Connection pool with security
from ldap3 import Server, Connection, Tls, SAFE_SYNC
from ldap3.utils.conv import escape_filter_chars
import ssl

class SecureLDAPClient:
    def __init__(self):
        # SECURE - TLS configuration
        tls = Tls(
            validate=ssl.CERT_REQUIRED,
            version=ssl.PROTOCOL_TLS_CLIENT,  # Older Python: ssl.PROTOCOL_TLSv1_2
            ca_certs_file='/path/to/ca-bundle.crt'
        )

        self.server = Server(
            'ldaps://ldap.example.com:636',
            use_ssl=True,
            tls=tls,
            get_info=ALL
        )

    def authenticate(self, username, password):
        """Securely authenticate user"""
        # SECURE - Validate inputs
        if not self._validate_username(username):
            return False

        if not password or len(password) > 128:
            return False

        # Service connection
        conn = Connection(
            self.server,
            'cn=service,dc=example,dc=com',
            'service_password',
            client_strategy=SAFE_SYNC,
            auto_bind=True
        )

        try:
            # SECURE - Escaped filter
            safe_username = escape_filter_chars(username)
            search_filter = f"(uid={safe_username})"

            conn.search(
                'ou=users,dc=example,dc=com',
                search_filter,
                attributes=['dn']
            )

            if not conn.entries:
                return False

            user_dn = str(conn.entries[0].entry_dn)
            conn.unbind()

            # User authentication
            user_conn = Connection(
                self.server,
                user_dn,
                password,
                client_strategy=SAFE_SYNC
            )

            result = user_conn.bind()
            user_conn.unbind()

            return result

        except Exception:
            return False

    def _validate_username(self, username):
        """Validate username format"""
        if not username or len(username) > 64:
            return False
        return bool(re.match(r'^[a-zA-Z0-9._-]+$', username))

Why this works:

  • This pattern demonstrates enterprise-grade LDAP security with TLS/SSL encryption and certificate validation.
  • The Tls configuration enforces CERT_REQUIRED validation, ensuring the LDAP server's identity is verified using a trusted CA bundle, preventing man-in-the-middle attacks.
  • Using ldaps:// with port 636 and use_ssl=True encrypts all LDAP traffic, protecting credentials and directory data in transit.
  • The SAFE_SYNC client strategy ensures thread-safe operations.
  • The class-based design encapsulates security logic: private _validate_username() enforces input rules, while the authenticate() method combines validation, escaping, and two-step authentication (service account search, then user bind).
  • Exception handling is defensive - returning False rather than exposing exception details prevents information disclosure.
  • This pattern is ideal for production environments requiring encrypted LDAP connections with proper certificate validation.

Verification

After implementing the recommended secure patterns, verify the fix through multiple approaches:

  • Manual testing: Submit malicious payloads relevant to this vulnerability and confirm they're handled safely without executing unintended operations
  • Code review: Confirm all instances use the secure pattern (parameterized queries, safe APIs, proper encoding) with no string concatenation or unsafe operations
  • Static analysis: Use security scanners to verify no new vulnerabilities exist and the original finding is resolved
  • Regression testing: Ensure legitimate user inputs and application workflows continue to function correctly
  • Edge case validation: Test with special characters, boundary conditions, and unusual inputs to verify proper handling
  • Framework verification: If using a framework or library, confirm the recommended APIs are used correctly according to documentation
  • Authentication/session testing: Verify security controls remain effective and cannot be bypassed (if applicable to the vulnerability type)
  • Rescan: Run the security scanner again to confirm the finding is resolved and no new issues were introduced

Additional Resources