Django REST framework: Authentication, Permission, and Throttling Components

Published: 2023/12/20

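Authentication, Permission, and Throttling Components

Authentication is the mechanism that associates an incoming request with a set of identifying credentials, such as the user the request came from or the token it was signed with. The permission and throttling components then use those credentials to decide whether to reject the request.

In short:

  • Authentication determines who you are
  • Permissions determine whether you may access a given endpoint
  • Throttling determines how often you may access a given endpoint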
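The order of these three checks can be sketched in plain Python. This is a framework-free illustration rather than DRF's own code: `handle_request`, `authenticate`, `is_allowed`, and `within_rate` are hypothetical stand-ins, and the request is modeled as a plain dict. DRF runs the same three steps in this order (authentication, then permissions, then throttling) before it calls the view handler.

```python
def handle_request(request, authenticate, is_allowed, within_rate):
    """Run the three checks in order and report the outcome as a status string."""
    user = authenticate(request)          # 1. who are you?
    if not is_allowed(user, request):     # 2. may you access this endpoint?
        return "403 permission denied"
    if not within_rate(user, request):    # 3. are you calling too often?
        return "429 throttled"
    return "200 ok for %s" % user


# Hypothetical stand-ins for the three components.
def authenticate(request):
    return request.get("user")


def is_allowed(user, request):
    return user is not None


def within_rate(user, request):
    return request.get("calls_this_minute", 0) < 3


print(handle_request({"user": "alice"}, authenticate, is_allowed, within_rate))
```

Because each step can end the request early, a throttle never runs for a request that has already failed the permission check.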

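1. Authentication Component

REST framework provides several authentication schemes out of the box and also lets you implement custom schemes.

Custom Token Authentication

Define a user table and a table that stores each user's token:

    from django.db import models


    class UserInfo(models.Model):
        username = models.CharField(max_length=16)
        password = models.CharField(max_length=32)
        type = models.SmallIntegerField(
            choices=((1, 'Regular user'), (2, 'VIP user')),
            default=1
        )


    class Token(models.Model):
        # on_delete is required from Django 2.0 onward
        user = models.OneToOneField(to='UserInfo', on_delete=models.CASCADE)
        token_code = models.CharField(max_length=128)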

Define a login view:

    import hashlib
    import time

    from rest_framework.response import Response
    from rest_framework.views import APIView

    from app01 import models  # assumes the models above live in the app01 app


    def get_random_token(username):
        """Generate a token from the username and the current timestamp."""
        timestamp = str(time.time())
        m = hashlib.md5(bytes(username, encoding="utf8"))
        m.update(bytes(timestamp, encoding="utf8"))
        return m.hexdigest()


    class LoginView(APIView):
        """Check the username and password and, if they match, issue a token."""

        def post(self, request):
            res = {"code": 0}
            username = request.data.get("username")
            password = request.data.get("password")
            user = models.UserInfo.objects.filter(username=username, password=password).first()
            if user:
                # Username and password are correct
                token = get_random_token(username)
                models.Token.objects.update_or_create(defaults={"token_code": token}, user=user)
                res["token"] = token
            else:
                res["code"] = 1
                res["error"] = "Incorrect username or password"
            return Response(res)
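A token built from an MD5 hash of the username and a timestamp can be guessed by anyone who knows the username and the approximate login time. As a hedged alternative, the sketch below draws the token from the operating system's random source through the standard-library `secrets` module. The `nbytes` parameter is an assumption introduced for this example; the original function takes the username instead.

```python
import secrets


def get_random_token(nbytes=20):
    """Return an unpredictable hex token; 20 random bytes give 40 hex characters."""
    return secrets.token_hex(nbytes)


token = get_random_token()
print(len(token))
```

A 40-character token fits comfortably in the `token_code` field, which allows up to 128 characters.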

Define an authentication class

    from rest_framework.authentication import BaseAuthentication
    from rest_framework.exceptions import AuthenticationFailed

    from app01 import models  # assumes the models above live in the app01 app


    class MyAuth(BaseAuthentication):

        def authenticate(self, request):
            # authenticate() must be implemented; it returns (authenticated user, auth object)
            if request.method in ["POST", "PUT", "DELETE"]:
                request_token = request.data.get("token", None)
                if not request_token:
                    raise AuthenticationFailed('Missing token')
                token_obj = models.Token.objects.filter(token_code=request_token).first()
                if not token_obj:
                    raise AuthenticationFailed('Invalid token')
                # Return the user object rather than the username string, so that
                # later checks can read attributes such as request.user.type
                return token_obj.user, None
            else:
                return None, None
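`MyAuth` reads the token from the request body, so it only covers methods that carry one. A common alternative is to send the token in the `Authorization` header, which is what DRF's built-in `TokenAuthentication` does. The following framework-free sketch shows that parsing step. The function name `parse_token_header` is hypothetical, and `ValueError` stands in for `AuthenticationFailed` so the example runs without DRF installed.

```python
def parse_token_header(header_value, keyword="Token"):
    """Return the token from an 'Authorization: <keyword> <token>' header value.

    Returns None when the header is absent or uses another scheme, and raises
    ValueError when the header is malformed.
    """
    parts = (header_value or "").split()
    if not parts or parts[0].lower() != keyword.lower():
        return None  # not our scheme; let another authenticator try
    if len(parts) == 1:
        raise ValueError("Invalid token header. No credentials provided.")
    if len(parts) > 2:
        raise ValueError("Invalid token header. Token string should not contain spaces.")
    return parts[1]


print(parse_token_header("Token abc123"))
```

Returning `None` for a foreign scheme and raising only for a malformed header keeps "not my kind of credentials" separate from "bad credentials".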

View-level authentication

    from rest_framework.viewsets import ModelViewSet


    class CommentViewSet(ModelViewSet):
        queryset = models.Comment.objects.all()
        serializer_class = app01_serializers.CommentSerializer
        authentication_classes = [MyAuth, ]

Global authentication

    # Configure in settings.py
    REST_FRAMEWORK = {
        "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.MyAuth", ]
    }

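2. Permission Component

The example here is content that only VIP users may view.

Define a custom permission class

    from rest_framework.permissions import BasePermission


    # Custom permission
    class MyPermission(BasePermission):
        message = 'Only VIP users may access this'

        def has_permission(self, request, view):
            """
            has_permission() must be implemented: return True to grant access, False to deny it.
            """
            # Authentication runs before the permission check, so request.user is already available here
            if request.user and request.user.type == 2:  # VIP user
                return True
            else:
                return False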
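The decision inside `has_permission` can be tried out without a running server. In this minimal sketch, `is_vip` is a hypothetical helper and `SimpleNamespace` objects stand in for `UserInfo` rows. It uses `getattr` so that a value without a `type` attribute (for example `None` or a plain username string) is denied rather than raising an error.

```python
from types import SimpleNamespace

VIP = 2  # matches the value 2 (VIP user) in the UserInfo.type choices


def is_vip(user):
    """Mirror the has_permission check: True only for an authenticated VIP user."""
    return bool(user) and getattr(user, "type", None) == VIP


print(is_vip(SimpleNamespace(type=2)))  # VIP user
print(is_vip(SimpleNamespace(type=1)))  # regular user
print(is_vip(None))                     # unauthenticated request
```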

View-level configuration

    class CommentViewSet(ModelViewSet):
        queryset = models.Comment.objects.all()
        serializer_class = app01_serializers.CommentSerializer
        authentication_classes = [MyAuth, ]
        permission_classes = [MyPermission, ]

Global configuration

    # REST framework settings in settings.py
    REST_FRAMEWORK = {
        "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.MyAuth", ],
        "DEFAULT_PERMISSION_CLASSES": ["app01.utils.MyPermission", ]
    }

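3. Throttling Component

DRF ships with basic throttle classes. Before using them, we write our own throttle class by hand to see how the throttling component runs.

Custom throttle class

    import time

    VISIT_RECORD = {}  # maps client IP -> list of request timestamps, newest first


    # Custom throttle
    class MyThrottle(object):

        def __init__(self):
            self.history = None

        def allow_request(self, request, view):
            """
            allow_request() must be implemented: return True to allow the request, False to reject it.
            This throttle allows at most three requests per 60 seconds.
            """
            # Get the client IP
            ip = request.META.get("REMOTE_ADDR")
            timestamp = time.time()
            history = VISIT_RECORD.setdefault(ip, [])
            self.history = history
            # Drop timestamps that are more than 60 seconds old
            while history and history[-1] < timestamp - 60:
                history.pop()
            # Check before recording, so rejected requests do not extend the lockout
            if len(history) >= 3:
                return False
            history.insert(0, timestamp)
            return True

        def wait(self):
            """Return how many seconds remain until the next request is allowed."""
            timestamp = time.time()
            return 60 - (timestamp - self.history[-1])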
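To watch the 60-second window behave without a server or real waiting, the same idea can be written as a framework-free class with an injectable clock. This is a sketch under stated assumptions: `SlidingWindowThrottle`, its `limit`, `window`, and `clock` parameters, and the per-key `wait` method are all names introduced here, not part of DRF or of the class above.

```python
import time


class SlidingWindowThrottle:
    """Allow at most `limit` requests per `window` seconds for each key."""

    def __init__(self, limit=3, window=60, clock=time.time):
        self.limit = limit
        self.window = window
        self.clock = clock   # injectable so the behavior can be tested
        self.records = {}    # key -> timestamps, newest first

    def allow(self, key):
        now = self.clock()
        history = self.records.setdefault(key, [])
        # Discard timestamps that have left the window.
        while history and history[-1] <= now - self.window:
            history.pop()
        if len(history) >= self.limit:
            return False
        history.insert(0, now)
        return True

    def wait(self, key):
        """Seconds until the oldest recorded request leaves the window."""
        history = self.records.get(key)
        if not history:
            return 0
        return max(0, self.window - (self.clock() - history[-1]))


# Drive the throttle with a fake clock instead of sleeping.
fake_now = [1000.0]
throttle = SlidingWindowThrottle(clock=lambda: fake_now[0])
print([throttle.allow("10.0.0.1") for _ in range(4)])
```

Keeping the newest timestamp at index 0 means the oldest one is always at the end of the list, so pruning is a cheap `pop()`.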

Using it in a view

    class CommentViewSet(ModelViewSet):
        queryset = models.Comment.objects.all()
        serializer_class = app01_serializers.CommentSerializer
        throttle_classes = [MyThrottle, ]

Using it globally

    # REST framework settings in settings.py
    REST_FRAMEWORK = {
        "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.MyAuth", ],
        "DEFAULT_PERMISSION_CLASSES": ["app01.utils.MyPermission", ],
        "DEFAULT_THROTTLE_CLASSES": ["app01.utils.MyThrottle", ]
    }

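Using the built-in throttle class

    from rest_framework.throttling import SimpleRateThrottle


    class VisitThrottle(SimpleRateThrottle):
        scope = "xxx"  # key looked up in DEFAULT_THROTTLE_RATES

        def get_cache_key(self, request, view):
            # Throttle per client, identified by IP address
            return self.get_ident(request)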

Global configuration

    # REST framework settings in settings.py
    REST_FRAMEWORK = {
        "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.MyAuth", ],
        # "DEFAULT_PERMISSION_CLASSES": ["app01.utils.MyPermission", ]
        "DEFAULT_THROTTLE_CLASSES": ["app01.utils.VisitThrottle", ],
        "DEFAULT_THROTTLE_RATES": {
            "xxx": "5/m",  # the "xxx" scope allows 5 requests per minute
        }
    }
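The rate string `"5/m"` means five requests per minute. The sketch below shows one way such a string can be turned into a request count and a window length in seconds. It is modeled on how `SimpleRateThrottle` interprets rates, but it is a standalone illustration rather than the library's code.

```python
def parse_rate(rate):
    """Turn a rate string such as '5/m' into (num_requests, duration_in_seconds)."""
    if rate is None:
        return (None, None)
    num, period = rate.split("/")
    # Only the first letter of the period matters: s, m, h or d.
    duration = {"s": 1, "m": 60, "h": 3600, "d": 86400}[period[0]]
    return (int(num), duration)


print(parse_rate("5/m"))
```

Since only the first letter is read, `"5/m"`, `"5/min"`, and `"5/minute"` all describe the same limit in this sketch.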

Authentication class source code

    ############################ authentication.py ####################################
    from __future__ import unicode_literals

    import base64
    import binascii

    from django.contrib.auth import authenticate, get_user_model
    from django.middleware.csrf import CsrfViewMiddleware
    from django.utils.six import text_type
    from django.utils.translation import ugettext_lazy as _

    from rest_framework import HTTP_HEADER_ENCODING, exceptions


    def get_authorization_header(request):
        """
        Return request's 'Authorization:' header, as a bytestring.

        Hide some test client ickyness where the header can be unicode.
        """
        auth = request.META.get('HTTP_AUTHORIZATION', b'')
        if isinstance(auth, text_type):
            # Work around django test client oddness
            auth = auth.encode(HTTP_HEADER_ENCODING)
        return auth


    class CSRFCheck(CsrfViewMiddleware):
        def _reject(self, request, reason):
            # Return the failure reason instead of an HttpResponse
            return reason


    class BaseAuthentication(object):
        """
        All authentication classes should extend BaseAuthentication.
        """

        def authenticate(self, request):
            """
            Authenticate the request and return a two-tuple of (user, token).
            """
            raise NotImplementedError(".authenticate() must be overridden.")

        def authenticate_header(self, request):
            """
            Return a string to be used as the value of the `WWW-Authenticate`
            header in a `401 Unauthenticated` response, or `None` if the
            authentication scheme should return `403 Permission Denied` responses.
            """
            pass


    class BasicAuthentication(BaseAuthentication):
        """
        HTTP Basic authentication against username/password.
        """
        www_authenticate_realm = 'api'

        def authenticate(self, request):
            """
            Returns a `User` if a correct username and password have been supplied
            using HTTP Basic authentication. Otherwise returns `None`.
            """
            auth = get_authorization_header(request).split()

            if not auth or auth[0].lower() != b'basic':
                return None

            if len(auth) == 1:
                msg = _('Invalid basic header. No credentials provided.')
                raise exceptions.AuthenticationFailed(msg)
            elif len(auth) > 2:
                msg = _('Invalid basic header. Credentials string should not contain spaces.')
                raise exceptions.AuthenticationFailed(msg)

            try:
                auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
            except (TypeError, UnicodeDecodeError, binascii.Error):
                msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
                raise exceptions.AuthenticationFailed(msg)

            userid, password = auth_parts[0], auth_parts[2]
            return self.authenticate_credentials(userid, password, request)

        def authenticate_credentials(self, userid, password, request=None):
            """
            Authenticate the userid and password against username and password
            with optional request for context.
            """
            credentials = {
                get_user_model().USERNAME_FIELD: userid,
                'password': password
            }
            user = authenticate(request=request, **credentials)

            if user is None:
                raise exceptions.AuthenticationFailed(_('Invalid username/password.'))

            if not user.is_active:
                raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

            return (user, None)

        def authenticate_header(self, request):
            return 'Basic realm="%s"' % self.www_authenticate_realm


    class SessionAuthentication(BaseAuthentication):
        """
        Use Django's session framework for authentication.
        """

        def authenticate(self, request):
            """
            Returns a `User` if the request session currently has a logged in user.
            Otherwise returns `None`.
            """

            # Get the session-based user from the underlying HttpRequest object
            user = getattr(request._request, 'user', None)

            # Unauthenticated, CSRF validation not required
            if not user or not user.is_active:
                return None

            self.enforce_csrf(request)

            # CSRF passed with authenticated user
            return (user, None)

        def enforce_csrf(self, request):
            """
            Enforce CSRF validation for session based authentication.
            """
            check = CSRFCheck()
            # populates request.META['CSRF_COOKIE'], which is used in process_view()
            check.process_request(request)
            reason = check.process_view(request, None, (), {})
            if reason:
                # CSRF failed, bail with explicit error message
                raise exceptions.PermissionDenied('CSRF Failed: %s' % reason)


    class TokenAuthentication(BaseAuthentication):
        """
        Simple token based authentication.

        Clients should authenticate by passing the token key in the "Authorization"
        HTTP header, prepended with the string "Token ".  For example:

            Authorization: Token 401f7ac837da42b97f613d789819ff93537bee6a
        """

        keyword = 'Token'
        model = None

        def get_model(self):
            if self.model is not None:
                return self.model
            from rest_framework.authtoken.models import Token
            return Token

        """
        A custom token model may be used, but must have the following properties.

        * key -- The string identifying the token
        * user -- The user to which the token belongs
        """

        def authenticate(self, request):
            auth = get_authorization_header(request).split()

            if not auth or auth[0].lower() != self.keyword.lower().encode():
                return None

            if len(auth) == 1:
                msg = _('Invalid token header. No credentials provided.')
                raise exceptions.AuthenticationFailed(msg)
            elif len(auth) > 2:
                msg = _('Invalid token header. Token string should not contain spaces.')
                raise exceptions.AuthenticationFailed(msg)

            try:
                token = auth[1].decode()
            except UnicodeError:
                msg = _('Invalid token header. Token string should not contain invalid characters.')
                raise exceptions.AuthenticationFailed(msg)

            return self.authenticate_credentials(token)

        def authenticate_credentials(self, key):
            model = self.get_model()
            try:
                token = model.objects.select_related('user').get(key=key)
            except model.DoesNotExist:
                raise exceptions.AuthenticationFailed(_('Invalid token.'))

            if not token.user.is_active:
                raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

            return (token.user, token)

        def authenticate_header(self, request):
            return self.keyword


    class RemoteUserAuthentication(BaseAuthentication):
        """
        REMOTE_USER authentication.

        To use this, set up your web server to perform authentication, which will
        set the REMOTE_USER environment variable. You will need to have
        'django.contrib.auth.backends.RemoteUserBackend in your
        AUTHENTICATION_BACKENDS setting
        """

        # Name of request header to grab username from.  This will be the key as
        # used in the request.META dictionary, i.e. the normalization of headers to
        # all uppercase and the addition of "HTTP_" prefix apply.
        header = "REMOTE_USER"

        def authenticate(self, request):
            user = authenticate(remote_user=request.META.get(self.header))
            if user and user.is_active:
                return (user, None)
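The header handling in `BasicAuthentication` can be reproduced with the standard library alone, which makes the decoding steps easier to follow. This is a minimal sketch: `decode_basic_header` is a hypothetical helper, `ValueError` stands in for `AuthenticationFailed`, and the `iso-8859-1` encoding is assumed to match DRF's `HTTP_HEADER_ENCODING`.

```python
import base64


def decode_basic_header(header_value):
    """Split an 'Authorization: Basic <base64>' value into (userid, password)."""
    scheme, encoded = header_value.split()
    if scheme.lower() != "basic":
        raise ValueError("not a Basic authorization header")
    decoded = base64.b64decode(encoded).decode("iso-8859-1")
    # partition() splits on the first colon only, so the password may contain colons.
    userid, _, password = decoded.partition(":")
    return userid, password


# Build a sample header the way an HTTP client would.
header = "Basic " + base64.b64encode(b"alice:s3cret:x").decode("ascii")
print(decode_basic_header(header))
```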
