Skip to content

CWE-522: Insufficiently Protected Credentials - Python

Overview

Insufficiently Protected Credentials in Python applications occurs when passwords, API keys, tokens, or other authentication secrets are stored in plaintext, weakly encrypted, hardcoded in source code, or transmitted insecurely. Python's extensive ecosystem includes powerful cryptographic libraries like bcrypt, Argon2, and cryptography, but developers must actively choose and configure these tools correctly.

Primary Defence: Use bcrypt or argon2-cffi for password hashing with appropriate cost factors, and never store passwords in plaintext.

Common Vulnerable Patterns

Storing Passwords in Plaintext

# VULNERABLE - Plaintext password storage
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
db = SQLAlchemy(app)

class User(db.Model):
    username = db.Column(db.String(80), unique=True, nullable=False)
    password = db.Column(db.String(120), nullable=False)  # Plaintext!

@app.route('/register', methods=['POST'])
def register():
    username = request.json['username']
    password = request.json['password']

    # Stores password directly without hashing!
    user = User(username=username, password=password)
    db.session.add(user)
    db.session.commit()

    return {'status': 'success'}

@app.route('/login', methods=['POST'])
def login():
    username = request.json['username']
    password = request.json['password']

    user = User.query.filter_by(username=username).first()
    # Direct password comparison!
    if user and user.password == password:
        return {'token': generate_token(user)}
    return {'error': 'Invalid credentials'}, 401

Why this is vulnerable:

  • Database access reveals passwords immediately.
  • Password reuse turns one breach into many.

Hardcoded Credentials

# VULNERABLE - Credentials in source code
import psycopg2

# Hardcoded database credentials!
DB_HOST = "prod-database.company.com"
DB_USER = "admin"
DB_PASSWORD = "SuperSecret123!"  # NEVER DO THIS!
DB_NAME = "production"

def get_db_connection():
    return psycopg2.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME
    )

# Hardcoded API keys!
API_KEY = "sk-live-abc123def456ghi789"
SECRET_KEY = "my-secret-key-12345"

app = Flask(__name__)
app.config['SECRET_KEY'] = SECRET_KEY  # Hardcoded!

Why this is vulnerable:

  • Secrets in source or repo history are easily leaked.
  • Rotation requires code changes and redeploys.

Weak Password Hashing

# VULNERABLE - Using weak hash functions
import hashlib

@app.route('/register', methods=['POST'])
def register():
    username = request.json['username']
    password = request.json['password']

    # MD5 and SHA1 are broken for password hashing!
    password_hash = hashlib.md5(password.encode()).hexdigest()
    # or password_hash = hashlib.sha1(password.encode()).hexdigest()

    user = User(username=username, password_hash=password_hash)
    db.session.add(user)
    db.session.commit()

    return {'status': 'success'}

Why this is vulnerable:

  • Fast hashes make brute-force practical.
  • No salt or work factor enables rainbow tables.

Credentials in Environment Variables Without Protection

# VULNERABLE - .env file committed to git
# .env (checked into version control!)
DATABASE_URL=postgresql://admin:password123@localhost/mydb
API_KEY=sk-live-abc123def456
SECRET_KEY=django-insecure-development-key
AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

# settings.py
import os
from dotenv import load_dotenv

load_dotenv()  # Loads from .env

# If .env is in git, credentials are exposed!
DATABASE_URL = os.getenv('DATABASE_URL')
API_KEY = os.getenv('API_KEY')

Why this is vulnerable:

  • Secrets persist in git history and forks.
  • Public repo exposure is common and hard to fully undo.

Insecure JWT Token Generation

# VULNERABLE - Weak JWT implementation
import jwt
from datetime import datetime, timedelta

# Weak secret key!
JWT_SECRET = "secret"  # Too simple!
JWT_ALGORITHM = "HS256"

def create_token(user_id):
    payload = {
        'user_id': user_id,
        'password': user.password_hash,  # Including password hash in token!
        'exp': datetime.utcnow() + timedelta(days=365)  # Too long-lived!
    }
    # Weak secret makes tokens easy to forge
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)

Transmitting Credentials Over HTTP

# VULNERABLE - HTTP transmission
@app.route('/api/authenticate', methods=['POST'])
def authenticate():
    # Credentials sent over HTTP (not HTTPS)!
    username = request.json['username']
    password = request.json['password']

    # Even if hashed after receipt, transmission was insecure
    if verify_credentials(username, password):
        return {'token': generate_token()}
    return {'error': 'Invalid'}, 401

# No HTTPS enforcement
if __name__ == '__main__':
    app.run(debug=True)  # HTTP only!

Secure Patterns

Bcrypt Password Hashing

# SECURE - Proper bcrypt implementation
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
import bcrypt

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password_hash = db.Column(db.String(128), nullable=False)

@app.route('/register', methods=['POST'])
def register():
    username = request.json['username']
    password = request.json['password']

    # Generate salt and hash password with bcrypt
    salt = bcrypt.gensalt(rounds=12)  # 12 rounds is recommended minimum
    password_hash = bcrypt.hashpw(password.encode('utf-8'), salt)

    user = User(
        username=username,
        password_hash=password_hash.decode('utf-8')  # Store as string
    )
    db.session.add(user)
    db.session.commit()

    return jsonify({'status': 'success', 'user_id': user.id})

@app.route('/login', methods=['POST'])
def login():
    username = request.json['username']
    password = request.json['password']

    user = User.query.filter_by(username=username).first()

    if not user:
        # Generic error - don't reveal if user exists
        return jsonify({'error': 'Invalid credentials'}), 401

    # Verify password using bcrypt
    if bcrypt.checkpw(password.encode('utf-8'), user.password_hash.encode('utf-8')):
        token = generate_secure_token(user.id)
        return jsonify({'token': token})

    return jsonify({'error': 'Invalid credentials'}), 401

Why this works:

  • Bcrypt uses per-password salts and a tunable cost to make offline cracking expensive.
  • checkpw() verifies safely and stored hashes are in standard formats.

Django Password Hashing

# SECURE - Django built-in password hashing
# settings.py
PASSWORD_HASHERS = [
    'django.contrib.auth.hashers.Argon2PasswordHasher',  # Recommended
    'django.contrib.auth.hashers.PBKDF2PasswordHasher',  # Fallback
    'django.contrib.auth.hashers.PBKDF2SHA1PasswordHasher',
    'django.contrib.auth.hashers.BCryptSHA256PasswordHasher',
]

# Install argon2: pip install django[argon2]

# views.py
from django.contrib.auth.models import User
from django.contrib.auth import authenticate, login
from django.contrib.auth.hashers import make_password, check_password
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
import json

@csrf_exempt
def register(request):
    if request.method == 'POST':
        data = json.loads(request.body)
        username = data.get('username')
        password = data.get('password')

        # Django automatically uses configured hasher
        user = User.objects.create_user(
            username=username,
            password=password  # Automatically hashed!
        )

        return JsonResponse({'status': 'success', 'user_id': user.id})

@csrf_exempt
def user_login(request):
    if request.method == 'POST':
        data = json.loads(request.body)
        username = data.get('username')
        password = data.get('password')

        # Authenticate handles secure password checking
        user = authenticate(username=username, password=password)

        if user is not None:
            login(request, user)
            return JsonResponse({'status': 'success'})

        return JsonResponse({'error': 'Invalid credentials'}, status=401)

Why this works:

  • Django hashes passwords automatically with modern hashers and supports upgrades.
  • authenticate() verifies against configured hashers without custom logic.

Secrets Management with HashiCorp Vault

# SECURE - HashiCorp Vault integration
import hvac
import os

class VaultSecrets:
    def __init__(self):
        # Vault address and token from environment (not in code)
        vault_url = os.getenv('VAULT_ADDR', 'http://localhost:8200')
        vault_token = os.getenv('VAULT_TOKEN')

        self.client = hvac.Client(url=vault_url, token=vault_token)

        if not self.client.is_authenticated():
            raise Exception('Failed to authenticate with Vault')

    def get_secret(self, path):
        """Retrieve secret from Vault"""
        try:
            secret = self.client.secrets.kv.v2.read_secret_version(path=path)
            return secret['data']['data']
        except Exception as e:
            # Log error but don't expose vault details
            print(f'Failed to retrieve secret: {path}')
            raise

    def get_database_credentials(self):
        """Get database credentials from Vault"""
        creds = self.get_secret('database/postgresql')
        return {
            'host': creds['host'],
            'port': creds['port'],
            'user': creds['username'],
            'password': creds['password'],
            'database': creds['database']
        }

# Usage in application
vault = VaultSecrets()
db_creds = vault.get_database_credentials()

# Use credentials without hardcoding
import psycopg2

conn = psycopg2.connect(
    host=db_creds['host'],
    port=db_creds['port'],
    user=db_creds['user'],
    password=db_creds['password'],
    database=db_creds['database']
)

Why this works:

  • Secrets are stored centrally and fetched at runtime, encrypted at rest.
  • Policies and short-lived tokens enable least privilege and rotation.

AWS Secrets Manager Integration

# SECURE - AWS Secrets Manager
import boto3
import json
from botocore.exceptions import ClientError

class AWSSecretsManager:
    def __init__(self, region_name='us-east-1'):
        self.client = boto3.client('secretsmanager', region_name=region_name)

    def get_secret(self, secret_name):
        """Retrieve secret from AWS Secrets Manager"""
        try:
            response = self.client.get_secret_value(SecretId=secret_name)

            if 'SecretString' in response:
                return json.loads(response['SecretString'])
            else:
                # Binary secrets
                return response['SecretBinary']
        except ClientError as e:
            if e.response['Error']['Code'] == 'ResourceNotFoundException':
                print(f'Secret {secret_name} not found')
            elif e.response['Error']['Code'] == 'InvalidRequestException':
                print(f'Invalid request for secret {secret_name}')
            elif e.response['Error']['Code'] == 'InvalidParameterException':
                print(f'Invalid parameter for secret {secret_name}')
            raise

# Usage
secrets = AWSSecretsManager(region_name='us-east-1')

# Get database credentials
db_secret = secrets.get_secret('production/database')
DATABASE_URL = f"postgresql://{db_secret['username']}:{db_secret['password']}@{db_secret['host']}/{db_secret['database']}"

# Get API keys
api_secret = secrets.get_secret('production/api-keys')
STRIPE_API_KEY = api_secret['stripe_key']
SENDGRID_API_KEY = api_secret['sendgrid_key']

Why this works:

  • Secrets are encrypted with KMS and accessed at runtime via IAM roles.
  • Rotation and CloudTrail logging reduce exposure and support auditing.

Secure JWT Implementation

# SECURE - Proper JWT token handling
from datetime import datetime, timedelta
import jwt
import secrets

# Generate strong secret key
def generate_secret_key():
    """Generate cryptographically secure secret key"""
    return secrets.token_urlsafe(64)

# Store in secrets manager, not in code
JWT_SECRET = vault.get_secret('jwt/secret-key')['key']
JWT_ALGORITHM = 'HS256'
JWT_EXPIRATION_HOURS = 24  # Short-lived tokens

def create_access_token(user_id: int, expires_delta: timedelta = None):
    """Create secure JWT token"""
    if expires_delta is None:
        expires_delta = timedelta(hours=JWT_EXPIRATION_HOURS)

    expire = datetime.utcnow() + expires_delta

    # Minimal claims - don't include sensitive data
    payload = {
        'user_id': user_id,
        'exp': expire,
        'iat': datetime.utcnow(),  # Issued at
        'jti': secrets.token_urlsafe(16)  # JWT ID for revocation
    }

    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)

def verify_token(token: str):
    """Verify and decode JWT token"""
    try:
        payload = jwt.decode(
            token,
            JWT_SECRET,
            algorithms=[JWT_ALGORITHM],
            options={'verify_exp': True}  # Verify expiration
        )
        return payload
    except jwt.ExpiredSignatureError:
        raise ValueError('Token has expired')
    except jwt.InvalidTokenError:
        raise ValueError('Invalid token')

# Refresh token implementation
def create_refresh_token(user_id: int):
    """Create long-lived refresh token"""
    expires_delta = timedelta(days=30)
    expire = datetime.utcnow() + expires_delta

    payload = {
        'user_id': user_id,
        'exp': expire,
        'type': 'refresh'
    }

    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)

Why this works:

  • Strong secrets and short-lived access tokens reduce blast radius.
  • Validation checks and separate refresh tokens tighten control.

HTTPS Enforcement

# SECURE - Force HTTPS in Flask
from flask import Flask, request, redirect
from flask_talisman import Talisman

app = Flask(__name__)

# Enforce HTTPS with Flask-Talisman
Talisman(app, 
         force_https=True,
         strict_transport_security=True,
         strict_transport_security_max_age=31536000,  # 1 year
         content_security_policy=None)

@app.before_request
def enforce_https():
    """Additional HTTPS enforcement"""
    if not request.is_secure and not app.debug:
        url = request.url.replace('http://', 'https://', 1)
        return redirect(url, code=301)

# For production deployment
if __name__ == '__main__':
    # Use HTTPS in production
    context = ('/path/to/cert.pem', '/path/to/key.pem')
    app.run(ssl_context=context, host='0.0.0.0', port=443)

Why this works:

  • TLS encrypts credentials in transit.
  • HSTS and redirects ensure HTTP is not accepted in production.

FastAPI with OAuth2 and Password Hashing

# SECURE - FastAPI OAuth2 implementation
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from pydantic import BaseModel
from datetime import datetime, timedelta
import jwt

app = FastAPI()

# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Secret from secure storage
SECRET_KEY = vault.get_secret('api/secret-key')['key']
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

class User(BaseModel):
    username: str
    email: str
    password_hash: str

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify password against hash"""
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    """Hash password using bcrypt"""
    return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: timedelta = None):
    """Create JWT access token"""
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)

    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

@app.post("/register")
async def register(username: str, password: str, email: str):
    """Register new user with hashed password"""
    password_hash = get_password_hash(password)

    user = User(
        username=username,
        email=email,
        password_hash=password_hash
    )

    # Save to database
    # db.add(user)

    return {"status": "success", "username": username}

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    """Login and get access token"""
    # Get user from database
    user = get_user(form_data.username)

    if not user or not verify_password(form_data.password, user.password_hash):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )

    return {"access_token": access_token, "token_type": "bearer"}

async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Get current user from token"""
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise HTTPException(status_code=401, detail="Invalid token")
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

    user = get_user(username)
    if user is None:
        raise HTTPException(status_code=401, detail="User not found")

    return user

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    """Protected endpoint requiring authentication"""
    return current_user

Why this works:

  • OAuth2 flow with Passlib bcrypt provides consistent hashing and token handling.
  • Dependency injection centralizes auth checks for protected routes.

Verification

To verify credentials are properly protected:

  • Check password storage: Verify passwords in the database are hashed (bcrypt hashes start with $2b$, Argon2 with $argon2), not plaintext
  • Confirm salt usage: Ensure identical passwords for different users generate different hashes
  • Review configuration: Check that settings files use environment variables or secret managers, not hardcoded credentials
  • Test JWT tokens: Verify tokens expire and include proper claims (expiration, issuer, audience)
  • Search source code: Grep codebase for potential hardcoded secrets, API keys, or passwords
  • Test authentication flow: Verify login succeeds with correct credentials and fails with incorrect ones
  • Check .env files: Ensure .env is in .gitignore and never committed to version control
  • Use security scanners: Run tools like bandit to detect hardcoded credentials and weak cryptographic usage
# SECURE - Tests to verify credential protection
import pytest
import bcrypt
from app import app, db, User

class TestCredentialSecurity:
    @pytest.fixture
    def client(self):
        app.config['TESTING'] = True
        app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
        with app.test_client() as client:
            with app.app_context():
                db.create_all()
            yield client

    def test_passwords_are_hashed(self, client):
        """Verify passwords are not stored in plaintext"""
        response = client.post('/register', json={
            'username': 'testuser',
            'password': 'MyPassword123!'
        })

        assert response.status_code == 200

        # Check database directly
        user = User.query.filter_by(username='testuser').first()

        # Password should not be stored in plaintext
        assert user.password_hash != 'MyPassword123!'

        # Password hash should look like bcrypt format
        assert user.password_hash.startswith('$2b$')

        # Verify password can be checked
        assert bcrypt.checkpw(
            'MyPassword123!'.encode('utf-8'),
            user.password_hash.encode('utf-8')
        )

    def test_password_hash_is_unique(self, client):
        """Verify same password produces different hashes (salted)"""
        password = 'SamePassword123!'

        client.post('/register', json={
            'username': 'user1',
            'password': password
        })

        client.post('/register', json={
            'username': 'user2',
            'password': password
        })

        user1 = User.query.filter_by(username='user1').first()
        user2 = User.query.filter_by(username='user2').first()

        # Same password should produce different hashes due to salt
        assert user1.password_hash != user2.password_hash

    def test_weak_passwords_rejected(self, client):
        """Verify password strength requirements"""
        weak_passwords = [
            '12345',
            'password',
            'abc',
            '11111111'
        ]

        for weak_pass in weak_passwords:
            response = client.post('/register', json={
                'username': f'user_{weak_pass}',
                'password': weak_pass
            })

            # Should reject weak passwords
            assert response.status_code == 400
            data = response.get_json()
            assert 'password' in data['error'].lower()

    def test_no_hardcoded_secrets(self):
        """Verify no hardcoded credentials in code"""
        import app as app_module
        import inspect

        source = inspect.getsource(app_module)

        # Check for common hardcoded patterns
        forbidden_patterns = [
            'password =',
            'secret_key =',
            'api_key =',
            'AWS_SECRET',
            'DB_PASSWORD'
        ]

        for pattern in forbidden_patterns:
            # Allow in comments or variable names, but not assignments
            lines = [line for line in source.split('\n') 
                    if pattern.lower() in line.lower() and '=' in line
                    and not line.strip().startswith('#')]

            assert len(lines) == 0, f'Found potential hardcoded secret: {pattern}'

    def test_jwt_tokens_expire(self):
        """Verify JWT tokens have expiration"""
        import jwt
        from datetime import datetime

        user_id = 123
        token = create_access_token(user_id)

        # Decode without verification to check claims
        unverified = jwt.decode(token, options={"verify_signature": False})

        # Must have expiration claim
        assert 'exp' in unverified

        # Expiration should be in the future
        exp_time = datetime.fromtimestamp(unverified['exp'])
        assert exp_time > datetime.utcnow()

        # But not too far in the future (< 30 days)
        max_exp = datetime.utcnow() + timedelta(days=30)
        assert exp_time < max_exp

Additional Resources