CWE-522: Insufficiently Protected Credentials - Python
Overview
Insufficiently Protected Credentials in Python applications occurs when passwords, API keys, tokens, or other authentication secrets are stored in plaintext, weakly encrypted, hardcoded in source code, or transmitted insecurely. Python's extensive ecosystem includes powerful cryptographic libraries like bcrypt, Argon2, and cryptography, but developers must actively choose and configure these tools correctly.
Primary Defence: Use bcrypt or argon2-cffi for password hashing with appropriate cost factors, and never store passwords in plaintext.
Common Vulnerable Patterns
Storing Passwords in Plaintext
# VULNERABLE - Plaintext password storage
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
db = SQLAlchemy(app)
class User(db.Model):
username = db.Column(db.String(80), unique=True, nullable=False)
password = db.Column(db.String(120), nullable=False) # Plaintext!
@app.route('/register', methods=['POST'])
def register():
username = request.json['username']
password = request.json['password']
# Stores password directly without hashing!
user = User(username=username, password=password)
db.session.add(user)
db.session.commit()
return {'status': 'success'}
@app.route('/login', methods=['POST'])
def login():
username = request.json['username']
password = request.json['password']
user = User.query.filter_by(username=username).first()
# Direct password comparison!
if user and user.password == password:
return {'token': generate_token(user)}
return {'error': 'Invalid credentials'}, 401
Why this is vulnerable:
- Database access reveals passwords immediately.
- Password reuse turns one breach into many.
Hardcoded Credentials
# VULNERABLE - Credentials in source code
import psycopg2
# Hardcoded database credentials!
DB_HOST = "prod-database.company.com"
DB_USER = "admin"
DB_PASSWORD = "SuperSecret123!" # NEVER DO THIS!
DB_NAME = "production"
def get_db_connection():
return psycopg2.connect(
host=DB_HOST,
user=DB_USER,
password=DB_PASSWORD,
database=DB_NAME
)
# Hardcoded API keys!
API_KEY = "sk-live-abc123def456ghi789"
SECRET_KEY = "my-secret-key-12345"
app = Flask(__name__)
app.config['SECRET_KEY'] = SECRET_KEY # Hardcoded!
Why this is vulnerable:
- Secrets in source or repo history are easily leaked.
- Rotation requires code changes and redeploys.
Weak Password Hashing
# VULNERABLE - Using weak hash functions
import hashlib
@app.route('/register', methods=['POST'])
def register():
username = request.json['username']
password = request.json['password']
# MD5 and SHA1 are broken for password hashing!
password_hash = hashlib.md5(password.encode()).hexdigest()
# or password_hash = hashlib.sha1(password.encode()).hexdigest()
user = User(username=username, password_hash=password_hash)
db.session.add(user)
db.session.commit()
return {'status': 'success'}
Why this is vulnerable:
- Fast hashes make brute-force practical.
- No salt or work factor enables rainbow tables.
Credentials in Environment Variables Without Protection
# VULNERABLE - .env file committed to git
# .env (checked into version control!)
DATABASE_URL=postgresql://admin:password123@localhost/mydb
API_KEY=sk-live-abc123def456
SECRET_KEY=django-insecure-development-key
AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
# settings.py
import os
from dotenv import load_dotenv
load_dotenv() # Loads from .env
# If .env is in git, credentials are exposed!
DATABASE_URL = os.getenv('DATABASE_URL')
API_KEY = os.getenv('API_KEY')
Why this is vulnerable:
- Secrets persist in git history and forks.
- Public repo exposure is common and hard to fully undo.
Insecure JWT Token Generation
# VULNERABLE - Weak JWT implementation
import jwt
from datetime import datetime, timedelta
# Weak secret key!
JWT_SECRET = "secret" # Too simple!
JWT_ALGORITHM = "HS256"
def create_token(user_id):
payload = {
'user_id': user_id,
'password': user.password_hash, # Including password hash in token!
'exp': datetime.utcnow() + timedelta(days=365) # Too long-lived!
}
# Weak secret makes tokens easy to forge
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
Transmitting Credentials Over HTTP
# VULNERABLE - HTTP transmission
@app.route('/api/authenticate', methods=['POST'])
def authenticate():
# Credentials sent over HTTP (not HTTPS)!
username = request.json['username']
password = request.json['password']
# Even if hashed after receipt, transmission was insecure
if verify_credentials(username, password):
return {'token': generate_token()}
return {'error': 'Invalid'}, 401
# No HTTPS enforcement
if __name__ == '__main__':
app.run(debug=True) # HTTP only!
Secure Patterns
Bcrypt Password Hashing
# SECURE - Proper bcrypt implementation
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
import bcrypt
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password_hash = db.Column(db.String(128), nullable=False)
@app.route('/register', methods=['POST'])
def register():
username = request.json['username']
password = request.json['password']
# Generate salt and hash password with bcrypt
salt = bcrypt.gensalt(rounds=12) # 12 rounds is recommended minimum
password_hash = bcrypt.hashpw(password.encode('utf-8'), salt)
user = User(
username=username,
password_hash=password_hash.decode('utf-8') # Store as string
)
db.session.add(user)
db.session.commit()
return jsonify({'status': 'success', 'user_id': user.id})
@app.route('/login', methods=['POST'])
def login():
username = request.json['username']
password = request.json['password']
user = User.query.filter_by(username=username).first()
if not user:
# Generic error - don't reveal if user exists
return jsonify({'error': 'Invalid credentials'}), 401
# Verify password using bcrypt
if bcrypt.checkpw(password.encode('utf-8'), user.password_hash.encode('utf-8')):
token = generate_secure_token(user.id)
return jsonify({'token': token})
return jsonify({'error': 'Invalid credentials'}), 401
Why this works:
- Bcrypt uses per-password salts and a tunable cost to make offline cracking expensive.
checkpw()verifies safely and stored hashes are in standard formats.
Django Password Hashing
# SECURE - Django built-in password hashing
# settings.py
PASSWORD_HASHERS = [
'django.contrib.auth.hashers.Argon2PasswordHasher', # Recommended
'django.contrib.auth.hashers.PBKDF2PasswordHasher', # Fallback
'django.contrib.auth.hashers.PBKDF2SHA1PasswordHasher',
'django.contrib.auth.hashers.BCryptSHA256PasswordHasher',
]
# Install argon2: pip install django[argon2]
# views.py
from django.contrib.auth.models import User
from django.contrib.auth import authenticate, login
from django.contrib.auth.hashers import make_password, check_password
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
import json
@csrf_exempt
def register(request):
if request.method == 'POST':
data = json.loads(request.body)
username = data.get('username')
password = data.get('password')
# Django automatically uses configured hasher
user = User.objects.create_user(
username=username,
password=password # Automatically hashed!
)
return JsonResponse({'status': 'success', 'user_id': user.id})
@csrf_exempt
def user_login(request):
if request.method == 'POST':
data = json.loads(request.body)
username = data.get('username')
password = data.get('password')
# Authenticate handles secure password checking
user = authenticate(username=username, password=password)
if user is not None:
login(request, user)
return JsonResponse({'status': 'success'})
return JsonResponse({'error': 'Invalid credentials'}, status=401)
Why this works:
- Django hashes passwords automatically with modern hashers and supports upgrades.
authenticate()verifies against configured hashers without custom logic.
Secrets Management with HashiCorp Vault
# SECURE - HashiCorp Vault integration
import hvac
import os
class VaultSecrets:
def __init__(self):
# Vault address and token from environment (not in code)
vault_url = os.getenv('VAULT_ADDR', 'http://localhost:8200')
vault_token = os.getenv('VAULT_TOKEN')
self.client = hvac.Client(url=vault_url, token=vault_token)
if not self.client.is_authenticated():
raise Exception('Failed to authenticate with Vault')
def get_secret(self, path):
"""Retrieve secret from Vault"""
try:
secret = self.client.secrets.kv.v2.read_secret_version(path=path)
return secret['data']['data']
except Exception as e:
# Log error but don't expose vault details
print(f'Failed to retrieve secret: {path}')
raise
def get_database_credentials(self):
"""Get database credentials from Vault"""
creds = self.get_secret('database/postgresql')
return {
'host': creds['host'],
'port': creds['port'],
'user': creds['username'],
'password': creds['password'],
'database': creds['database']
}
# Usage in application
vault = VaultSecrets()
db_creds = vault.get_database_credentials()
# Use credentials without hardcoding
import psycopg2
conn = psycopg2.connect(
host=db_creds['host'],
port=db_creds['port'],
user=db_creds['user'],
password=db_creds['password'],
database=db_creds['database']
)
Why this works:
- Secrets are stored centrally and fetched at runtime, encrypted at rest.
- Policies and short-lived tokens enable least privilege and rotation.
AWS Secrets Manager Integration
# SECURE - AWS Secrets Manager
import boto3
import json
from botocore.exceptions import ClientError
class AWSSecretsManager:
def __init__(self, region_name='us-east-1'):
self.client = boto3.client('secretsmanager', region_name=region_name)
def get_secret(self, secret_name):
"""Retrieve secret from AWS Secrets Manager"""
try:
response = self.client.get_secret_value(SecretId=secret_name)
if 'SecretString' in response:
return json.loads(response['SecretString'])
else:
# Binary secrets
return response['SecretBinary']
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceNotFoundException':
print(f'Secret {secret_name} not found')
elif e.response['Error']['Code'] == 'InvalidRequestException':
print(f'Invalid request for secret {secret_name}')
elif e.response['Error']['Code'] == 'InvalidParameterException':
print(f'Invalid parameter for secret {secret_name}')
raise
# Usage
secrets = AWSSecretsManager(region_name='us-east-1')
# Get database credentials
db_secret = secrets.get_secret('production/database')
DATABASE_URL = f"postgresql://{db_secret['username']}:{db_secret['password']}@{db_secret['host']}/{db_secret['database']}"
# Get API keys
api_secret = secrets.get_secret('production/api-keys')
STRIPE_API_KEY = api_secret['stripe_key']
SENDGRID_API_KEY = api_secret['sendgrid_key']
Why this works:
- Secrets are encrypted with KMS and accessed at runtime via IAM roles.
- Rotation and CloudTrail logging reduce exposure and support auditing.
Secure JWT Implementation
# SECURE - Proper JWT token handling
from datetime import datetime, timedelta
import jwt
import secrets
# Generate strong secret key
def generate_secret_key():
"""Generate cryptographically secure secret key"""
return secrets.token_urlsafe(64)
# Store in secrets manager, not in code
JWT_SECRET = vault.get_secret('jwt/secret-key')['key']
JWT_ALGORITHM = 'HS256'
JWT_EXPIRATION_HOURS = 24 # Short-lived tokens
def create_access_token(user_id: int, expires_delta: timedelta = None):
"""Create secure JWT token"""
if expires_delta is None:
expires_delta = timedelta(hours=JWT_EXPIRATION_HOURS)
expire = datetime.utcnow() + expires_delta
# Minimal claims - don't include sensitive data
payload = {
'user_id': user_id,
'exp': expire,
'iat': datetime.utcnow(), # Issued at
'jti': secrets.token_urlsafe(16) # JWT ID for revocation
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def verify_token(token: str):
"""Verify and decode JWT token"""
try:
payload = jwt.decode(
token,
JWT_SECRET,
algorithms=[JWT_ALGORITHM],
options={'verify_exp': True} # Verify expiration
)
return payload
except jwt.ExpiredSignatureError:
raise ValueError('Token has expired')
except jwt.InvalidTokenError:
raise ValueError('Invalid token')
# Refresh token implementation
def create_refresh_token(user_id: int):
"""Create long-lived refresh token"""
expires_delta = timedelta(days=30)
expire = datetime.utcnow() + expires_delta
payload = {
'user_id': user_id,
'exp': expire,
'type': 'refresh'
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
Why this works:
- Strong secrets and short-lived access tokens reduce blast radius.
- Validation checks and separate refresh tokens tighten control.
HTTPS Enforcement
# SECURE - Force HTTPS in Flask
from flask import Flask, request, redirect
from flask_talisman import Talisman
app = Flask(__name__)
# Enforce HTTPS with Flask-Talisman
Talisman(app,
force_https=True,
strict_transport_security=True,
strict_transport_security_max_age=31536000, # 1 year
content_security_policy=None)
@app.before_request
def enforce_https():
"""Additional HTTPS enforcement"""
if not request.is_secure and not app.debug:
url = request.url.replace('http://', 'https://', 1)
return redirect(url, code=301)
# For production deployment
if __name__ == '__main__':
# Use HTTPS in production
context = ('/path/to/cert.pem', '/path/to/key.pem')
app.run(ssl_context=context, host='0.0.0.0', port=443)
Why this works:
- TLS encrypts credentials in transit.
- HSTS and redirects ensure HTTP is not accepted in production.
FastAPI with OAuth2 and Password Hashing
# SECURE - FastAPI OAuth2 implementation
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from pydantic import BaseModel
from datetime import datetime, timedelta
import jwt
app = FastAPI()
# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Secret from secure storage
SECRET_KEY = vault.get_secret('api/secret-key')['key']
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class User(BaseModel):
username: str
email: str
password_hash: str
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify password against hash"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash password using bcrypt"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: timedelta = None):
"""Create JWT access token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
@app.post("/register")
async def register(username: str, password: str, email: str):
"""Register new user with hashed password"""
password_hash = get_password_hash(password)
user = User(
username=username,
email=email,
password_hash=password_hash
)
# Save to database
# db.add(user)
return {"status": "success", "username": username}
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
"""Login and get access token"""
# Get user from database
user = get_user(form_data.username)
if not user or not verify_password(form_data.password, user.password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
async def get_current_user(token: str = Depends(oauth2_scheme)):
"""Get current user from token"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid token")
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
user = get_user(username)
if user is None:
raise HTTPException(status_code=401, detail="User not found")
return user
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
"""Protected endpoint requiring authentication"""
return current_user
Why this works:
- OAuth2 flow with Passlib bcrypt provides consistent hashing and token handling.
- Dependency injection centralizes auth checks for protected routes.
Verification
To verify credentials are properly protected:
- Check password storage: Verify passwords in the database are hashed (bcrypt hashes start with
$2b$, Argon2 with$argon2), not plaintext - Confirm salt usage: Ensure identical passwords for different users generate different hashes
- Review configuration: Check that settings files use environment variables or secret managers, not hardcoded credentials
- Test JWT tokens: Verify tokens expire and include proper claims (expiration, issuer, audience)
- Search source code: Grep codebase for potential hardcoded secrets, API keys, or passwords
- Test authentication flow: Verify login succeeds with correct credentials and fails with incorrect ones
- Check
.envfiles: Ensure.envis in.gitignoreand never committed to version control - Use security scanners: Run tools like
banditto detect hardcoded credentials and weak cryptographic usage
# SECURE - Tests to verify credential protection
import pytest
import bcrypt
from app import app, db, User
class TestCredentialSecurity:
@pytest.fixture
def client(self):
app.config['TESTING'] = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
with app.test_client() as client:
with app.app_context():
db.create_all()
yield client
def test_passwords_are_hashed(self, client):
"""Verify passwords are not stored in plaintext"""
response = client.post('/register', json={
'username': 'testuser',
'password': 'MyPassword123!'
})
assert response.status_code == 200
# Check database directly
user = User.query.filter_by(username='testuser').first()
# Password should not be stored in plaintext
assert user.password_hash != 'MyPassword123!'
# Password hash should look like bcrypt format
assert user.password_hash.startswith('$2b$')
# Verify password can be checked
assert bcrypt.checkpw(
'MyPassword123!'.encode('utf-8'),
user.password_hash.encode('utf-8')
)
def test_password_hash_is_unique(self, client):
"""Verify same password produces different hashes (salted)"""
password = 'SamePassword123!'
client.post('/register', json={
'username': 'user1',
'password': password
})
client.post('/register', json={
'username': 'user2',
'password': password
})
user1 = User.query.filter_by(username='user1').first()
user2 = User.query.filter_by(username='user2').first()
# Same password should produce different hashes due to salt
assert user1.password_hash != user2.password_hash
def test_weak_passwords_rejected(self, client):
"""Verify password strength requirements"""
weak_passwords = [
'12345',
'password',
'abc',
'11111111'
]
for weak_pass in weak_passwords:
response = client.post('/register', json={
'username': f'user_{weak_pass}',
'password': weak_pass
})
# Should reject weak passwords
assert response.status_code == 400
data = response.get_json()
assert 'password' in data['error'].lower()
def test_no_hardcoded_secrets(self):
"""Verify no hardcoded credentials in code"""
import app as app_module
import inspect
source = inspect.getsource(app_module)
# Check for common hardcoded patterns
forbidden_patterns = [
'password =',
'secret_key =',
'api_key =',
'AWS_SECRET',
'DB_PASSWORD'
]
for pattern in forbidden_patterns:
# Allow in comments or variable names, but not assignments
lines = [line for line in source.split('\n')
if pattern.lower() in line.lower() and '=' in line
and not line.strip().startswith('#')]
assert len(lines) == 0, f'Found potential hardcoded secret: {pattern}'
def test_jwt_tokens_expire(self):
"""Verify JWT tokens have expiration"""
import jwt
from datetime import datetime
user_id = 123
token = create_access_token(user_id)
# Decode without verification to check claims
unverified = jwt.decode(token, options={"verify_signature": False})
# Must have expiration claim
assert 'exp' in unverified
# Expiration should be in the future
exp_time = datetime.fromtimestamp(unverified['exp'])
assert exp_time > datetime.utcnow()
# But not too far in the future (< 30 days)
max_exp = datetime.utcnow() + timedelta(days=30)
assert exp_time < max_exp