CWE-90: LDAP Injection - Python
Overview
LDAP Injection in Python applications occurs when untrusted user input is used to construct LDAP queries without proper validation or escaping. LDAP (Lightweight Directory Access Protocol) is commonly used for authentication, authorization, and directory services. Attackers can manipulate LDAP queries to bypass authentication, escalate privileges, extract sensitive directory data, or cause denial of service.
Primary Defence: Use safe filter construction with ldap3, escape LDAP special characters (*, (, ), \, NUL) using ldap3.utils.conv.escape_filter_chars() before including user input, validate all input against strict allowlists, never allow wildcards in authentication queries, use bind authentication instead of search-based authentication, and implement proper error handling to prevent information disclosure through LDAP injection attacks.
Common Python LDAP Vulnerability Scenarios:
- Building LDAP search filters with string concatenation
- Using user input directly in DN (Distinguished Names)
- Accepting wildcards in authentication queries
- Insufficient validation of special LDAP characters
- Blind LDAP injection via error messages
Python LDAP Libraries:
- ldap3: Modern, pure Python LDAP client (recommended)
- python-ldap: Legacy LDAP library (C-based, requires escape functions)
- django-auth-ldap: Django LDAP authentication backend
- flask-ldap3-login: Flask LDAP authentication
LDAP Special Characters:
RFC 4515 requires escaping: *, (, ), \, NUL (some codebases also escape / as defense in depth)
Common Vulnerable Patterns
String Concatenation in LDAP Filter
# VULNERABLE - Direct string concatenation
import ldap
def vulnerable_ldap_auth(username, password):
conn = ldap.initialize('ldap://localhost:389')
# VULNERABLE - User input directly in search filter
search_filter = f"(uid={username})"
try:
# Search for user
result = conn.search_s(
'ou=users,dc=example,dc=com',
ldap.SCOPE_SUBTREE,
search_filter
)
if result:
user_dn = result[0][0]
# Try to bind with credentials
conn.simple_bind_s(user_dn, password)
return True
except ldap.INVALID_CREDENTIALS:
return False
return False
# Attack: username = "admin)(&" bypasses authentication
# Resulting filter: (uid=admin)(&)
# This matches ANY user with uid=admin, ignoring password check
Why this is vulnerable:
- No input validation or escaping
- Special characters
)(manipulate filter logic - Attackers can craft filters to bypass authentication
- Wildcard attacks possible with
*
Django LDAP Without Sanitization
# VULNERABLE - Django view with LDAP search
from django.http import JsonResponse
import ldap
def vulnerable_user_search(request):
search_term = request.GET.get('name', '')
# VULNERABLE - Unsanitized input in LDAP filter
search_filter = f"(cn=*{search_term}*)"
conn = ldap.initialize('ldap://ldap.example.com:389')
conn.simple_bind_s('cn=admin,dc=example,dc=com', 'admin_password')
try:
results = conn.search_s(
'ou=people,dc=example,dc=com',
ldap.SCOPE_SUBTREE,
search_filter,
['cn', 'mail', 'telephoneNumber']
)
users = [{
'name': r[1].get('cn', [b''])[0].decode('utf-8'),
'email': r[1].get('mail', [b''])[0].decode('utf-8')
} for r in results]
return JsonResponse({'users': users})
except ldap.LDAPError as e:
return JsonResponse({'error': str(e)}, status=500)
Why this is vulnerable:
- Wildcard search accepts any input
- Attackers can inject filter operators
- Information disclosure via error messages
- No character filtering
Flask LDAP Authentication
# VULNERABLE - Flask LDAP login without escaping
from flask import Flask, request, jsonify
import ldap
app = Flask(__name__)
@app.route('/login', methods=['POST'])
def vulnerable_login():
username = request.json.get('username')
password = request.json.get('password')
conn = ldap.initialize('ldap://localhost:389')
# VULNERABLE - String formatting in filter
search_filter = f"(&(objectClass=person)(uid={username}))"
try:
# Bind as service account
conn.simple_bind_s('cn=service,dc=example,dc=com', 'service_pass')
# Search for user
results = conn.search_s(
'ou=users,dc=example,dc=com',
ldap.SCOPE_SUBTREE,
search_filter
)
if not results:
return jsonify({'error': 'User not found'}), 401
user_dn = results[0][0]
# Authenticate user
user_conn = ldap.initialize('ldap://localhost:389')
user_conn.simple_bind_s(user_dn, password)
return jsonify({'status': 'authenticated'})
except ldap.INVALID_CREDENTIALS:
return jsonify({'error': 'Invalid credentials'}), 401
except ldap.LDAPError as e:
return jsonify({'error': str(e)}), 500
Why this is vulnerable:
- Username not escaped before filter construction
- Attack:
username = "*)(uid=*))(|(uid=*"exposes all users - Error messages leak information
- No input validation
LDAP Attribute Injection
# VULNERABLE - User-controlled attribute names
import ldap
def vulnerable_attribute_search(username, attribute_name):
conn = ldap.initialize('ldap://localhost:389')
conn.simple_bind_s('cn=admin,dc=example,dc=com', 'password')
# VULNERABLE - User controls search filter AND attributes
search_filter = f"(uid={username})"
try:
results = conn.search_s(
'ou=users,dc=example,dc=com',
ldap.SCOPE_SUBTREE,
search_filter,
[attribute_name] # VULNERABLE - User-controlled attribute
)
if results:
return results[0][1].get(attribute_name, [b''])[0].decode('utf-8')
except ldap.LDAPError:
return None
Why this is vulnerable:
- Attacker controls attribute names
- Can extract sensitive attributes:
userPassword,loginShell - Information disclosure
- Privilege escalation via group attributes
DN Injection
# VULNERABLE - User input in Distinguished Name
import ldap
def vulnerable_dn_lookup(org_unit, user_id):
conn = ldap.initialize('ldap://localhost:389')
# VULNERABLE - User input in DN construction
user_dn = f"uid={user_id},ou={org_unit},dc=example,dc=com"
try:
conn.simple_bind_s(user_dn, 'some_password')
return True
except ldap.INVALID_CREDENTIALS:
return False
Why this is vulnerable:
- DN components not validated
- Attack:
org_unit = "users,dc=example,dc=com)(uid=admin" - Allows authentication as different user
- DN escape requirements different from filter escaping
Wildcard Search Injection
# VULNERABLE - Accepting wildcards from users
import ldap
def vulnerable_wildcard_search(search_pattern):
conn = ldap.initialize('ldap://localhost:389')
conn.simple_bind_s('cn=admin,dc=example,dc=com', 'password')
# VULNERABLE - Wildcard search with user input
search_filter = f"(cn={search_pattern})"
results = conn.search_s(
'ou=people,dc=example,dc=com',
ldap.SCOPE_SUBTREE,
search_filter
)
return [(r[1].get('cn', [b''])[0].decode('utf-8')) for r in results]
# Attack: search_pattern = "*" returns ALL entries
# Attack: search_pattern = "a*)(objectClass=*" performs OR injection
Why this is vulnerable:
- Wildcards allow enumeration attacks
- Can extract entire directory
- Denial of service via resource exhaustion
- Information leakage
ldap3 Without Escaping
# VULNERABLE - ldap3 library with string concatenation
from ldap3 import Server, Connection, ALL
def vulnerable_ldap3_search(username):
server = Server('ldap://localhost:389', get_info=ALL)
conn = Connection(server, 'cn=admin,dc=example,dc=com', 'password')
conn.bind()
# VULNERABLE - Even with ldap3, string concatenation is dangerous
search_filter = f"(uid={username})"
conn.search(
'ou=users,dc=example,dc=com',
search_filter,
attributes=['cn', 'mail']
)
return conn.entries
Why this is vulnerable:
- ldap3 doesn't automatically escape string concatenation
- Must use explicit escaping functions
- False sense of security with modern library
- Still vulnerable to injection
Error-Based LDAP Injection
# VULNERABLE - Verbose error messages
import ldap
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/check-user', methods=['GET'])
def vulnerable_check_user():
username = request.args.get('username')
conn = ldap.initialize('ldap://localhost:389')
search_filter = f"(uid={username})"
try:
results = conn.search_s(
'ou=users,dc=example,dc=com',
ldap.SCOPE_SUBTREE,
search_filter
)
return jsonify({
'exists': len(results) > 0,
'filter': search_filter # VULNERABLE - Exposes filter
})
except ldap.FILTER_ERROR as e:
# VULNERABLE - Detailed error messages
return jsonify({
'error': 'Invalid filter',
'details': str(e),
'filter': search_filter
}), 400
Why this is vulnerable:
- Error messages reveal filter structure
- Attackers can probe for valid injection points
- Information disclosure
- Blind injection possible
Secure Patterns
Using ldap3 with escape_filter_chars
# SECURE - ldap3 with proper escaping
from ldap3 import Server, Connection, ALL
from ldap3.utils.conv import escape_filter_chars
def secure_ldap_search(username):
server = Server('ldap://localhost:389', get_info=ALL)
conn = Connection(
server,
'cn=admin,dc=example,dc=com',
'password',
auto_bind=True
)
# SECURE - Escape special characters
safe_username = escape_filter_chars(username)
search_filter = f"(uid={safe_username})"
conn.search(
'ou=users,dc=example,dc=com',
search_filter,
attributes=['cn', 'mail', 'telephoneNumber']
)
users = [{
'name': str(entry.cn),
'email': str(entry.mail),
'phone': str(entry.telephoneNumber)
} for entry in conn.entries]
conn.unbind()
return users
Why this works:
- The ldap3 library's
escape_filter_chars()function escapes all LDAP filter special characters (*,(,),\,NUL) by converting them to backslash-hex escape sequences following RFC 4515. - This prevents filter injection where attackers manipulate filter logic by injecting operators.
- For example, without escaping,
username = "*)(uid=*))(|(uid=*"creates multiple filter clauses that bypass authentication checks. - The ldap3 library is modern, pure Python, and actively maintained, making it the recommended choice for new LDAP applications.
- Using
escape_filter_chars()ensures user input is treated as literal search values, not filter syntax. - Combined with proper connection handling (
auto_bind=Truefor automatic binding,unbind()for cleanup), this provides robust protection against LDAP injection attacks.
python-ldap with filter_escape
# SECURE - python-ldap with escaping
import ldap
from ldap.filter import escape_filter_chars
def secure_python_ldap_auth(username, password):
conn = ldap.initialize('ldap://localhost:389')
# SECURE - Escape filter characters
safe_username = escape_filter_chars(username)
search_filter = f"(&(objectClass=person)(uid={safe_username}))"
try:
# Bind as service account
conn.simple_bind_s('cn=service,dc=example,dc=com', 'service_password')
# Search for user
results = conn.search_s(
'ou=users,dc=example,dc=com',
ldap.SCOPE_SUBTREE,
search_filter,
['dn']
)
if not results:
return False
user_dn = results[0][0]
# Authenticate as user
user_conn = ldap.initialize('ldap://localhost:389')
user_conn.simple_bind_s(user_dn, password)
user_conn.unbind_s()
conn.unbind_s()
return True
except ldap.INVALID_CREDENTIALS:
return False
except ldap.LDAPError:
return False
Why this works:
- The python-ldap library's
escape_filter_chars()function (fromldap.filtermodule) provides RFC 4515-compliant escaping for LDAP filter values. - It converts special characters like
*(wildcard),(and)(operators),\(escape character), andNULinto backslash-hex escape sequences, preventing filter manipulation. - python-ldap is a mature, C-based library (binding to OpenLDAP's libldap) that's been widely used for decades.
- While ldap3 is recommended for new projects (pure Python, better API), python-ldap remains common in legacy applications.
- The escaping function ensures attackers cannot inject filter syntax through user input values.
- Proper exception handling (
except ldap.INVALID_CREDENTIALSfor auth failures,except ldap.LDAPErrorfor other errors) prevents information disclosure while logging errors appropriately.
Django with Input Validation
# SECURE - Django with validation and escaping
from django.http import JsonResponse
from django.core.validators import validate_slug
from django.core.exceptions import ValidationError
from ldap3 import Server, Connection
from ldap3.utils.conv import escape_filter_chars
import re
def secure_user_search(request):
search_term = request.GET.get('name', '')
# SECURE - Validate input format
if not re.match(r'^[a-zA-Z0-9\s\-]+$', search_term):
return JsonResponse({
'error': 'Invalid search term'
}, status=400)
# SECURE - Additional length check
if len(search_term) > 50:
return JsonResponse({
'error': 'Search term too long'
}, status=400)
# SECURE - Escape LDAP special characters
safe_search = escape_filter_chars(search_term)
search_filter = f"(cn=*{safe_search}*)"
server = Server('ldap://ldap.example.com:389')
conn = Connection(
server,
'cn=readonly,dc=example,dc=com',
'readonly_password',
auto_bind=True
)
try:
conn.search(
'ou=people,dc=example,dc=com',
search_filter,
attributes=['cn', 'mail']
)
users = [{
'name': str(entry.cn),
'email': str(entry.mail)
} for entry in conn.entries[:100]] # SECURE - Limit results
return JsonResponse({'users': users})
except Exception:
# SECURE - Generic error message
return JsonResponse({
'error': 'Search failed'
}, status=500)
finally:
conn.unbind()
Why this works:
- Combining Django's validation framework with ldap3's escape_filter_chars() provides defense-in-depth against LDAP injection.
- The regex allowlist (
r'^[a-zA-Z0-9\s\-]+$') and length validation (len(search_term) > 50) reject obviously malicious input early, preventing it from reaching LDAP operations. - This catches injection attempts with special characters or overly long payloads that could cause DoS.
- However, validation alone isn't sufficient - you must still escape because legitimate search terms might contain characters that need escaping in some contexts.
- The result limit (
conn.entries[:100]) prevents result flooding attacks. - Generic error messages (
'Search failed') prevent information disclosure about directory structure or existence of entries. - Using a read-only LDAP account minimizes damage if compromised.
- The
finallyblock ensures connection cleanup even if exceptions occur.
Flask with Parameterized Filters
# SECURE - Flask with comprehensive security
from flask import Flask, request, jsonify
from ldap3 import Server, Connection, ALL
from ldap3.utils.conv import escape_filter_chars
import re
app = Flask(__name__)
# SECURE - Configuration
LDAP_SERVER = 'ldap://localhost:389'
LDAP_BASE_DN = 'ou=users,dc=example,dc=com'
LDAP_BIND_DN = 'cn=service,dc=example,dc=com'
LDAP_BIND_PASSWORD = 'service_password'
# SECURE - Allowlist of allowed attributes
ALLOWED_ATTRIBUTES = {'cn', 'mail', 'telephoneNumber', 'title'}
def validate_username(username):
"""Validate username format"""
if not username or len(username) > 64:
return False
# SECURE - Alphanumeric, dot, underscore, hyphen only
return bool(re.match(r'^[a-zA-Z0-9._-]+$', username))
@app.route('/login', methods=['POST'])
def secure_login():
username = request.json.get('username', '')
password = request.json.get('password', '')
# SECURE - Validate input format
if not validate_username(username):
return jsonify({'error': 'Invalid username format'}), 400
if not password or len(password) > 128:
return jsonify({'error': 'Invalid password'}), 400
server = Server(LDAP_SERVER, get_info=ALL)
# SECURE - Service account for search
search_conn = Connection(
server,
LDAP_BIND_DN,
LDAP_BIND_PASSWORD,
auto_bind=True
)
try:
# SECURE - Escape username for filter
safe_username = escape_filter_chars(username)
search_filter = f"(&(objectClass=person)(uid={safe_username}))"
# Search for user
search_conn.search(
LDAP_BASE_DN,
search_filter,
attributes=['dn']
)
if not search_conn.entries:
return jsonify({'error': 'Authentication failed'}), 401
user_dn = str(search_conn.entries[0].entry_dn)
search_conn.unbind()
# SECURE - Authenticate as the user
user_conn = Connection(server, user_dn, password)
if not user_conn.bind():
return jsonify({'error': 'Authentication failed'}), 401
user_conn.unbind()
return jsonify({
'status': 'authenticated',
'username': username
})
except Exception as e:
app.logger.error(f'LDAP error: {e}')
# SECURE - Generic error message
return jsonify({'error': 'Authentication failed'}), 401
@app.route('/search', methods=['GET'])
def secure_search():
query = request.args.get('q', '')
# SECURE - Strict validation
if not query or len(query) > 50:
return jsonify({'error': 'Invalid query'}), 400
# SECURE - Alphanumeric and spaces only
if not re.match(r'^[a-zA-Z0-9\s]+$', query):
return jsonify({'error': 'Invalid characters in query'}), 400
server = Server(LDAP_SERVER)
conn = Connection(
server,
LDAP_BIND_DN,
LDAP_BIND_PASSWORD,
auto_bind=True
)
try:
# SECURE - Escape and limit wildcards
safe_query = escape_filter_chars(query)
search_filter = f"(cn=*{safe_query}*)"
conn.search(
LDAP_BASE_DN,
search_filter,
attributes=list(ALLOWED_ATTRIBUTES),
size_limit=100 # SECURE - Limit results
)
results = [{
'name': str(entry.cn),
'email': str(entry.mail) if hasattr(entry, 'mail') else None
} for entry in conn.entries]
return jsonify({'results': results})
except Exception as e:
app.logger.error(f'Search error: {e}')
return jsonify({'error': 'Search failed'}), 500
finally:
conn.unbind()
if __name__ == '__main__':
app.run(debug=False)
Why this works:
- Flask combined with ldap3 provides comprehensive LDAP injection protection through multiple defensive layers.
- The
validate_username()function uses regex allowlists to restrict input format (alphanumeric, dot, underscore, hyphen only), rejecting injection attempts early. - Configuration constants (
ALLOWED_ATTRIBUTESallowlist,LDAP_*settings) centralize security policies. - Using a service account for searches (
LDAP_BIND_DN) rather than admin credentials limits exposure. - The two-step authentication (search for user, then bind as that user) prevents username enumeration - failed searches and failed binds both return generic
'Authentication failed'messages. - Escape_filter_chars() ensures special characters are neutralized.
- Size limits (
size_limit=100) prevent DoS via result flooding. - The attribute allowlist prevents attackers from requesting sensitive LDAP attributes they shouldn't access.
- Proper logging (
app.logger.error) aids debugging without exposing details to users.
DN Escaping
# SECURE - Escaping for Distinguished Names
from ldap3.utils.dn import escape_rdn
def secure_dn_construction(org_unit, user_id):
# SECURE - Validate components
if not re.match(r'^[a-zA-Z0-9]+$', org_unit):
raise ValueError('Invalid org unit')
if not re.match(r'^[a-zA-Z0-9._-]+$', user_id):
raise ValueError('Invalid user ID')
# SECURE - Escape DN components
safe_user_id = escape_rdn(user_id)
safe_org_unit = escape_rdn(org_unit)
# Construct DN
user_dn = f"uid={safe_user_id},ou={safe_org_unit},dc=example,dc=com"
return user_dn
Why this works:
- Distinguished Name escaping requires different handling than filter escaping because DNs have their own syntax with special characters like
,,+,",\,<,>,;, and=that separate or structure DN components. - The
escape_rdn()function from ldap3'sdnmodule escapes these DN-specific characters following RFC 4514, treating them as literal values in Relative Distinguished Names (RDNs). - For example, if a user ID contains a comma,
escape_rdn()converts it to\,so it's treated as part of the value, not a DN separator. - However, this pattern should be used sparingly - constructing DNs from user input is riskier than using filter-based searches.
- The validation before escaping (
re.match()allowlists) provides defense-in-depth by rejecting obviously malicious input. - Best practice: search by attribute with escaped filters and use the DN returned by the directory, which is fully trusted.
Connection Pooling with Security
# SECURE - Connection pool with security
from ldap3 import Server, Connection, Tls, SAFE_SYNC
from ldap3.utils.conv import escape_filter_chars
import ssl
class SecureLDAPClient:
def __init__(self):
# SECURE - TLS configuration
tls = Tls(
validate=ssl.CERT_REQUIRED,
version=ssl.PROTOCOL_TLS_CLIENT, # Older Python: ssl.PROTOCOL_TLSv1_2
ca_certs_file='/path/to/ca-bundle.crt'
)
self.server = Server(
'ldaps://ldap.example.com:636',
use_ssl=True,
tls=tls,
get_info=ALL
)
def authenticate(self, username, password):
"""Securely authenticate user"""
# SECURE - Validate inputs
if not self._validate_username(username):
return False
if not password or len(password) > 128:
return False
# Service connection
conn = Connection(
self.server,
'cn=service,dc=example,dc=com',
'service_password',
client_strategy=SAFE_SYNC,
auto_bind=True
)
try:
# SECURE - Escaped filter
safe_username = escape_filter_chars(username)
search_filter = f"(uid={safe_username})"
conn.search(
'ou=users,dc=example,dc=com',
search_filter,
attributes=['dn']
)
if not conn.entries:
return False
user_dn = str(conn.entries[0].entry_dn)
conn.unbind()
# User authentication
user_conn = Connection(
self.server,
user_dn,
password,
client_strategy=SAFE_SYNC
)
result = user_conn.bind()
user_conn.unbind()
return result
except Exception:
return False
def _validate_username(self, username):
"""Validate username format"""
if not username or len(username) > 64:
return False
return bool(re.match(r'^[a-zA-Z0-9._-]+$', username))
Why this works:
- This pattern demonstrates enterprise-grade LDAP security with TLS/SSL encryption and certificate validation.
- The
Tlsconfiguration enforcesCERT_REQUIREDvalidation, ensuring the LDAP server's identity is verified using a trusted CA bundle, preventing man-in-the-middle attacks. - Using
ldaps://with port 636 anduse_ssl=Trueencrypts all LDAP traffic, protecting credentials and directory data in transit. - The
SAFE_SYNCclient strategy ensures thread-safe operations. - The class-based design encapsulates security logic: private
_validate_username()enforces input rules, while theauthenticate()method combines validation, escaping, and two-step authentication (service account search, then user bind). - Exception handling is defensive - returning
Falserather than exposing exception details prevents information disclosure. - This pattern is ideal for production environments requiring encrypted LDAP connections with proper certificate validation.
Verification
After implementing the recommended secure patterns, verify the fix through multiple approaches:
- Manual testing: Submit malicious payloads relevant to this vulnerability and confirm they're handled safely without executing unintended operations
- Code review: Confirm all instances use the secure pattern (parameterized queries, safe APIs, proper encoding) with no string concatenation or unsafe operations
- Static analysis: Use security scanners to verify no new vulnerabilities exist and the original finding is resolved
- Regression testing: Ensure legitimate user inputs and application workflows continue to function correctly
- Edge case validation: Test with special characters, boundary conditions, and unusual inputs to verify proper handling
- Framework verification: If using a framework or library, confirm the recommended APIs are used correctly according to documentation
- Authentication/session testing: Verify security controls remain effective and cannot be bypassed (if applicable to the vulnerability type)
- Rescan: Run the security scanner again to confirm the finding is resolved and no new issues were introduced