<div id="cnblogs_post_body" class="blogpost-body">
一、认证和授权
a. 用户url传入的token认证
 
django.conf.urls urlpatterns =<span style="color: #000000;"> [
url(r<span style="color: #800000;">'<span style="color: #800000;">^test/<span style="color: #800000;">'<span style="color: #000000;">,TestView.as_view()),]
 
rest_framework.views rest_framework.response rest_framework.authentication rest_framework.permissions <span style="color: #0000ff;">from rest_framework.request <span style="color: #0000ff;">import<span style="color: #000000;"> Request
<span style="color: #0000ff;">from rest_framework <span style="color: #0000ff;">import<span style="color: #000000;"> exceptions
token_list =<span style="color: #000000;"> [
<span style="color: #800000;">'<span style="color: #800000;">sfsfss123kuf3j123<span style="color: #800000;">'<span style="color: #000000;">,<span style="color: #800000;">'<span style="color: #800000;">asijnfowerkkf9812<span style="color: #800000;">'<span style="color: #000000;">,]
<span style="color: #0000ff;">class<span style="color: #000000;"> TestAuthentication(BaseAuthentication):
<span style="color: #0000ff;">def<span style="color: #000000;"> authenticate(self,request):
<span style="color: #800000;">"""<span style="color: #800000;">
用户认证,如果验证成功后返回元组: (用户,用户Token)
:param request:
:return:
None,表示跳过该验证;
如果跳过了所有认证,默认用户和Token和使用配置文件进行设置
self._authenticator = None
if api_settings.UNAUTHENTICATED_USER:
self.user = api_settings.UNAUTHENTICATED_USER() # 默认值为:匿名用户
else:
self.user = None
if api_settings.UNAUTHENTICATED_TOKEN:
self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默认值为:None
else:
self.auth = None
(user,token)表示验证通过并设置用户名和Token;
AuthenticationFailed异常
</span><span style="color: #800000;">"""</span><span style="color: #000000;">
val </span>= request.query_params.get(<span style="color: #800000;">'</span><span style="color: #800000;">token</span><span style="color: #800000;">'</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">if</span> val <span style="color: #0000ff;">not</span> <span style="color: #0000ff;">in</span><span style="color: #000000;"> token_list:
</span><span style="color: #0000ff;">raise</span> exceptions.AuthenticationFailed(<span style="color: #800000;">"</span><span style="color: #800000;">用户认证失败</span><span style="color: #800000;">"</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">return</span> (<span style="color: #800000;">'</span><span style="color: #800000;">登录用户</span><span style="color: #800000;">'</span>,<span style="color: #800000;">'</span><span style="color: #800000;">用户token</span><span style="color: #800000;">'</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">def</span><span style="color: #000000;"> authenticate_header(self,request):
</span><span style="color: #800000;">"""</span><span style="color: #800000;">
Return a string to be used as the value of the `WWW-Authenticate`
header in a `401 Unauthenticated` response,or `None` if the
authentication scheme should return `403 Permission Denied` responses.
</span><span style="color: #800000;">"""</span>
<span style="color: #008000;">#</span><span style="color: #008000;"> 验证失败时,返回的响应头WWW-Authenticate对应的值</span>
<span style="color: #0000ff;">pass</span>
<span style="color: #0000ff;">class<span style="color: #000000;"> TestView(APIView):
<span style="color: #008000;">#<span style="color: #008000;"> 认证的动作是由request.user触发
authentication_classes =<span style="color: #000000;"> [TestAuthentication,]
</span><span style="color: #008000;">#</span><span style="color: #008000;"> 权限</span>
<span style="color: #008000;">#</span><span style="color: #008000;"> 循环执行所有的权限</span>
permission_classes =<span style="color: #000000;"> [TestPermission,]
</span><span style="color: #0000ff;">def</span> get(self,request,*args,**<span style="color: #000000;">kwargs):
</span><span style="color: #008000;">#</span><span style="color: #008000;"> self.dispatch</span>
<span style="color: #0000ff;">print</span><span style="color: #000000;">(request.user)
</span><span style="color: #0000ff;">print</span><span style="color: #000000;">(request.auth)
</span><span style="color: #0000ff;">return</span> Response(<span style="color: #800000;">'</span><span style="color: #800000;">GET请求,响应内容</span><span style="color: #800000;">'</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">def</span> post(self,**<span style="color: #000000;">kwargs):
</span><span style="color: #0000ff;">return</span> Response(<span style="color: #800000;">'</span><span style="color: #800000;">POST请求,响应内容</span><span style="color: #800000;">'</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">def</span> put(self,**<span style="color: #000000;">kwargs):
</span><span style="color: #0000ff;">return</span> Response(<span style="color: #800000;">'</span><span style="color: #800000;">PUT请求,响应内容</span><span style="color: #800000;">'</span><span style="color: #000000;">)
views.py
b. 请求头认证
 
django.conf.urls
 
rest_framework.views rest_framework.response rest_framework.authentication rest_framework.request rest_framework token_list =<span style="color: #000000;"> [
<span style="color: #800000;">'<span style="color: #800000;">sfsfss123kuf3j123<span style="color: #800000;">'<span style="color: #000000;">,表示跳过该验证;
如果跳过了所有认证,默认用户和Token和使用配置文件进行设置
self._authenticator = None
if api_settings.UNAUTHENTICATED_USER:
self.user = api_settings.UNAUTHENTICATED_USER()
else:
self.user = None
if api_settings.UNAUTHENTICATED_TOKEN:
self.auth = api_settings.UNAUTHENTICATED_TOKEN()
else:
self.auth = None
(user,token)表示验证通过并设置用户名和Token;
AuthenticationFailed异常
</span><span style="color: #800000;">"""</span>
<span style="color: #0000ff;">import</span><span style="color: #000000;"> base64
auth </span>= request.META.get(<span style="color: #800000;">'</span><span style="color: #800000;">HTTP_AUTHORIZATION</span><span style="color: #800000;">'</span>,b<span style="color: #800000;">''</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">if</span><span style="color: #000000;"> auth:
auth </span>= auth.encode(<span style="color: #800000;">'</span><span style="color: #800000;">utf-8</span><span style="color: #800000;">'</span><span style="color: #000000;">)
auth </span>=<span style="color: #000000;"> auth.split()
</span><span style="color: #0000ff;">if</span> <span style="color: #0000ff;">not</span> auth <span style="color: #0000ff;">or</span> auth[0].lower() != b<span style="color: #800000;">'</span><span style="color: #800000;">basic</span><span style="color: #800000;">'</span><span style="color: #000000;">:
</span><span style="color: #0000ff;">raise</span> exceptions.AuthenticationFailed(<span style="color: #800000;">'</span><span style="color: #800000;">验证失败</span><span style="color: #800000;">'</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">if</span> len(auth) != 2<span style="color: #000000;">:
</span><span style="color: #0000ff;">raise</span> exceptions.AuthenticationFailed(<span style="color: #800000;">'</span><span style="color: #800000;">验证失败</span><span style="color: #800000;">'</span><span style="color: #000000;">)
username,part,password </span>= base64.b64decode(auth[1]).decode(<span style="color: #800000;">'</span><span style="color: #800000;">utf-8</span><span style="color: #800000;">'</span>).partition(<span style="color: #800000;">'</span><span style="color: #800000;">:</span><span style="color: #800000;">'</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">if</span> username == <span style="color: #800000;">'</span><span style="color: #800000;">alex</span><span style="color: #800000;">'</span> <span style="color: #0000ff;">and</span> password == <span style="color: #800000;">'</span><span style="color: #800000;">123</span><span style="color: #800000;">'</span><span style="color: #000000;">:
</span><span style="color: #0000ff;">return</span> (<span style="color: #800000;">'</span><span style="color: #800000;">登录用户</span><span style="color: #800000;">'</span>,<span style="color: #800000;">'</span><span style="color: #800000;">用户token</span><span style="color: #800000;">'</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">else</span><span style="color: #000000;">:
</span><span style="color: #0000ff;">raise</span> exceptions.AuthenticationFailed(<span style="color: #800000;">'</span><span style="color: #800000;">用户名或密码错误</span><span style="color: #800000;">'</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">def</span><span style="color: #000000;"> authenticate_header(self,or `None` if the
authentication scheme should return `403 Permission Denied` responses.
</span><span style="color: #800000;">"""</span>
<span style="color: #0000ff;">return</span> <span style="color: #800000;">'</span><span style="color: #800000;">Basic realm=api</span><span style="color: #800000;">'</span>
<span style="color: #0000ff;">class <span style="color: #000000;"> TestView(APIView):
authentication_classes =<span style="color: #000000;"> [TestAuthentication,]
permission_classes =<span style="color: #000000;"> []
</span><span style="color: #0000ff;">def</span> get(self,**<span style="color: #000000;">kwargs):
</span><span style="color: #0000ff;">print</span><span style="color: #000000;">(request.user)
</span><span style="color: #0000ff;">print</span><span style="color: #000000;">(request.auth)
</span><span style="color: #0000ff;">return</span> Response(<span style="color: #800000;">'</span><span style="color: #800000;">GET请求,响应内容</span><span style="color: #800000;">'</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">def</span> post(self,**<span style="color: #000000;">kwargs):
</span><span style="color: #0000ff;">return</span> Response(<span style="color: #800000;">'</span><span style="color: #800000;">PUT请求,响应内容</span><span style="color: #800000;">'</span>)</pre>

c.多个认证规则
 
django.conf.urls web.views.s2_auth urlpatterns =<span style="color: #000000;"> [
url(r<span style="color: #800000;">'<span style="color: #800000;">^test/<span style="color: #800000;">'<span style="color: #000000;">,]
 
rest_framework.views rest_framework.response rest_framework.authentication rest_framework.request rest_framework token_list =<span style="color: #000000;"> [
<span style="color: #800000;">'<span style="color: #800000;">sfsfss123kuf3j123<span style="color: #800000;">'<span style="color: #000000;">,]
<span style="color: #0000ff;">class<span style="color: #000000;"> Test1Authentication(BaseAuthentication):
<span style="color: #0000ff;">def<span style="color: #000000;"> authenticate(self,表示跳过该验证;
如果跳过了所有认证,默认用户和Token和使用配置文件进行设置
self._authenticator = None
if api_settings.UNAUTHENTICATED_USER:
self.user = api_settings.UNAUTHENTICATED_USER() # 默认值为:匿名用户
else:
self.user = None
if api_settings.UNAUTHENTICATED_TOKEN:
self.auth = api_settings.UNAUTHENTICATED_TOKEN()# 默认值为:None
else:
self.auth = None
(user,b<span style="color: #800000;">''</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">if</span><span style="color: #000000;"> auth:
auth </span>= auth.encode(<span style="color: #800000;">'</span><span style="color: #800000;">utf-8</span><span style="color: #800000;">'</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">else</span><span style="color: #000000;">:
</span><span style="color: #0000ff;">return</span><span style="color: #000000;"> None
</span><span style="color: #0000ff;">print</span>(auth,<span style="color: #800000;">'</span><span style="color: #800000;">xxxx</span><span style="color: #800000;">'</span><span style="color: #000000;">)
auth </span>=<span style="color: #000000;"> auth.split()
</span><span style="color: #0000ff;">if</span> <span style="color: #0000ff;">not</span> auth <span style="color: #0000ff;">or</span> auth[0].lower() != b<span style="color: #800000;">'</span><span style="color: #800000;">basic</span><span style="color: #800000;">'</span><span style="color: #000000;">:
</span><span style="color: #0000ff;">raise</span> exceptions.AuthenticationFailed(<span style="color: #800000;">'</span><span style="color: #800000;">验证失败</span><span style="color: #800000;">'</span><span style="color: #000000;">)
</span><span style="color: #0000ff;">if</span> len(auth) != 2<span style="color: #000000;">:
</span><span style="color: #0000ff;">raise</span> exceptions.AuthenticationFailed(<span style="color: #800000;">'</span><span style="color: #800000;">验证失败</span><span style="color: #800000;">'</span><span style="color: #000000;">)
username,or `None` if the
authentication scheme should return `403 Permission Denied` responses.
</span><span style="color: #800000;">"""</span>
<span style="color: #008000;">#</span><span style="color: #008000;"> return 'Basic realm=api'</span>
<span style="color: #0000ff;">pass</span>
<span style="color: #0000ff;">class <span style="color: #000000;"> Test2Authentication(BaseAuthentication):
<span style="color: #0000ff;">def<span style="color: #000000;"> authenticate(self,or None if the
authentication scheme should return 403 Permission Denied responses.
<span style="color: #800000;">"""
<span style="color: #0000ff;">pass
<span style="color: #0000ff;">class <span style="color: #000000;"> TestView(APIView):
authentication_classes =<span style="color: #000000;"> [Test1Authentication,Test2Authentication]
permission_classes =<span style="color: #000000;"> []
</span><span style="color: #0000ff;">def</span> get(self,**<span style="color: #000000;">kwargs):
</span><span style="color: #0000ff;">return</span> Response(<span style="color: #800000;">'</span><span style="color: #800000;">PUT请求,响应内容</span><span style="color: #800000;">'</span><span style="color: #000000;">)
views.py
d.认证和权限
 
django.conf.urls web.views urlpatterns =<span style="color: #000000;"> [
url(r<span style="color: #800000;">'<span style="color: #800000;">^test/<span style="color: #800000;">'<span style="color: #000000;">,]
 
rest_framework.views rest_framework.response rest_framework.authentication rest_framework.permissions <span style="color: #0000ff;">from rest_framework.request <span style="color: #0000ff;">import<span style="color: #000000;"> Request
<span style="color: #0000ff;">from rest_framework <span style="color: #0000ff;">import<span style="color: #000000;"> exceptions
token_list =<span style="color: #000000;"> [
<span style="color: #800000;">'<span style="color: #800000;">sfsfss123kuf3j123<span style="color: #800000;">'<span style="color: #000000;">,or None if the
authentication scheme should return 403 Permission Denied responses.
<span style="color: #800000;">"""
<span style="color: #0000ff;">pass
<span style="color: #0000ff;">class <span style="color: #000000;"> TestPermission(BasePermission):
message = <span style="color: #800000;">"<span style="color: #800000;">权限验证失败<span style="color: #800000;">"
<span style="color: #0000ff;">def</span><span style="color: #000000;"> has_permission(self,view):
</span><span style="color: #800000;">"""</span><span style="color: #800000;">
判断是否有权限访问当前请求
Return `True` if permission is granted,`False` otherwise.
:param request:
:param view:
:return: True有权限;False无权限
</span><span style="color: #800000;">"""</span>
<span style="color: #0000ff;">if</span> request.user == <span style="color: #800000;">"</span><span style="color: #800000;">管理员</span><span style="color: #800000;">"</span><span style="color: #000000;">:
</span><span style="color: #0000ff;">return</span><span style="color: #000000;"> True
</span><span style="color: #008000;">#</span><span style="color: #008000;"> GenericAPIView中get_object时调用</span>
<span style="color: #0000ff;">def</span><span style="color: #000000;"> has_object_permission(self,view,obj):
</span><span style="color: #800000;">"""</span><span style="color: #800000;">
视图继承GenericAPIView,并在其中使用get_object时获取对象时,触发单独对象权限验证
Return `True` if permission is granted,`False` otherwise.
:param request:
:param view:
:param obj:
:return: True有权限;False无权限
</span><span style="color: #800000;">"""</span>
<span style="color: #0000ff;">if</span> request.user == <span style="color: #800000;">"</span><span style="color: #800000;">管理员</span><span style="color: #800000;">"</span><span style="color: #000000;">:
</span><span style="color: #0000ff;">return</span><span style="color: #000000;"> True
<span style="color: #0000ff;">class<span style="color: #000000;"> TestView(APIView):
<span style="color: #008000;">#<span style="color: #008000;"> 认证的动作是由request.user触发
authentication_classes =<span style="color: #000000;"> [TestAuthentication,**<span style="color: #000000;">kwargs):
<span style="color: #0000ff;">return Response(<span style="color: #800000;">'<span style="color: #800000;">PUT请求,响应内容<span style="color: #800000;">')
e.全局使用
上述操作中均是对单独视图进行特殊配置,如果想要对全局进行配置,则需要再配置文件中写入即可。
 
REST_FRAMEWORK =
 
django.conf.urls urls.py
 
rest_framework.views rest_framework.response <span style="color: #0000ff;">class<span style="color: #000000;"> TestView(APIView):
</span><span style="color: #0000ff;">def</span> get(self,**<span style="color: #000000;">kwargs):
</span><span style="color: #0000ff;">return</span> Response(<span style="color: #800000;">'</span><span style="color: #800000;">PUT请求,响应内容</span><span style="color: #800000;">'</span>)</pre>
g.自定义认证工作
 
token = request.query_params.get()
token == (,)
APIException( authenticate_header(self,request):
authentication_classes = get(self,** Response()

1、需求:Host是匿名用户和用户都能访问 #匿名用户的request.user = none;User只有注册用户能访问
 
app03 django.conf.urls urlpatterns =
url( url(r url(r url(r ]
 
django.shortcuts rest_framework.views APIView
rest_framework.response Response
rest_framework.authentication BaseAuthentication
rest_framework.authentication app01 rest_framework rest_framework.permissions AllowAny
rest_framework.throttling
token = request.query_params.get( obj = models.UserInfo.objects.filter(token= obj :
None
message =
has_permission(self,view):
True
False
message =
has_permission(self,view):
request.user== True
False
authentication_classes = []
authentication_classes = permission_classes = []
Response(
authentication_classes = permission_classes = (request.user, Response(
permission_denied(self,message=
request.authenticators
exceptions.NotAuthenticated(detail= exceptions.PermissionDenied(detail=message)
 
message =
authentication_classes = [MyAuthentication,]
permission_classes = [MyPermission,AdminPermission,]
Response(
permission_denied(self,message=
request.authenticators
exceptions.NotAuthenticated(detail= exceptions.PermissionDenied(detail=message)
如果遇上下面这样的情况,是因为没有通过认证,并且权限中return False了,可以自定制错误信息为中文,参考源码

permission
=getattr(permission,
permission_denied(self,message=
request.authenticators
exceptions.PermissionDenied(detail=message)
那么我们可以重写permission_denied这个方法,如下:
 
authentication_classes = permission_classes = Response(
permission_denied(self,message=
request.authenticators
exceptions.NotAuthenticated(detail= exceptions.PermissionDenied(detail=message)

2.全局使用
上述操作中均是对单独视图进行特殊配置,如果想要对全局进行配置,则需要再配置文件中写入即可。
 
REST_FRAMEWORK = : None,
,
}
 
authentication_classes = []
authentication_classes = permission_classes = []
Response(
authentication_classes = permission_classes = (request.user, Response(
permission_denied(self,message=
request.authenticators
exceptions.NotAuthenticated(detail= exceptions.PermissionDenied(detail=
message =
authentication_classes = [MyAuthentication,]
permission_classes = [MyPermission,]
Response(
permission_denied(self,message=
request.authenticators
exceptions.NotAuthenticated(detail= exceptions.PermissionDenied(detail=message)
用户访问次数/频率限制
1、为什么要限流呢
答:
- - 第一点:爬虫,反爬
- - 第二点:控制 API 访问次数
- - 登录用户的用户名可以做标识
- 匿名用户可以参考 ip,但是 ip可以加代理。
2、限制访问频率源码分析
 
self.check_throttles(request)
 
throttle
 
[throttle() throttle self.throttle_classes]
 
throttle_classes = api_settings.DEFAULT_THROTTLE_CLASSES
 
NotImplementedError(
0. If not use all of
xff = request.META.get( remote_addr = request.META.get( num_proxies =
num_proxies num_proxies == 0 xff addrs = xff.split( client_addr = addrs[-
.join(xff.split()) xff
None
 
zz
 
exceptions.Throttled(wait)
 
status_code = default_detail = _( extra_detail_singular =
extra_detail_plural =
default_code =
(self,wait=None,detail=None,code= detail detail = wait wait = detail = force_text(ungettext(self.extra_detail_singular.format(wait= self.extra_detail_plural.format(wait= self.wait = super(Throttled,self).(detail,code)
3.方法
a. 基于用户IP限制访问频率
 
django.conf.urls
 
rest_framework.views rest_framework.response <span style="color: #0000ff;">from rest_framework <span style="color: #0000ff;">import<span style="color: #000000;"> exceptions
<span style="color: #0000ff;">from rest_framework.throttling <span style="color: #0000ff;">import<span style="color: #000000;"> BaseThrottle
<span style="color: #0000ff;">from rest_framework.settings <span style="color: #0000ff;">import<span style="color: #000000;"> api_settings
<span style="color: #008000;">#<span style="color: #008000;"> 保存访问记录
RECORD =<span style="color: #000000;"> {
<span style="color: #800000;">'<span style="color: #800000;">用户IP<span style="color: #800000;">': [12312139,12312135,12312133<span style="color: #000000;">,]
}
<span style="color: #0000ff;">class<span style="color: #000000;"> TestThrottle(BaseThrottle):
ctime =<span style="color: #000000;"> time.time
</span><span style="color: #0000ff;">def</span><span style="color: #000000;"> get_ident(self,request):
</span><span style="color: #800000;">"""</span><span style="color: #800000;">
根据用户IP和代理IP,当做请求者的唯一IP
Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
if present and number of proxies is > 0. If not use all of
HTTP_X_FORWARDED_FOR if it is available,if not use REMOTE_ADDR.
</span><span style="color: #800000;">"""</span><span style="color: #000000;">
xff </span>= request.META.get(<span style="color: #800000;">'</span><span style="color: #800000;">HTTP_X_FORWARDED_FOR</span><span style="color: #800000;">'</span><span style="color: #000000;">)
remote_addr </span>= request.META.get(<span style="color: #800000;">'</span><span style="color: #800000;">REMOTE_ADDR</span><span style="color: #800000;">'</span><span style="color: #000000;">)
num_proxies </span>=<span style="color: #000000;"> api_settings.NUM_PROXIES
</span><span style="color: #0000ff;">if</span> num_proxies <span style="color: #0000ff;">is</span> <span style="color: #0000ff;">not</span><span style="color: #000000;"> None:
</span><span style="color: #0000ff;">if</span> num_proxies == 0 <span style="color: #0000ff;">or</span> xff <span style="color: #0000ff;">is</span><span style="color: #000000;"> None:
</span><span style="color: #0000ff;">return</span><span style="color: #000000;"> remote_addr
addrs </span>= xff.split(<span style="color: #800000;">'</span><span style="color: #800000;">,</span><span style="color: #800000;">'</span><span style="color: #000000;">)
client_addr </span>= addrs[-<span style="color: #000000;">min(num_proxies,len(addrs))]
</span><span style="color: #0000ff;">return</span><span style="color: #000000;"> client_addr.strip()
</span><span style="color: #0000ff;">return</span> <span style="color: #800000;">''</span>.join(xff.split()) <span style="color: #0000ff;">if</span> xff <span style="color: #0000ff;">else</span><span style="color: #000000;"> remote_addr
</span><span style="color: #0000ff;">def</span><span style="color: #000000;"> allow_request(self,view):
</span><span style="color: #800000;">"""</span><span style="color: #800000;">
是否仍然在允许范围内
Return `True` if the request should be allowed,`False` otherwise.
:param request:
:param view:
:return: True,表示可以通过;False表示已超过限制,不允许访问
</span><span style="color: #800000;">"""</span>
<span style="color: #008000;">#</span><span style="color: #008000;"> 获取用户唯一标识(如:IP)</span>
<span style="color: #008000;">#</span><span style="color: #008000;"> 允许一分钟访问10次</span>
num_request = 10<span style="color: #000000;">
time_request </span>= 60<span style="color: #000000;">
now </span>=<span style="color: #000000;"> self.ctime()
ident </span>=<span style="color: #000000;"> self.get_ident(request)
self.ident </span>=<span style="color: #000000;"> ident
</span><span style="color: #0000ff;">if</span> ident <span style="color: #0000ff;">not</span> <span style="color: #0000ff;">in</span><span style="color: #000000;"> RECORD:
RECORD[ident] </span>=<span style="color: #000000;"> [now,]
</span><span style="color: #0000ff;">return</span><span style="color: #000000;"> True
history </span>=<span style="color: #000000;"> RECORD[ident]
</span><span style="color: #0000ff;">while</span> history <span style="color: #0000ff;">and</span> history[-1] <= now -<span style="color: #000000;"> time_request:
history.pop()
</span><span style="color: #0000ff;">if</span> len(history) <<span style="color: #000000;"> num_request:
history.insert(0,now)
</span><span style="color: #0000ff;">return</span><span style="color: #000000;"> True
</span><span style="color: #0000ff;">def</span><span style="color: #000000;"> wait(self):
</span><span style="color: #800000;">"""</span><span style="color: #800000;">
多少秒后可以允许继续访问
Optionally,return a recommended number of seconds to wait before
the next request.
</span><span style="color: #800000;">"""</span><span style="color: #000000;">
last_time </span>=<span style="color: #000000;"> RECORD[self.ident][0]
now </span>=<span style="color: #000000;"> self.ctime()
</span><span style="color: #0000ff;">return</span> int(60 + last_time -<span style="color: #000000;"> now)
<span style="color: #0000ff;">class<span style="color: #000000;"> TestView(APIView):
throttle_classes =<span style="color: #000000;"> [TestThrottle,**<span style="color: #000000;">kwargs):
<span style="color: #0000ff;">return Response(<span style="color: #800000;">'<span style="color: #800000;">PUT请求,响应内容<span style="color: #800000;">'<span style="color: #000000;">)
</span><span style="color: #0000ff;">def</span><span style="color: #000000;"> throttled(self,wait):
</span><span style="color: #800000;">"""</span><span style="color: #800000;">
访问次数被限制时,定制错误信息
</span><span style="color: #800000;">"""</span>
<span style="color: #0000ff;">class</span><span style="color: #000000;"> Throttled(exceptions.Throttled):
default_detail </span>= <span style="color: #800000;">'</span><span style="color: #800000;">请求被限制.</span><span style="color: #800000;">'</span><span style="color: #000000;">
extra_detail_singular </span>= <span style="color: #800000;">'</span><span style="color: #800000;">请 {wait} 秒之后再重试.</span><span style="color: #800000;">'</span><span style="color: #000000;">
extra_detail_plural </span>= <span style="color: #800000;">'</span><span style="color: #800000;">请 {wait} 秒之后再重试.</span><span style="color: #800000;">'</span>
<span style="color: #0000ff;">raise</span> Throttled(wait)</pre>
b. 基于用户IP显示访问频率(利于Django缓存)
 
REST_FRAMEWORK =:
 
django.conf.urls
 
rest_framework.views rest_framework.response <span style="color: #0000ff;">from rest_framework <span style="color: #0000ff;">import<span style="color: #000000;"> exceptions
<span style="color: #0000ff;">from rest_framework.throttling <span style="color: #0000ff;">import<span style="color: #000000;"> SimpleRateThrottle
<span style="color: #0000ff;">class<span style="color: #000000;"> TestThrottle(SimpleRateThrottle):
</span><span style="color: #008000;">#</span><span style="color: #008000;"> 配置文件定义的显示频率的Key</span>
scope = <span style="color: #800000;">"</span><span style="color: #800000;">test_scope</span><span style="color: #800000;">"</span>
<span style="color: #0000ff;">def</span><span style="color: #000000;"> get_cache_key(self,view):
</span><span style="color: #800000;">"""</span><span style="color: #800000;">
Should return a unique cache-key which can be used for throttling.
Must be overridden.
May return `None` if the request should not be throttled.
</span><span style="color: #800000;">"""</span>
<span style="color: #0000ff;">if</span> <span style="color: #0000ff;">not</span><span style="color: #000000;"> request.user:
ident </span>=<span style="color: #000000;"> self.get_ident(request)
</span><span style="color: #0000ff;">else</span><span style="color: #000000;">:
ident </span>=<span style="color: #000000;"> request.user
</span><span style="color: #0000ff;">return</span> self.cache_format %<span style="color: #000000;"> {
</span><span style="color: #800000;">'</span><span style="color: #800000;">scope</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.scope,</span><span style="color: #800000;">'</span><span style="color: #800000;">ident</span><span style="color: #800000;">'</span><span style="color: #000000;">: ident
}
<span style="color: #0000ff;">class<span style="color: #000000;"> TestView(APIView):
throttle_classes =<span style="color: #000000;"> [TestThrottle,wait):
<span style="color: #800000;">"""<span style="color: #800000;">
访问次数被限制时,定制错误信息
<span style="color: #800000;">"""
<span style="color: #0000ff;">class</span><span style="color: #000000;"> Throttled(exceptions.Throttled):
default_detail </span>= <span style="color: #800000;">'</span><span style="color: #800000;">请求被限制.</span><span style="color: #800000;">'</span><span style="color: #000000;">
extra_detail_singular </span>= <span style="color: #800000;">'</span><span style="color: #800000;">请 {wait} 秒之后再重试.</span><span style="color: #800000;">'</span><span style="color: #000000;">
extra_detail_plural </span>= <span style="color: #800000;">'</span><span style="color: #800000;">请 {wait} 秒之后再重试.</span><span style="color: #800000;">'</span>
<span style="color: #0000ff;">raise</span> Throttled(wait)</pre>
c. view中限制请求频率
 
REST_FRAMEWORK =:
 
django.conf.urls
 
rest_framework.views rest_framework.response <span style="color: #0000ff;">from rest_framework <span style="color: #0000ff;">import<span style="color: #000000;"> exceptions
<span style="color: #0000ff;">from rest_framework.throttling <span style="color: #0000ff;">import<span style="color: #000000;"> ScopedRateThrottle
<span style="color: #008000;">#<span style="color: #008000;"> 继承 ScopedRateThrottle
<span style="color: #0000ff;">class<span style="color: #000000;"> TestThrottle(ScopedRateThrottle):
</span><span style="color: #0000ff;">def</span><span style="color: #000000;"> get_cache_key(self,]
</span><span style="color: #008000;">#</span><span style="color: #008000;"> 在settings中获取 xxxxxx 对应的频率限制值</span>
throttle_scope = <span style="color: #800000;">"</span><span style="color: #800000;">xxxxxx</span><span style="color: #800000;">"</span>
<span style="color: #0000ff;">def</span> get(self,wait):
</span><span style="color: #800000;">"""</span><span style="color: #800000;">
访问次数被限制时,定制错误信息
</span><span style="color: #800000;">"""</span>
<span style="color: #0000ff;">class</span><span style="color: #000000;"> Throttled(exceptions.Throttled):
default_detail </span>= <span style="color: #800000;">'</span><span style="color: #800000;">请求被限制.</span><span style="color: #800000;">'</span><span style="color: #000000;">
extra_detail_singular </span>= <span style="color: #800000;">'</span><span style="color: #800000;">请 {wait} 秒之后再重试.</span><span style="color: #800000;">'</span><span style="color: #000000;">
extra_detail_plural </span>= <span style="color: #800000;">'</span><span style="color: #800000;">请 {wait} 秒之后再重试.</span><span style="color: #800000;">'</span>
<span style="color: #0000ff;">raise</span> Throttled(wait)</pre>
d. 匿名时用IP限制+登录时用Token限制
 
REST_FRAMEWORK =: :
 
django.conf.urls web.views.s3_throttling urlpatterns =<span style="color: #000000;"> [
url(r<span style="color: #800000;">'<span style="color: #800000;">^test/<span style="color: #800000;">'<span style="color: #000000;">,]
 
rest_framework.views rest_framework.response <span style="color: #0000ff;">from rest_framework.throttling <span style="color: #0000ff;">import<span style="color: #000000;"> SimpleRateThrottle
<span style="color: #0000ff;">class<span style="color: #000000;"> LuffyAnonRateThrottle(SimpleRateThrottle):
<span style="color: #800000;">"""<span style="color: #800000;">
匿名用户,根据IP进行限制
<span style="color: #800000;">"""<span style="color: #000000;">
scope = <span style="color: #800000;">"<span style="color: #800000;">luffy_anon<span style="color: #800000;">"
<span style="color: #0000ff;">def</span><span style="color: #000000;"> get_cache_key(self,view):
</span><span style="color: #008000;">#</span><span style="color: #008000;"> 用户已登录,则跳过 匿名频率限制</span>
<span style="color: #0000ff;">if</span><span style="color: #000000;"> request.user:
</span><span style="color: #0000ff;">return</span><span style="color: #000000;"> None
</span><span style="color: #0000ff;">return</span> self.cache_format %<span style="color: #000000;"> {
</span><span style="color: #800000;">'</span><span style="color: #800000;">scope</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.scope,</span><span style="color: #800000;">'</span><span style="color: #800000;">ident</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.get_ident(request)
}
<span style="color: #0000ff;">class<span style="color: #000000;"> LuffyUserRateThrottle(SimpleRateThrottle):
<span style="color: #800000;">"""<span style="color: #800000;">
登录用户,根据用户token限制
<span style="color: #800000;">"""<span style="color: #000000;">
scope = <span style="color: #800000;">"<span style="color: #800000;">luffy_user<span style="color: #800000;">"
<span style="color: #0000ff;">def</span><span style="color: #000000;"> get_ident(self,request):
</span><span style="color: #800000;">"""</span><span style="color: #800000;">
认证成功时:request.user是用户对象;request.auth是token对象
:param request:
:return:
</span><span style="color: #800000;">"""</span>
<span style="color: #008000;">#</span><span style="color: #008000;"> return request.auth.token</span>
<span style="color: #0000ff;">return</span> <span style="color: #800000;">"</span><span style="color: #800000;">user_token</span><span style="color: #800000;">"</span>
<span style="color: #0000ff;">def</span><span style="color: #000000;"> get_cache_key(self,view):
</span><span style="color: #800000;">"""</span><span style="color: #800000;">
获取缓存key
:param request:
:param view:
:return:
</span><span style="color: #800000;">"""</span>
<span style="color: #008000;">#</span><span style="color: #008000;"> 未登录用户,则跳过 Token限制</span>
<span style="color: #0000ff;">if</span> <span style="color: #0000ff;">not</span><span style="color: #000000;"> request.user:
</span><span style="color: #0000ff;">return</span><span style="color: #000000;"> None
</span><span style="color: #0000ff;">return</span> self.cache_format %<span style="color: #000000;"> {
</span><span style="color: #800000;">'</span><span style="color: #800000;">scope</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.scope,</span><span style="color: #800000;">'</span><span style="color: #800000;">ident</span><span style="color: #800000;">'</span><span style="color: #000000;">: self.get_ident(request)
}
<span style="color: #0000ff;">class<span style="color: #000000;"> TestView(APIView):
throttle_classes =<span style="color: #000000;"> [LuffyUserRateThrottle,LuffyAnonRateThrottle,**<span style="color: #000000;">kwargs):
<span style="color: #0000ff;">return Response(<span style="color: #800000;">'<span style="color: #800000;">PUT请求,响应内容<span style="color: #800000;">')
e. 全局使用
 
REST_FRAMEWORK =: : :
下面来看看最简单的从源码中分析的示例,这只是举例说明了一下
 
django.conf.urls app04 urlpatterns = url(
]
 
django.shortcuts rest_framework.views rest_framework.response rest_framework
1000
authentication_classes = []
permission_classes = []
throttle_classes =
Response(
default_detail =
extra_detail_singular =
extra_detail_plural =
default_code =
MyThrottle(wait)
需求:对匿名用户进行限制,每个用户一分钟允许访问10次(只针对用户来说)
a、基于用户IP限制访问频率
流程分析:
- 先获取用户信息,如果是匿名用户,获取IP。如果不是匿名用户就可以获取用户名。
- 获取匿名用户IP,在request里面获取,比如IP= 1.1.1.1。
- 吧获取到的IP添加到到recode字典里面,需要在添加之前先限制一下。
- 如果时间间隔大于60秒,说明时间久远了,就把那个时间给剔除 了pop。在timelist列表里面现在留的是有效的访问时间段。
- 然后判断他的访问次数超过了10次没有,如果超过了时间就return False。
- 美中不足的是时间是固定的,我们改变他为动态的:列表里面最开始进来的时间和当前的时间进行比较,看需要等多久。
具体实现:
 
django.shortcuts rest_framework.views rest_framework.response rest_framework rest_framework.throttling BaseThrottle,SimpleRateThrottle
RECORD =
ctime = ip =
ip RECORD[ip] =
time_list = RECORD[ip]
val = time_list[-1]
(ctime-60)>val:
len(time_list) >10 False
True
ctime = first_in_time = RECORD[][-1 wt = 60-(ctime-
authentication_classes = []
permission_classes = []
throttle_classes =
Response(
default_detail =
extra_detail_singular =
extra_detail_plural =
default_code =
MyThrottle(wait)
 
val: #吧时间大于60秒的给剔除了
10:
django.shortcuts rest_framework.views rest_framework.response rest_framework rest_framework.throttling BaseThrottle,SimpleRateThrottle
RECORD =
ctime = self.ip = self.ip RECORD[self.ip] =
time_list = RECORD[self.ip]
val = time_list[-1]
(ctime-60)>val:
len(time_list) >10 False |