共用的models
 from django.db import models

 # Create your models here.

 class User(models.Model):
     """Account record used by the login, authentication and permission examples.

     ``user_type`` is an integer code: 1 = super user, 2 = normal user,
     3 = third-party user (labels are the Chinese strings in ``choices``).

     NOTE(review): the login views filter on ``password=<submitted value>``,
     so this column appears to hold the password as plain text -- consider
     Django's password hashers before using this outside a tutorial.
     """

     username = models.CharField(max_length=32)
     password = models.CharField(max_length=32)
     user_type = models.IntegerField(
         choices=((1, '超级用户'), (2, '普通用户'), (3, '第三方用户')),
     )


 class UserToken(models.Model):
     """Login token belonging to exactly one user (one-to-one with ``User``)."""

     # on_delete is mandatory from Django 2.0 onwards; CASCADE is what older
     # versions applied implicitly, so behaviour is unchanged there.
     user = models.OneToOneField(to='User', on_delete=models.CASCADE)
     token = models.CharField(max_length=64)

urls

 from django.conf.urls import url
from django.contrib import admin
from app01 import views
# Routing table: every entry maps a URL regex prefix to the view that serves it.
urlpatterns = [
    # Django admin site.
    url(r'^admin/', admin.site.urls),
    # Class-based views from app01.
    url(r'^course/', views.Course.as_view()),
    url(r'^login/', views.Login.as_view()),
    url(r'^get_ip/', views.GetIp.as_view()),
]

views

 from django.shortcuts import render, HttpResponse

 # Create your views here.

 import json
from django.views import View
from rest_framework.views import APIView
from app01 import models
from utils.common import *
from rest_framework.response import Response


class Login(APIView):
    """Check a username/password pair and hand out a fresh token."""

    def post(self, request, *args, **kwargs):
        """Log a user in.

        Reads ``username`` and ``password`` from ``request.data``. On a match
        a new token is generated, stored in ``UserToken`` (replacing any
        previous token of that user) and returned together with status 100.
        Otherwise only an error message is set on the response object.

        Returns:
            Response: the attributes of the ``MyResponse`` object as a dict.
        """
        response = MyResponse()
        name = request.data.get('username')
        pwd = request.data.get('password')
        user = models.User.objects.filter(username=name, password=pwd).first()
        if user:
            token = get_token(name)
            # One token row per user: update it if it exists, else create it.
            models.UserToken.objects.update_or_create(user=user, defaults={'token': token})
            response.status = 100
            response.msg = '登陆成功'
            response.token = token
        else:
            response.msg = '用户名或密码错误'
        return Response(response.get_dic())


class Course(APIView):
    """Example endpoint protected by authentication, permission and throttling."""

    authentication_classes = [MyAuth, ]
    permission_classes = [MyPermission, ]
    throttle_classes = [VisitThrottle, ]

    def get(self, request):
        """Return a small JSON document; prints what the auth class attached."""
        # Shows the values the authentication class put on the request.
        print(request.user)
        print(request.auth)
        # Declare the payload as JSON instead of the default text/html.
        return HttpResponse(json.dumps({'name': 'Python'}), content_type='application/json')

认证组件
1 写一个认证类
from rest_framework.authentication import BaseAuthentication
class MyAuth(BaseAuthentication):
    """Authenticate a request by the ``token`` query-string parameter."""

    def authenticate(self, request):
        """Look the token up in ``UserToken``.

        Args:
            request: the DRF-wrapped request object.

        Returns:
            tuple: ``(user, token_row)``; DRF exposes these as
            ``request.user`` and ``request.auth``.

        Raises:
            AuthenticationFailed: if no ``UserToken`` row has this token.
        """
        token = request.query_params.get('token')
        ret = models.UserToken.objects.filter(token=token).first()
        if not ret:
            raise AuthenticationFailed('认证失败')
        # Returning None would mean "authentication not attempted" to DRF and
        # leave request.user unset, so return the (user, auth) pair instead.
        return ret.user, ret

    # Optional when inheriting from BaseAuthentication, which already
    # provides this hook.
    def authenticate_header(self, ss):
        pass
2 局部使用
authentication_classes=[MyAuth,MyAuth2]
3 全局使用
查找顺序:自定义的APIView里找---》项目settings里找---》内置默认的
REST_FRAMEWORK={
'DEFAULT_AUTHENTICATION_CLASSES':['utils.common.MyAuth',]
}
认证:
 import hashlib
import time


class MyResponse():
    """Mutable container for the fields of an API reply.

    Starts with ``status = 1001`` and ``msg = None``; callers set further
    attributes (for example ``token``) and serialise with ``get_dic()``.

    Example::

        res = MyResponse()
        print(res.get_dic())
    """

    def __init__(self):
        self.status = 1001
        self.msg = None

    def get_dic(self):
        """Return the instance's attribute dict (the live ``__dict__``)."""
        return self.__dict__


def get_token(name):
    """Return a 32-character hex token for the user called ``name``.

    The MD5 input is the name, the current time and 16 random bytes. Name and
    time alone are guessable, so the random bytes are what make the token
    unpredictable.

    Args:
        name: user name as ``str``.

    Returns:
        str: hexadecimal MD5 digest.
    """
    import os  # local import: only needed here

    md = hashlib.md5()
    md.update(name.encode('utf-8'))
    md.update(str(time.time()).encode('utf-8'))
    md.update(os.urandom(16))
    return md.hexdigest()


from rest_framework.authentication import BaseAuthentication
from app01 import models
from rest_framework.exceptions import APIException, AuthenticationFailed


class MyAuth(BaseAuthentication):
    """Token authentication backed by the ``UserToken`` table."""

    def authenticate(self, request):
        """Return ``(user, token_row)`` for the ``token`` query parameter.

        Raises:
            AuthenticationFailed: when no stored token matches.
        """
        supplied = request.query_params.get('token')
        token_row = models.UserToken.objects.filter(token=supplied).first()
        # Guard clause: reject first, then fall through to the success path.
        if not token_row:
            raise AuthenticationFailed("认证失败")
        return token_row.user, token_row
权限组件
1 写一个类
class MyPermission():
    """Permission check that lets only super users (``user_type == 1``) through."""

    def has_permission(self, request, view):
        """Return True if the request's token belongs to a super user.

        Args:
            request: DRF request; the token is read from ``query_params``.
            view: the view being accessed (unused).

        Returns:
            bool: False when the token is unknown or the user is not a
            super user.
        """
        token = request.query_params.get('token')
        ret = models.UserToken.objects.filter(token=token).first()
        if ret is None:
            # Unknown or missing token: deny instead of raising AttributeError.
            return False
        # The model field is ``user_type``; code 1 is the super user.
        return ret.user.user_type == 1
2 局部使用:
permission_classes=[MyPermission,]
3 全局使用:
REST_FRAMEWORK={
'DEFAULT_PERMISSION_CLASSES':['utils.common.MyPermission',]
}
权限组件:
 from rest_framework.permissions import BasePermission

 class MyPermission(BasePermission):
     """Grant access only to super users (``user_type == 1``)."""

     # Error text DRF reports when has_permission() returns False.
     message = "不是超级用户,查看不了"

     def has_permission(self, request, view):
         """Return True if the ``token`` query parameter belongs to a super user.

         Returns False (instead of raising AttributeError) when no
         ``UserToken`` row matches the token.
         """
         token = request.query_params.get("token")
         ret = models.UserToken.objects.filter(token=token).first()
         if ret is None:
             return False
         # get_<field>_display() gives the human-readable label of the choice.
         print(ret.user.get_user_type_display())
         return ret.user.user_type == 1

频率组件
1 写一个类:
from rest_framework.throttling import SimpleRateThrottle
class VisitThrottle(SimpleRateThrottle):
    """Rate limiter whose rate is configured under the ``'xxx'`` scope."""

    # Key looked up in the DEFAULT_THROTTLE_RATES setting.
    scope = 'xxx'

    def get_cache_key(self, request, view):
        """Use the client identity from ``get_ident`` as the throttle key."""
        client_ident = self.get_ident(request)
        return client_ident
2 在settings里配置(写在REST_FRAMEWORK字典中):
'DEFAULT_THROTTLE_RATES':{
'xxx':'5/h',
}
3 局部使用
throttle_classes=[VisitThrottle,]
4 全局使用
REST_FRAMEWORK={
'DEFAULT_THROTTLE_CLASSES':['utils.common.VisitThrottle',]
}
访问频率组件:
 from rest_framework.throttling import SimpleRateThrottle

 class VisitThrottle(SimpleRateThrottle):
     """Rate limiter whose rate is configured under the ``'da peng ya'`` scope."""

     # Key looked up in the DEFAULT_THROTTLE_RATES setting.
     scope = 'da peng ya'

     def get_cache_key(self, request, view):
         """Use the client identity from ``get_ident`` as the throttle key."""
         client_ident = self.get_ident(request)
         return client_ident

作业:
1 认证类,不存数据库的token验证
token='idfjasfsadfas|userid'
 def get_token(id, salt=''):
     """Build a stateless token of the form ``<md5 hex digest>|<id>``.

     The digest covers ``str(id)`` followed by ``salt``, so the token can be
     verified later without a database lookup.

     NOTE(review): with the default empty salt anyone can compute a valid
     token for any id -- pass a secret salt in real use.

     Args:
         id: user identifier; converted with ``str()``.
         salt: extra string mixed into the digest.

     Returns:
         str: the token.
     """
     import hashlib

     md = hashlib.md5()
     md.update(bytes(str(id), encoding='utf-8'))
     md.update(bytes(salt, encoding='utf-8'))
     return md.hexdigest() + '|' + str(id)


 def check_token(token, salt=''):
     """Return True if ``token`` was produced by ``get_token`` with ``salt``.

     Missing or malformed tokens (``None``, non-strings, no ``|`` separator)
     return False instead of raising.

     Args:
         token: value supplied by the client.
         salt: the salt that was used when the token was issued.

     Returns:
         bool: whether the digest part matches the id part.
     """
     import hashlib
     import hmac

     if not isinstance(token, str):
         return False
     # A hex digest never contains '|', so split on the first separator; the
     # remainder is the id even if the id itself contains '|'.
     digest, sep, user_id = token.partition('|')
     if not sep:
         return False
     md = hashlib.md5()
     md.update(bytes(user_id, encoding='utf-8'))
     md.update(bytes(salt, encoding='utf-8'))
     # Constant-time comparison; bytes are used so that non-ASCII input is
     # compared rather than rejected with a TypeError.
     return hmac.compare_digest(digest.encode('utf-8'), md.hexdigest().encode('utf-8'))


 class TokenAuth():
     """Authentication class that validates the self-verifying token."""

     def authenticate(self, request):
         """Validate the ``token`` query parameter.

         Returns:
             None: on success. No user object is available without a database
             query, so ``request.user`` is not populated by this class.

         Raises:
             AuthenticationFailed: if the token is missing or invalid.
         """
         token = request.GET.get('token')
         if not check_token(token):
             raise AuthenticationFailed('认证失败')
         return None

     def authenticate_header(self, request):
         pass
class Login(APIView):
    """Login view for the stateless-token variant (nothing is stored)."""

    def post(self, reuquest):
        """Check ``name``/``pwd`` from the request body and issue a token.

        The parameter name (sic) is kept as in the original signature.

        Returns:
            Response: dict with ``status`` (100 on success, 1001 otherwise),
            ``msg`` and, on success, ``token``.
        """
        back_msg = {'status': 1001, 'msg': None}
        try:
            name = reuquest.data.get('name')
            pwd = reuquest.data.get('pwd')
            user = models.User.objects.filter(username=name, password=pwd).first()
            if user:
                # The token is derived from the primary key and verified by
                # recomputation, so it is deliberately not saved to UserToken.
                token = get_token(user.pk)
                # 100 is the success code used by the other login view.
                back_msg['status'] = 100
                back_msg['msg'] = '登录成功'
                back_msg['token'] = token
            else:
                back_msg['msg'] = '用户名或密码错误'
        except Exception as e:
            # NOTE(review): this sends the raw exception text to the client.
            back_msg['msg'] = str(e)
        return Response(back_msg)
from rest_framework.authentication import BaseAuthentication
class TokenAuth():
    """Authentication class that looks the token up in ``UserToken``."""

    def authenticate(self, request):
        """Authenticate by the ``token`` query parameter.

        Returns:
            tuple: ``(user, token_row)``; DRF exposes these as
            ``request.user`` and ``request.auth``.

        Raises:
            AuthenticationFailed: if no stored token matches.
        """
        token = request.GET.get('token')
        token_obj = models.UserToken.objects.filter(token=token).first()
        if not token_obj:
            raise AuthenticationFailed('认证失败')
        # Returning None would mean "authentication not attempted" to DRF.
        return token_obj.user, token_obj

    def authenticate_header(self, request):
        pass


class Course(APIView):
    """Example endpoint that requires a valid token."""

    authentication_classes = [TokenAuth, ]

    def get(self, request):
        return HttpResponse('get')

    def post(self, request):
        return HttpResponse('post')
2 自己写一个每分钟限制三次访问的频率类
ip:拿到ip
全局变量:{ip:[时间1,时间2,时间3]}
获取IP
     def get(self, request):
         """Return the client IP address of ``request`` as a string.

         Prefers the first entry of the ``X-Forwarded-For`` header and falls
         back to ``REMOTE_ADDR``. Returns None if neither is present.

         NOTE(review): ``X-Forwarded-For`` is sent by the client or by proxies
         and can be forged; trust it only behind a proxy you control.
         """
         x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
         if x_forwarded_for:
             # The header is a comma-separated list; the first entry is the
             # originating client. Entries may be padded with spaces.
             return x_forwarded_for.split(',')[0].strip()
         # No forwarding header: use the address of the directly connected peer.
         return request.META.get('REMOTE_ADDR')

待补充。。


05-11 20:20