DRF框架的认证组件

核心代码:       self.perform_authentication(request) 

框架自带模块:    from rest_framework import authentication

认证组件的返回值:request.user

自定义的认证组件的钩子方法authenticator.authenticate(self) ;返回值是元组(user,auth)

from rest_framework import authentication

from rest_framework import authentication

class BaseAuthentication(object):
def authenticate(self, request):
# 必须重写该方法,返回元组(user,auth)
return (user_obj,token) class BasicAuthentication(BaseAuthentication): class SessionAuthentication(BaseAuthentication): class TokenAuthentication(BaseAuthentication): class RemoteUserAuthentication(BaseAuthentication):

基于BaseAuthentication类的认证
# myauth.py

from rest_framework import authentication
from AuthDemo.models import UserTable
from rest_framework.exceptions import AuthenticationFailed # 用于抛出异常 # 基于BaseAuthentication类的认证
class AuthoDemo(authentication.BaseAuthentication):
'''验证GET请求是否携带Token'''
def authenticate(self, request):
# 通过/user/test/?token="xxx" 获取token
token = request.query_params.get("token","")
# 如果token不存在
if not token:
# 抛出异常
raise AuthenticationFailed("token不存在") # token存在,验证token
user_obj = UserTable.objects.filter(token=token).first()
if user_obj:
# 验证通过,必须返回元组,(user,token)
return (user_obj,token)
# 认证不通过抛出异常
raise AuthenticationFailed("token错误")
# views.py

from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from .models import UserTable
import uuid
from utils.myauth import AuthoDemo
from rest_framework import authentication # Create your views here. # 注册视图
class RegisterView(APIView):
def post(self,request):
# 获取提交的用户名和密码
username = request.data.get('username')
password = request.data.get('password') # 创建对象
UserTable.objects.create(username=username,password=password) # 返回结果
return Response("注册成功!") # 登陆视图
class LoginView(APIView):
def post(self,request):
# 获取提交的用户名和密码
username = request.data.get('username')
password = request.data.get('password')
# 验证用户名密码是否正确
user_obj = UserTable.objects.filter(username=username,password=password).first()
if user_obj:
# 验证通过,写入Token并保存
token = uuid.uuid4()
user_obj.token = token # 为对象的token字段写入随机字符串
user_obj.save()
# 返回token
return Response(token)
else:
return Response("用户名密码不正确") # 认证的测试视图
class TestView(APIView):
authentication_classes = [AuthoDemo,]
def get(self,request):
print(request.user) # 获取用户对象
print(request.auth) # 获取token
print(request.user.username) # 获取用户对象的名字
return Response("认证测试接口")

源码流程

# 1、封装request对象
def dispatch(self, request, *args, **kwargs):
request = self.initialize_request(request, *args, **kwargs) # 1.1
def initialize_request(self, request, *args, **kwargs):
parser_context = self.get_parser_context(request)
return Request(
request,
parsers=self.get_parsers(),
# 返回认证类的实例化对象列表:[auth() for auth in self.authentication_classes]
authenticators=self.get_authenticators(),
negotiator=self.get_content_negotiator(),
parser_context=parser_context
) # 1.2
class APIView(View):
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES from rest_framework import authentication # 查看自带的认证类 class ApiView(View):
# 2、认证组件
self.perform_authentication(request)
# 权限组件
self.check_permissions(request)
# 节流组件
self.check_throttles(request) # 2.1、开始认证
def perform_authentication(self, request):
request.user # 2.2、调用Request.user
class Request(object):
def __init__(self, request, authenticators=None):
self.authenticators = authenticators or () @property
def user(self):
if not hasattr(self, '_user'):
with wrap_attributeerrors():
# 2.3、
self._authenticate()
return self._user # 2.3、读取认证对象列表
def _authenticate(self):
for authenticator in self.authenticators:
try:
# 对每个进行验证,异常则全部终止,若返回None,则继续循环
user_auth_tuple = authenticator.authenticate(self) except exceptions.APIException:
self._not_authenticated()
raise if user_auth_tuple is not None:
# 给request赋值user
self._authenticator = authenticator
self.user, self.auth = user_auth_tuple
# 通过直接退出循环
return # 全都没有通过则设置匿名用户
self._not_authenticated() # 认证类实例
class BasicAuthentication(BaseAuthentication):
def authenticate(self, request):
"""
Returns a `User` if a correct username and password have been supplied
using HTTP Basic authentication. Otherwise returns `None`.
"""
pass
return self.authenticate_credentials(userid, password, request) def authenticate_credentials(self, userid, password, request=None):
"""
Authenticate the userid and password against username and password
with optional request for context.
"""
user = authenticate(request=request, **credentials) if user is None:
raise exceptions.AuthenticationFailed(_('Invalid username/password.')) if not user.is_active:
raise exceptions.AuthenticationFailed(_('User inactive or deleted.')) return (user, None)
04-25 10:42