Sanic 是基于 Python 的一个支持高并发的异步 web 框架,sanic-jwt 则是针对Sanic 开发的一个基于 PyJWT 封装的 JWT 授权认证模块。

sanic-jwt

安装

pip install sanic-jwt

实例

下面实例主要总结:

  • 常用参数的配置;
  • 修改发生异常时返回的响应数据;
  • 解析和修改payload;
  • 查找用户;
from sanic import Sanic, request, response
from sanic_jwt import initialize, Configuration, Responses, protected, exceptions, Authentication, inject_user


class User:

    def __init__(self, uid, username, sex, password, info, black_level=0):
        self.user_id = uid
        self.sex = sex
        self.username = username
        self.password = password
        self.personal_info = info  # 只能登录后个人可见的信息
        self.black_level = black_level  # 黑名单等级,默认0为正常用户

    def __repr__(self):
        return "User(id='{}')".format(self.user_id)

    def to_dict(self):

        return {
            "uid": self.user_id,  # 注意:此处 "uid" 要与 MyJWTConfig 中的 user_id 设置一致!
            "sex": self.sex,
            "username": self.username,
            "personal_info": self.personal_info
        }


# 模拟一个用户列表
users = [
    User(1, "user1", "", "123",  "这是仅 user1 可见信息", 1),
    User(2, "user2", "", "456",  "这是仅 user2 可见信息", 0)
]

username_table = {u.username: u for u in users}
userid_table = {u.user_id: u for u in users}


async def authenticate(req: request.Request):
    username = req.json.get("username", None)
    password = req.json.get("password", None)

    if not username or not password:
        raise exceptions.AuthenticationFailed("用户名或密码为空!")
    user = username_table.get(username, None)
    if user is None:
        raise exceptions.AuthenticationFailed("用户名或密码不正确!")
    if password != user.password:
        raise exceptions.AuthenticationFailed("用户名或密码不正确!")
    return user


class MyJWTConfig(Configuration):
    # -------------- url_prefix ---------------------
    # [描述] 获取授权的路由地址
    # [默认] '/auth'
    url_prefix = '/login'

    # -------------- secret -------------------------
    # [描述] 加密密码
    # [默认] 'This is a big secret. Shhhhh'
    # [建议] 该密码是 JWT 的安全核心所在,需要保密,尽量使用更长更复杂的密码
    secret = ',$FCyFZ^b16#m:ragM#d-!;4!U5zgZDF(EhswOL_HGV#xN1Ll%MaBU42AN=jXgp7'

    # -------------- expiration_delta ----------------------
    # [描述] 过期时间,单位为秒
    # [默认] 30 分钟,即:60 * 30
    # [建议] 该时间不宜过长,同时建议开启 refresh_token_enabled 以便自动更新 token
    expiration_delta = 60 * 60  # 改为 10 分钟过期

    # -------------- cookie_set ---------------------
    # [描述] 是否将获取到的 token 信息写入到 cookie
    # [默认] False,即不写入cookie
    # 只有该项为 True,其它 cookie 相关设置才会起效。
    # cookie_set = True

    # -------------- cookie_access_token_name ---------------
    # [描述] cookie 中存储 token 的名称。
    # [默认] 'access_token'
    # cookie_access_token_name = "token"

    #  -------------- cookie_access_token_name ---------------
    # [描述] 包含用户 id 的用户对象的键或属性,这里对应 User 类的用户唯一标识
    # [默认] 'user_id'
    user_id = "uid"

    claim_iat = True  # 显示签发时间,JWT的默认保留字段,在 sanic-jwt 中默认不显示该项


class MyJWTAuthentication(Authentication):

    # 从 payload 中解析用户信息,然后返回查找到的用户
    # args[0]: request
    # args[1]: payload
    async def retrieve_user(self, *args, **kwargs):
        user_id_attribute = self.config.user_id()
        if not args or len(args) < 2 or user_id_attribute not in args[1]:
            return {}
        user_id = dict(args[1]).get(user_id_attribute)
        # TODO: 根据项目实际情况进行修改
        user = userid_table.get(user_id)
        return user

    # 拓展 payload
    async def extend_payload(self, payload, *args, **kwargs):
        # 可以获取 User 中的一些属性添加到 payload 中
        # 注意:payload 信息是公开的,这里不要添加敏感信息
        user_id_attribute = self.config.user_id()
        user_id = payload.get(user_id_attribute)
        # TODO: 根据项目实际情况进行修改
        user: User = userid_table.get(user_id)
        payload.update({'sex': user.sex})  # 比如添加性别属性
        return payload

    async def extract_payload(self, req, verify=True, *args, **kwargs):
        return await super().extract_payload(req, verify)


class MyJWTResponse(Responses):

    # 自定义发生异常的返回数据
    @staticmethod
    def exception_response(req: request.Request, exception: exceptions):
        # sanic-jwt.exceptions 下面定义的异常类型:
        # AuthenticationFailed
        # MissingAuthorizationHeader
        # MissingAuthorizationCookie
        # InvalidAuthorizationHeader
        # MissingRegisteredClaim
        # Unauthorized
        msg = str(exception)
        if exception.status_code == 500:
            msg = str(exception)
        elif isinstance(exception, exceptions.AuthenticationFailed):
            msg = str(exception)
        else:
            if "expired" in msg:
                msg = "授权已失效,请重新登录!"
            else:
                msg = "未授权,请先登录!"
        result = {
            "status": exception.status_code,
            "data": None,
            "msg": msg
        }
        return response.json(result, status=exception.status_code)


app = Sanic("my_auth_app")
initialize(app, authenticate=authenticate,
           authentication_class=MyJWTAuthentication, configuration_class=MyJWTConfig, responses_class=MyJWTResponse)


@app.route("/index")
@protected()  # 保护该路由,只有授权用户才能访问
async def protected_route_index(req: request.Request):
    # 从 request 中获取 payload,然后返回给前端
    payload = await req.app.auth.extract_payload(req)
    return response.json({'payloadInfo': payload})


@app.route("/info")
@inject_user()  # 注入用户信息
@protected()    # 保护该路由,只有授权用户才能访问
async def protected_route_info(req: request.Request, user: User):
    if user.black_level == 0:
        return response.json({'userName': user.username, "personalInfo": user.personal_info})
    else:  # 进入黑名单等级之后限制查看
        return response.json({'userName': user.username, "personalInfo": ""})


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080, auto_reload=True)

Configuration 参数

 下面列出的参数可以根据需要在前面的 MyJWTConfig 这个类下进行添加设置。

参考:

sanic-jwt

https://blog.csdn.net/zhouping118/article/details/88736986

03-07 04:24