FastAPI 结合 JWT

步骤

  1. **用户登录:**用户通过提交用户名和密码获取 JWT。
  2. **获取 Token:**服务器验证用户凭据后,生成并返回 JWT 给用户。
  3. **访问受保护的路由:**用户在访问受保护的路由时,需要在请求头中携带该 JWT。
  4. **Token 验证:**服务器验证 JWT 的合法性和有效性,允许或拒绝访问受保护资源。

安装

pip install fastapi uvicorn pyjwt

步骤

导入必要的模块

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta, timezone
import jwt

设置配置和初始化应用

SECRET_KEY = "your_secret_key"  # 用于签名 JWT 的密钥
ALGORITHM = "HS256"             # 加密算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token 过期时间

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

创建数据模型

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str | None = None

class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None

class UserInDB(User):
    hashed_password: str

实现辅助函数

生成 JWT Token

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

获取用户数据

fake_users_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "testuser@example.com",
        "hashed_password": "fakehashedpassword",
        "disabled": False,
    }
}

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

验证密码

def verify_password(plain_password, hashed_password):
    return plain_password == hashed_password

获取当前用户

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except jwt.PyJWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

用户登录获取 Token

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = get_user(fake_users_db, form_data.username)
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

受保护的路由示例

@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

所有代码

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta, timezone
import uvicorn
import jwt
import os

# JWT 相关配置
SECRET_KEY = "123456789ashdgjha.slakdv.laksd*as-d/sd3"  # 用于签名 JWT 的密钥(需要妥善保管,实际应用中应存储在环境变量或配置文件中)
ALGORITHM = "HS256"  # 使用的加密算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Token 的有效时间,以分钟为单位

app = FastAPI()  # 创建 FastAPI 应用实例

# OAuth2PasswordBearer 实例,用于依赖项
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 模拟的用户数据库,通常在实际应用中应从数据库中获取用户信息
fake_users_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "testuser@example.com",
        "hashed_password": "fakehashedpassword",  # 在实际应用中,存储经过哈希处理的密码
        "disabled": False,  # 用户是否被禁用
    }
}

# Pydantic 模型,用于定义请求和响应的数据结构
class Token(BaseModel):
    access_token: str  # Token 字符串
    token_type: str  # Token 类型(一般为 "bearer")

class TokenData(BaseModel):
    username: str | None = None  # 从 Token 中提取的用户名

class User(BaseModel):
    username: str  # 用户名
    email: str | None = None  # 邮箱地址,可选
    full_name: str | None = None  # 用户全名,可选
    disabled: bool | None = None  # 用户是否被禁用,可选

class UserInDB(User):
    hashed_password: str  # 存储在数据库中的哈希密码

# 生成 JWT Token 的函数
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """
    生成 JWT Token。

    参数:
    - data (dict): 要编码到 JWT 中的数据。
    - expires_delta (timedelta, 可选): Token 的过期时间。

    返回:
    - str: 编码后的 JWT 字符串。
    """
    to_encode = data.copy()  # 创建副本,以避免修改原始数据
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})  # 添加过期时间到 JWT 数据中
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)  # 生成 JWT
    return encoded_jwt

# 从假数据库获取用户信息
def get_user(db, username: str):
    """
    从数据库中获取用户信息。

    参数:
    - db (dict): 用户数据库(在本例中为假数据)。
    - username (str): 用户名。

    返回:
    - UserInDB | None: 返回匹配的用户信息,如果不存在则返回 None。
    """
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

# 验证用户密码
def verify_password(plain_password, hashed_password):
    """
    验证用户密码。

    参数:
    - plain_password (str): 用户输入的明文密码。
    - hashed_password (str): 存储在数据库中的哈希密码。

    返回:
    - bool: 密码匹配返回 True,否则返回 False。
    """
    return plain_password == hashed_password  # 在实际应用中,这里应该使用哈希函数进行比较

# 验证 Token 并获取当前用户
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """
    从 JWT Token 中提取用户信息,并验证 Token 的合法性。

    参数:
    - token (str): JWT Token。

    返回:
    - User: 返回当前用户信息。

    抛出:
    - HTTPException: 当 Token 无效或用户不存在时抛出 401 错误。
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])  # 解码 JWT
        username: str = payload.get("sub")  # 获取 JWT 中的用户名
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except jwt.PyJWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)  # 获取用户信息
    if user is None:
        raise credentials_exception
    return user

# 验证用户是否被禁用
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """
    验证当前用户是否被禁用。

    参数:
    - current_user (User): 当前用户信息。

    返回:
    - User: 如果用户未被禁用,返回用户信息。

    抛出:
    - HTTPException: 当用户被禁用时抛出 400 错误。
    """
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

# 用户登录获取 Token
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """
    用户登录接口,用于获取 JWT Token。

    参数:
    - form_data (OAuth2PasswordRequestForm): 包含用户名和密码的表单数据。

    返回:
    - dict: 包含 access_token 和 token_type 的响应数据。

    抛出:
    - HTTPException: 当用户名或密码错误时抛出 401 错误。
    """
    user = get_user(fake_users_db, form_data.username)
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)  # 设置 Token 过期时间
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

# 受保护的路由示例
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """
    获取当前用户信息的受保护路由。

    参数:
    - current_user (User): 当前登录的用户信息(通过 JWT 验证)。

    返回:
    - User: 返回当前用户的信息。
    """
    return current_user

# 运行应用
if __name__ == "__main__":
    uvicorn.run(
        f"{os.path.basename(__file__).split('.')[0]}:app",
        host="127.0.0.1",
        port=8000,
        reload=True,  # 启用自动重载
    )

测试

获取 Token

FastAPI 结合 JWT-LMLPHP

访问受保护的路由

token正确

FastAPI 结合 JWT-LMLPHP

token错误

FastAPI 结合 JWT-LMLPHP

总结

注意

08-02 11:27