Sanic 是基于 Python 的一个支持高并发的异步 web 框架,sanic-jwt 则是针对Sanic 开发的一个基于 PyJWT 封装的 JWT 授权认证模块。
sanic-jwt
- 项目主页:https://github.com/ahopkins/sanic-jwt
- 官方文档:https://sanic-jwt.readthedocs.io/en/latest/index.html
安装
pip install sanic-jwt
实例
下面实例主要总结:
- 常用参数的配置;
- 修改发生异常时返回的响应数据;
- 解析和修改payload;
- 查找用户;
from sanic import Sanic, request, response from sanic_jwt import initialize, Configuration, Responses, protected, exceptions, Authentication, inject_user class User: def __init__(self, uid, username, sex, password, info, black_level=0): self.user_id = uid self.sex = sex self.username = username self.password = password self.personal_info = info # 只能登录后个人可见的信息 self.black_level = black_level # 黑名单等级,默认0为正常用户 def __repr__(self): return "User(id='{}')".format(self.user_id) def to_dict(self): return { "uid": self.user_id, # 注意:此处 "uid" 要与 MyJWTConfig 中的 user_id 设置一致! "sex": self.sex, "username": self.username, "personal_info": self.personal_info } # 模拟一个用户列表 users = [ User(1, "user1", "男", "123", "这是仅 user1 可见信息", 1), User(2, "user2", "女", "456", "这是仅 user2 可见信息", 0) ] username_table = {u.username: u for u in users} userid_table = {u.user_id: u for u in users} async def authenticate(req: request.Request): username = req.json.get("username", None) password = req.json.get("password", None) if not username or not password: raise exceptions.AuthenticationFailed("用户名或密码为空!") user = username_table.get(username, None) if user is None: raise exceptions.AuthenticationFailed("用户名或密码不正确!") if password != user.password: raise exceptions.AuthenticationFailed("用户名或密码不正确!") return user class MyJWTConfig(Configuration): # -------------- url_prefix --------------------- # [描述] 获取授权的路由地址 # [默认] '/auth' url_prefix = '/login' # -------------- secret ------------------------- # [描述] 加密密码 # [默认] 'This is a big secret. Shhhhh' # [建议] 该密码是 JWT 的安全核心所在,需要保密,尽量使用更长更复杂的密码 secret = ',$FCyFZ^b16#m:ragM#d-!;4!U5zgZDF(EhswOL_HGV#xN1Ll%MaBU42AN=jXgp7' # -------------- expiration_delta ---------------------- # [描述] 过期时间,单位为秒 # [默认] 30 分钟,即:60 * 30 # [建议] 该时间不宜过长,同时建议开启 refresh_token_enabled 以便自动更新 token expiration_delta = 60 * 60 # 改为 10 分钟过期 # -------------- cookie_set --------------------- # [描述] 是否将获取到的 token 信息写入到 cookie # [默认] False,即不写入cookie # 只有该项为 True,其它 cookie 相关设置才会起效。 # cookie_set = True # -------------- cookie_access_token_name --------------- # [描述] cookie 中存储 token 的名称。 # [默认] 'access_token' # cookie_access_token_name = "token" # -------------- cookie_access_token_name --------------- # [描述] 包含用户 id 的用户对象的键或属性,这里对应 User 类的用户唯一标识 # [默认] 'user_id' user_id = "uid" claim_iat = True # 显示签发时间,JWT的默认保留字段,在 sanic-jwt 中默认不显示该项 class MyJWTAuthentication(Authentication): # 从 payload 中解析用户信息,然后返回查找到的用户 # args[0]: request # args[1]: payload async def retrieve_user(self, *args, **kwargs): user_id_attribute = self.config.user_id() if not args or len(args) < 2 or user_id_attribute not in args[1]: return {} user_id = dict(args[1]).get(user_id_attribute) # TODO: 根据项目实际情况进行修改 user = userid_table.get(user_id) return user # 拓展 payload async def extend_payload(self, payload, *args, **kwargs): # 可以获取 User 中的一些属性添加到 payload 中 # 注意:payload 信息是公开的,这里不要添加敏感信息 user_id_attribute = self.config.user_id() user_id = payload.get(user_id_attribute) # TODO: 根据项目实际情况进行修改 user: User = userid_table.get(user_id) payload.update({'sex': user.sex}) # 比如添加性别属性 return payload async def extract_payload(self, req, verify=True, *args, **kwargs): return await super().extract_payload(req, verify) class MyJWTResponse(Responses): # 自定义发生异常的返回数据 @staticmethod def exception_response(req: request.Request, exception: exceptions): # sanic-jwt.exceptions 下面定义的异常类型: # AuthenticationFailed # MissingAuthorizationHeader # MissingAuthorizationCookie # InvalidAuthorizationHeader # MissingRegisteredClaim # Unauthorized msg = str(exception) if exception.status_code == 500: msg = str(exception) elif isinstance(exception, exceptions.AuthenticationFailed): msg = str(exception) else: if "expired" in msg: msg = "授权已失效,请重新登录!" else: msg = "未授权,请先登录!" result = { "status": exception.status_code, "data": None, "msg": msg } return response.json(result, status=exception.status_code) app = Sanic("my_auth_app") initialize(app, authenticate=authenticate, authentication_class=MyJWTAuthentication, configuration_class=MyJWTConfig, responses_class=MyJWTResponse) @app.route("/index") @protected() # 保护该路由,只有授权用户才能访问 async def protected_route_index(req: request.Request): # 从 request 中获取 payload,然后返回给前端 payload = await req.app.auth.extract_payload(req) return response.json({'payloadInfo': payload}) @app.route("/info") @inject_user() # 注入用户信息 @protected() # 保护该路由,只有授权用户才能访问 async def protected_route_info(req: request.Request, user: User): if user.black_level == 0: return response.json({'userName': user.username, "personalInfo": user.personal_info}) else: # 进入黑名单等级之后限制查看 return response.json({'userName': user.username, "personalInfo": ""}) if __name__ == "__main__": app.run(host="0.0.0.0", port=8080, auto_reload=True)
Configuration 参数
下面列出的参数可以根据需要在前面的 MyJWTConfig 这个类下进行添加设置。
参考:
sanic-jwt