Logo
Published on

3.1.Flask 接口认证

Authors
  • avatar
    Name
    xiaobai
    Twitter

1.目标与选型

  • 目标:保护受控资源,仅允许已登录用户访问;实现注册、登录颁发令牌(access/refresh)、令牌校验与刷新、白名单与统一错误返回。
  • 方案对比:
    • PyJWT 手动集成:灵活可控,适合自定义需求;
    • Flask-JWT-Extended:封装完善,支持装饰器、刷新令牌与附加 Claims。

依赖安装:

pip install flask PyJWT passlib[bcrypt]
# 或(使用扩展)
pip install flask Flask-JWT-Extended

2.通用配置与密码哈希

# config/auth.py
import os

JWT_SECRET_KEY = os.getenv('JWT_SECRET_KEY', 'dev-secret')
JWT_ALGORITHM = os.getenv('JWT_ALGORITHM', 'HS256')
JWT_ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv('JWT_ACCESS_TOKEN_EXPIRE_MINUTES', '30'))
JWT_REFRESH_TOKEN_EXPIRE_DAYS = int(os.getenv('JWT_REFRESH_TOKEN_EXPIRE_DAYS', '7'))
JWT_GLOBAL_ENABLE = os.getenv('JWT_GLOBAL_ENABLE', 'false').lower() == 'true'
JWT_WHITE_LIST = set(eval(os.getenv('JWT_WHITE_LIST', "['/login','/registry','/verify']")))

from passlib.context import CryptContext
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')

def hash_password(plain: str) -> str:
    return pwd_context.hash(plain)

def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)

3.方案一:PyJWT 手动实现(access/refresh)

3.1.令牌工具

# utils/jwt_utils.py
import jwt
from datetime import datetime, timezone, timedelta
from config.auth import (
    JWT_SECRET_KEY, JWT_ALGORITHM,
    JWT_ACCESS_TOKEN_EXPIRE_MINUTES, JWT_REFRESH_TOKEN_EXPIRE_DAYS,
)

def create_access_token(username: str, minutes: int | None = None) -> str:
    exp = datetime.now(timezone.utc) + timedelta(minutes=minutes or JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
    return jwt.encode({'sub': username, 'type': 'access', 'exp': exp}, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)

def create_refresh_token(username: str, days: int | None = None) -> str:
    exp = datetime.now(timezone.utc) + timedelta(days=days or JWT_REFRESH_TOKEN_EXPIRE_DAYS)
    return jwt.encode({'sub': username, 'type': 'refresh', 'exp': exp}, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)

def decode_token(token: str) -> dict | None:
    try:
        return jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
    except Exception:
        return None

3.2.用户模型与仓库(示例)

# models/user_repo.py(演示用,生产替换为数据库)
USERS = {}

def add_user(username: str, hashed_password: str, email: str):
    USERS[username] = {'username': username, 'hashed_password': hashed_password, 'email': email, 'valid': True}

def get_user(username: str):
    return USERS.get(username)

3.3.Blueprint 与路由

# routes/auth.py
from flask import Blueprint, request
from functools import wraps
from config.auth import JWT_GLOBAL_ENABLE, JWT_WHITE_LIST
from utils.jwt_utils import create_access_token, create_refresh_token, decode_token
from config.auth import hash_password, verify_password
from models.user_repo import add_user, get_user

auth = Blueprint('auth', __name__)

def ok(data=None, status=200):
    return {'code': 0, 'message': 'ok', 'data': data}, status

def err(message, status=400, code=1):
    return {'code': code, 'message': message, 'data': None}, status

@auth.post('/registry')
def registry():
    body = request.get_json(silent=True) or {}
    username, password, email = body.get('username'), body.get('password'), body.get('email')
    if not username or not password or not email:
        return err('username/password/email required', 422)
    if get_user(username):
        return err('user exists', 409)
    add_user(username, hash_password(password), email)
    return ok({'username': username}, 201)

@auth.post('/login')
def login():
    body = request.get_json(silent=True) or {}
    username, password = body.get('username'), body.get('password')
    u = get_user(username)
    if not u or not verify_password(password, u['hashed_password']):
        return err('invalid credentials', 401)
    return ok({
        'access_token': create_access_token(username),
        'refresh_token': create_refresh_token(username),
        'token_type': 'bearer'
    })

def require_auth(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        auth_header = request.headers.get('Authorization', '')
        if not auth_header.startswith('Bearer '):
            return err('missing bearer', 401)
        payload = decode_token(auth_header.split(' ', 1)[1])
        if not payload or payload.get('type') != 'access':
            return err('invalid or expired token', 401)
        request.current_user = payload.get('sub')
        return func(*args, **kwargs)
    return wrapper

@auth.post('/token/refresh')
def refresh():
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return err('missing bearer', 401)
    payload = decode_token(auth_header.split(' ', 1)[1])
    if not payload or payload.get('type') != 'refresh':
        return err('invalid or expired refresh token', 401)
    username = payload.get('sub')
    return ok({'access_token': create_access_token(username), 'token_type': 'bearer'})

@auth.get('/me')
@require_auth
def me():
    return ok({'user': request.current_user})

3.4.全局认证(白名单)

# 在应用初始化处注册 before_request
from flask import Flask, request
from routes.auth import auth, err
from config.auth import JWT_GLOBAL_ENABLE, JWT_WHITE_LIST
from utils.jwt_utils import decode_token

app = Flask(__name__)
app.register_blueprint(auth)

@app.before_request
def global_auth():
    if not JWT_GLOBAL_ENABLE:
        return None
    path = request.path
    if path in JWT_WHITE_LIST:
        return None
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return err('missing bearer', 401)
    payload = decode_token(auth_header.split(' ', 1)[1])
    if not payload or payload.get('type') != 'access':
        return err('invalid or expired token', 401)
    request.current_user = payload.get('sub')
    return None

4.方案二:Flask-JWT-Extended(快速接入)

from flask import Flask, request
from flask_jwt_extended import (
    JWTManager, create_access_token, create_refresh_token,
    jwt_required, get_jwt_identity
)

app = Flask(__name__)
app.config['JWT_SECRET_KEY'] = 'dev-secret'
jwt = JWTManager(app)

@app.post('/login')
def login():
    # 验证用户名密码后
    username = (request.get_json() or {}).get('username')
    access = create_access_token(identity=username)
    refresh = create_refresh_token(identity=username)
    return {'access_token': access, 'refresh_token': refresh}

@app.get('/me')
@jwt_required()
def me():
    return {'user': get_jwt_identity()}

对比:扩展提供装饰器与刷新令牌等能力,减少样板代码;若需更强自定义(如白名单、复合校验),PyJWT 方案更灵活。

5.错误返回与安全建议

  • 统一返回结构:{'code','message','data'},便于前端消费与观测。
  • 将令牌放在 Authorization: Bearer,避免置于 URL;如用 Cookie,配合 CSRF 防护。
  • 刷新令牌建议做轮换与黑名单(服务端维护失效列表)。
  • 明文密码永不入库,使用安全哈希(bcrypt);强制复杂度与限流防暴力破解。

6.curl 调试示例

# 注册
curl -X POST http://127.0.0.1:5000/registry \
  -H 'Content-Type: application/json' \
  -d '{"username":"alice","password":"123456","email":"alice@test.com"}'

# 登录
curl -X POST http://127.0.0.1:5000/login \
  -H 'Content-Type: application/json' \
  -d '{"username":"alice","password":"123456"}'

# 访问受控资源
curl -H 'Authorization: Bearer <access>' http://127.0.0.1:5000/me

# 刷新令牌
curl -X POST -H 'Authorization: Bearer <refresh>' http://127.0.0.1:5000/token/refresh

7.总结与扩展

  • 两种认证方案各有适用场景;建议结合白名单、全局钩子与统一错误格式使用。
  • 进一步可接入角色权限(RBAC)、审计日志、会话失效与设备绑定等能力。