DRF三大认证
目录
- DRF三大认证
- 身份认证
- 自定义认证类
- 权限控制
- 自定义权限类
- 限制频率
- 自定义频率类
身份认证
源码分析:
APIView–dispatch–initial()–perform_authentication()
def perform_authentication(self, request):
"""
Perform authentication on the incoming request.
Note that if you override this and simply 'pass', then authentication
will instead be performed lazily, the first time either
`request.user` or `request.auth` is accessed.
"""
# 调用Request的user属性
request.user
def _authenticate(self):
# 遍历认证器对request请求进行验证
"""
Attempt to authenticate the request using each authentication instance
in turn.
"""
# self.authenticators:Request的认证器组成的list
for authenticator in self.authenticators:
try:
*****重点*****
# 如果认证成功则返回由user和认证信息组成的元组
user_auth_tuple = authenticator.authenticate(self)
except exceptions.APIException:
# 认证失败则抛出异常
self._not_authenticated()
raise
# 对认证成功的数据进行处理
if user_auth_tuple is not None:
# 将认证器返回给self._authenticator
self._authenticator = authenticator
# 对认证请求的信息分别赋值给:登录用户、登录认证
self.user, self.auth = user_auth_tuple
return
# 如果未得到认证则调用该方法:设置未经身份验证的请求的身份验证器
self._not_authenticated()
def _not_authenticated(self):
"""
Set authenticator, user & authtoken representing an unauthenticated request.
Defaults are None, AnonymousUser & None.
"""
# 验证器为None,表示未经验证
self._authenticator = None
# 判断用户是否被认证过
if api_settings.UNAUTHENTICATED_USER:
# 将user设置为匿名用户
self.user = api_settings.UNAUTHENTICATED_USER()
else:
# 将user设置为空
self.user = None
# 判断token是否被认证过,是则将auth设为未经验证的令牌,否则为空
if api_settings.UNAUTHENTICATED_TOKEN:
self.auth = api_settings.UNAUTHENTICATED_TOKEN()
else:
self.auth = None
自定义认证类
-
继承BaseAuthentication类
-
重写**
authenticate(self, request)
**方法 -
认证规则基于的条件:
- 没有认证信息返回None(游客)
- 有认证信息认证失败抛异常(非法用户)
- 有认证信息认证成功返回用户与认证信息的元素(合法用户)
-
完成视图类的全局(settings.py)或局部(指定的视图类)配置
# MyValidate.py
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from user.models import UserToken
class Authentication(BaseAuthentication):
def authenticate(self, request):
# 获取浏览器token
token = request.META.get('HTTP_TOKEN')
print(token)
# UserToken表中校验token是否存在
user_token = UserToken.objects.filter(token=token).first()
if user_token:
# 返回user和token
user = user_token.user
return user, token
else:
raise AuthenticationFailed('请登录')
局部认证
# views.py
from app.MyValidate import Authentication
class userinfo(APIView):
authentication_classes = [MyAuthentication]
全局认证
REST_FRAMEWORK = {
'DEFAULT_PERMISSION_CLASSES': [
'app.MyValidate.Authentication'
],
}
权限控制
源码分析:
APIView–dispatch–initial()–check_permissions()
def check_permissions(self, request):
# 检查请求是否有权限,没有则抛出异常
"""
Check if the request should be permitted.
Raises an appropriate exception if the request is not permitted.
"""
# 遍历权限对象列表依次进行权限校验
for permission in self.get_permissions():
if not permission.has_permission(request, self):
# has_permission:用来做权限认证的方法
# 参数:权限对象self、request、视图类对象
# 返回值:有权返回True 无权则False
self.permission_denied(
request,
message=getattr(permission, 'message', None),
code=getattr(permission, 'code', None)
)
def get_permissions(self):
# 实例化并返回此视图所需的权限列表
"""
Instantiates and returns the list of permissions that this view requires.
"""
return [permission() for permission in self.permission_classes]
def has_object_permission(self, request, view, obj):
"""
Return `True` if permission is granted, `False` otherwise.
"""
return True
自定义权限类
-
继承BasePermission类
-
重写**
has_permission(request, self)
**方法 -
认证规则基于的条件:
- 没有认证信息返回None(游客)
- 有认证信息认证失败抛异常(非法用户)
- 有认证信息认证成功返回用户与认证信息的元素(合法用户)
-
完成视图类的全局(settings.py)或局部(指定的视图类)配置
# MyValidate.py
class Permission(BasePermission):
# 只有超级用户有权限
def has_permission(self, request, view):
level = UserInfo.objects.get(username=request.user).level
if level < 3:
return False
return True
# models.py
class UserInfo(AbstractUser):
level = models.IntegerField(choices=[(1, '普通用户'), (2, '普通管理员'), (3, '超级管理员')], default=1)
局部认证
# views.py
from app.MyValidate import Permission
class book(APIView):
permission_classes = [Permission]
全局认证
# 权限类配置
REST_FRAMEWORK = {
'DEFAULT_PERMISSION_CLASSES': [
'app.MyValidate.Permission'
],
}
限制频率
源码分析:
APIView–dispatch–initial()–check_throttles()
def check_throttles(self, request):
"""
Check if request should be throttled.
Raises an appropriate exception if the request is throttled.
"""
throttle_durations = []
# 遍历配置的频率认证类,初始化得到一个个频率认证类对象(会调用频率认证类的 __init__() 方法)
for throttle in self.get_throttles():
# 频率认证类对象调用 allow_request 方法,判断是否限次(没有限次可访问,限次不可访问)
if not throttle.allow_request(request, self):
# 频率认证类对象在限次后,调用 wait 方法,获取还需等待多长时间可以进行下一次访问
throttle_durations.append(throttle.wait())
if throttle_durations:
# Filter out `None` values which may happen in case of config / rate
# changes, see #1438
durations = [
duration for duration in throttle_durations
if duration is not None
]
duration = max(durations, default=None)
self.throttled(request, duration)
# from rest_framework.throttling import SimpleRateThrottle
class SimpleRateThrottle(BaseThrottle):
"""
A simple cache implementation, that only requires `.get_cache_key()`
to be overridden.
The rate (requests / seconds) is set by a `rate` attribute on the Throttle
class. The attribute is a string of the form 'number_of_requests/period'.
Period should be one of: ('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day')
Previous request information used for throttling is stored in the cache.
"""
cache = default_cache
timer = time.time
cache_format = 'throttle_%(scope)s_%(ident)s'
scope = None
THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES
def __init__(self):
if not getattr(self, 'rate', None):
self.rate = self.get_rate()
self.num_requests, self.duration = self.parse_rate(self.rate)
def get_cache_key(self, request, view):
# 需要我们重写的方法
"""
Should return a unique cache-key which can be used for throttling.
Must be overridden.
May return `None` if the request should not be throttled.
"""
raise NotImplementedError('.get_cache_key() must be overridden')
自定义频率类
- 继承SimpleRateThrottle类
- 重写
get_cache_key()
方法 - 设置一个rate属性,意为访问频率限制,如
'5/s'
# MyValidate.py
from rest_framework.throttling import SimpleRateThrottle
class CommonThrottle(SimpleRateThrottle):
rate = '3/m'
def get_cache_key(self, request, view):
# 返回什么,就会以什么做限制, ip地址限制;用户id
return request.META.get('REMOTE_ADDR')
局部限制
# views.py
from app.MyValidate import CommonThrottle
class book(APIView):
throttle_classes = [CommonThrottle]
全局限制
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_CLASSES': ['app.MyValidate.CommonThrottle'],
}