十一、Django REST framework自定义使用RBAC权限

     阅读:26

参考,主要参考第一个链接的,然后根据自己的场景做了些改动
drf_admin(权限RBAC)后台管理系统(RBAC权限篇)
Django实战【六】—权限管理系统rbac组件实现
CRM【第一篇】: 权限组件之权限控制

一、流程

在这里插入图片描述

在这里插入图片描述

  • 根据需求,创建不同角色,例如:admin、visitor
  • 依据角色,给不同的角色分配不同的权限
  • 根据用户的岗位及职责分配角色,使不同用户具有不同的权限
  • 用户请求后端接口时,验证用户权限,通过则放行,否则返回403
  • 操作数据库

二、数据库表设计

Users用户表
Roles角色表
Permissions权限表
Users-Roles(用户角色关联表)
Roles-Permissions(角色权限关联表)

在这里插入图片描述

1、用户表

class User(AbstractUser):
    """
    用户
    """
    name = models.CharField(max_length=20, default='', blank=True, verbose_name='真实姓名')
    mobile = models.CharField(max_length=11, unique=False, null=True, blank=True, default=None, verbose_name='手机号码')
    role = models.ManyToManyField('Role', through='UserRole', blank=True,verbose_name='角色')

    class Meta:
        db_table = 'oauth_user'
        verbose_name = '用户'
        verbose_name_plural = verbose_name
        ordering = ['id']

    def __str__(self):
        return self.username

    def _get_user_permission(self):
        # 获取用户权限
        permission = list(filter(None, set(self.role.values_list('permission__sign', flat=True))))
        if 'admin' in self.role.values_list('name', flat=True):
            permission.append('admin')
        return permission

    def get_user_info(self):
        # 获取用户信息
        user_info = {
            'id': self.pk,
            'username': self.username,
            'name': self.name,
            'email': self.email,
            'permission': self._get_user_permission(),
            'mobile': '' if self.mobile is None else self.mobile
        }
        return user_info

2、角色表

class Role(BaseModel):
    """
    角色
    """
    name = models.CharField(max_length=32, unique=True, verbose_name='角色')
    permission = models.ManyToManyField('Permission', through='RolePermission',blank=True, verbose_name='权限')
    desc = models.CharField(max_length=50, blank=True, default='', verbose_name='描述')

    objects = models.Manager()

    def __str__(self):
        return self.name

    class Meta:
        db_table = 'system_role'
        verbose_name = '角色'
        verbose_name_plural = verbose_name
        ordering = ['id']

3、权限表

class Permission(BaseModel):
    """
    权限
    """
    method_choices = (
        (u'POST', u'增'),
        (u'DELETE', u'删'),
        (u'PUT', u'改'),
        (u'PATCH', u'局部改'),
        (u'GET', u'查')
    )

    name = models.CharField(max_length=255, verbose_name='权限名')
    sign = models.CharField(max_length=255, unique=True, verbose_name='权限标识')
    menu = models.BooleanField(verbose_name='是否为菜单',default=False)  # True为菜单,False为接口
    method = models.CharField(max_length=8, blank=True, default='', choices=method_choices, verbose_name='方法')
    path = models.CharField(max_length=200, blank=True, default='', verbose_name='请求路径正则')
    pid = models.ForeignKey('self', null=True, blank=True, on_delete=models.CASCADE, verbose_name='父权限')
    desc = models.CharField(max_length=30, blank=True, default='', verbose_name='权限描述')
    objects = models.Manager()

    def __str__(self):
        return self.name

    class Meta:
        db_table = 'system_permission'
        verbose_name = '权限'
        verbose_name_plural = verbose_name
        ordering = ['id']

4、用户角色表和角色权限表中间实体

前面的用户表和角色表中,多对多关系使用through='UserRole'through='RolePermission'指定多对多关系的中间实体,方便自己自定义一些配置。

class UserRole(models.Model):
    '''用户角色(中间实体)'''
    user = models.ForeignKey('User', on_delete=models.CASCADE,null=False)
    role = models.ForeignKey('Role', on_delete=models.CASCADE,null=False)

    class Meta:
        db_table = 'oauth_user_to_system_role'
        unique_together = (('user', 'role'),)

class RolePermission(models.Model):
    '''角色权限(中间实体)'''
    role = models.ForeignKey('Role', on_delete=models.CASCADE, null=False)
    permission = models.ForeignKey('Permission', on_delete=models.CASCADE,null=False)
    class Meta:
        db_table = 'system_role_to_system_permission'
        unique_together = (('role', 'permission'), )

5、RbacPermission权限验证文件配置

  • 这里has_permission方法里判断用户是否有权限,中间注释掉的部分是是直接从mysql中读取权限的方法,如果不打算使用redis缓存而是直接从mysql里读取权限,就可以使用那部分逻辑。
  • 判断是否有权限的逻辑,我这里使用的startswith的方式,判断请求路径是否以权限的路径开头,这个权限路径后面的子路径也能有权限方法,比如管理员可以直接设置/路径。如果更严谨点可以使用正则匹配的方式,匹配到数字/+d/这种。如果没有匹配到的权限路径,则返回false,这样所有接口等于都使用了权限控制。
  • 当然也可以直接使用完全匹配,请求路径要完全匹配到权限的路径,但如果接口多的话,需要写很多权限,可以改变下逻辑,如果没有匹配到的权限路径,返回true,这样只需要把要权限控制的接口写到权限里就好了,这个只有需要的接口才使用权限控制,没写入权限的接口直接通过,但是完全匹配就没法正则和让子路径有权限,还是比较建议用正则匹配。
class UserLock(APIException):
    status_code = status.HTTP_400_BAD_REQUEST
    default_detail = '用户已被锁定,请联系管理员'
    default_code = 'not_authenticated'


class RbacPermission(BasePermission):
    """
    自定义权限认证
    """
    @staticmethod
    def pro_uri(uri):
        '''
        让返回的路径为/path1/path2/path3/格式,确保路径开头和末尾都有/,使用/+将连续的去重
        :param uri:
        :return:
        '''
        #base_api = settings.BASE_API
        #uri = '/' + base_api + '/' + uri + '/'
        uri='/' + uri + '/'
        #print(re.sub('/+', '/', uri))
        return re.sub('/+', '/', uri) #去除多余的/号,让返回的路径符合标准格式

    def has_permission(self, request, view):
        if not request.user.is_active:
            raise UserLock()
        request_url = request.path
        # 如果请求url在白名单,放行
        for safe_url in settings.WHITE_LIST:
            #print(safe_url)
            #print(request_url)
            if re.match(settings.REGEX_URL.format(url=self.pro_uri(safe_url)), request_url):
                #print("白名单放行")
                return True



        #从mysql中读取数据
        # permissionlist=Permission.objects.values_list("path",flat=True)
        # #print(permissionlist)
        # if request_url not in permissionlist:
        #     print("请求路径不在权限控制中,通过")
        #     return  True
        #遍历用户所属角色,再遍历每个角色拥有的权限的路径和方法,如果匹配则通过
        # for roleitem in request.user.role.all().prefetch_related('permission'):
        #     for permissionitem in roleitem.permission.all():
        #         #print(permissionitem.path)
        #         #print(permissionitem.name)
        #         #print(permissionitem.method)
        #         if permissionitem.method == request.method and request.path.startswith(permissionitem.path):
        #             #print(True)
        #             return True
        # return False


        # 从redis中读取信息
        conn = get_redis_connection('user_info')
        if not conn.exists('user_permissions_manage'):
            redis_storage_permissions(conn)
        if conn.exists('user_info_%s' % request.user.id):
             user_permissions = json.loads(conn.hget('user_info_%s' % request.user.id, 'permission').decode())
             # if 'admin' in user_permissions:
             #     return True
        else:
             user_permissions = []
             # if 'admin' in request.user.roles.values_list('name', flat=True):
             #     return True
        # RBAC权限验证
        # Step 1
        request_method = request.method
        #获取全部的路径列表 [b'/api/v1/rbac/test2/', b'/api/v1/rbac/test4/', b'/', b'/api/v1/rbac/test/', b'/TEST']
        url_keys = conn.hkeys('user_permissions_manage')
        permissions=[]
        #[{'method': 'GET', 'sign': 'GET_/', 'id': 1}, {'method': 'POST', 'sign': 'POST_/', 'id': 3}, {'method': 'DELETE', 'sign': 'DELETE_/', 'id': 4}, {'method': 'PUT', 'sign': 'PUT_/', 'id': 9}, {'method': 'POST', 'sign': '123456789', 'id': 11}, {'method': 'GET', 'sign': 'GET_/api/v1/rbac/test/', 'id': 14}, {'method': 'POST', 'sign': 'POST_/api/v1/rbac/test/', 'id': 8}]
        #遍历权限中的所有路径,如果有匹配请求路径开头的,则获取该路径的所有权限放入permissions列表中
        for url_key in url_keys:
            if self.pro_uri(request_url).startswith(url_key.decode()):
                redis_key=url_key.decode()
                permissions.extend(json.loads(conn.hget('user_permissions_manage', redis_key).decode()))
        #print(permissions)

        #遍历匹配到的路径的全部权限,如果方法和标识都匹配,则通过
        for permission in permissions:
                if permission.get('method') == request_method and permission.get('sign') in user_permissions:
                    return True
        return  False

6、settings文件配置

# 指定自定义的用户模型
AUTH_USER_MODEL = 'rbac.User'
#白名单
WHITE_LIST=['/api/v1/rbac/whitetest','/api/v1/rbac/userinfo/']
   
# DRF权限配置
REST_FRAMEWORK = {
    'DEFAULT_PERMISSION_CLASSES':
    (
        'rest_framework.permissions.IsAuthenticated',  # 登录验证
        'drf_admin.utils.permissions.RbacPermission',  # 自定义权限认证
    ),
}

7、模型更新时触发信号事件更新redis缓存里的权限信息

这里的redis缓存里存储权限信息哈希表和用户信息哈希表

1、权限信息哈希表

如下图,以权限的路径作为key,value里存储相同该路径,不同方法的权限信息

key:/api/v1/rbac/roles/
value:[{"method": "GET", "sign": "GET_/api/v1/rbac/roles/", "id": 25}, {"method": "POST", "sign": "POST_/api/v1/rbac/roles/", "id": 26}, {"method": "PUT", "sign": "PUT_/api/v1/rbac/roles/", "id": 27}, {"method": "DELETE", "sign": "DELETE_/api/v1/rbac/roles/", "id": 28}]

在这里插入图片描述

2、用户权限信息表

如下图,每个用户一张哈希表,比如user_info_16表示id为16的用户的权限信息表。主要存放用户的邮箱还有拥有的权限的标识列表,例如["GET_/api/v1/rbac/permissions/", "DELETE_/", "GET_/api/v1/rbac/roles/", "GET_/api/v1/rbac/users/", "PUT_/", "POST_/", "GET_/", "admin"],每个权限会有一个自己的sign标识。
在这里插入图片描述

3、模型signals信号设置

在rbac认证的app下新建signals文件,为权限Permission模型设置增删改的信号函数

import json
import logging

from django.db.models.signals import pre_save, pre_delete, post_save
from django.dispatch import receiver
from django_redis import get_redis_connection
from apps.rbac.models import Permission


@receiver(pre_save, sender=Permission)
def update_permissions_to_redis(sender, instance, **kwargs):
    """
    保存前
    Permissions模型,更新时更新redis
    :param sender:
    :param instance:
    :param kwargs:
    :return:
    """
    conn = get_redis_connection('user_info')
    if instance.id:
        if not instance.menu:
            # 接口权限,判断权限path的变化,更新redis
            try:
                #获取未修改前的权限信息
                permission = Permission.objects.get(id=instance.id)
            except Permission.DoesNotExist:
                return
            #{"/path1":[{"method":"GET","sign":"guest1","id":1},{"method":"POST","sign":"guest2","id":2}],"/path2":"{}","path3":"{}"}
            if permission.path != instance.path:
                # 路径更改,删除原有记录并新增一条权限记录
                if conn.hexists('user_permissions_manage', permission.path):
                    #遍历该路径所拥有的权限列表(比如get,post等),如果权限id为当前修改的,则删除
                    permissions = json.loads(conn.hget('user_permissions_manage', permission.path))
                    for index, value in enumerate(permissions):
                        if value.get('id') == instance.id:
                            del permissions[index]
                     #如果删除了路径更改的权限后,还有剩余的其他权限,则重新写入,如果没有则删这个路径的field
                    if permissions:
                        conn.hset('user_permissions_manage', permission.path, json.dumps(permissions))
                    else:
                        conn.hdel('user_permissions_manage', permission.path)

                if conn.hexists('user_permissions_manage', instance.path):
                    # 如存在路径记录, 添加
                    permissions = json.loads(conn.hget('user_permissions_manage', instance.path))
                    permissions.append({
                        'method': instance.method,
                        'sign': instance.sign,
                        'id': instance.id,
                    })
                    conn.hset('user_permissions_manage', instance.path, json.dumps(permissions))
                else:
                    # 否则新增
                    conn.hset('user_permissions_manage', instance.path, json.dumps([{
                        'method': instance.method,
                        'sign': instance.sign,
                        'id': instance.id,
                    }]))
            else:
                # 路径未变,更改原有权限记录
                if permission.method != instance.method or permission.sign != instance.sign:
                    permissions = json.loads(conn.hget('user_permissions_manage', instance.path))
                    for permission in permissions:
                        if permission.get('id') == instance.id:
                            permission['method'] = instance.method
                            permission['sign'] = instance.sign
                    conn.hset('user_permissions_manage', instance.path, json.dumps(permissions))
        else:
            # 菜单权限,判断是否由接口权限改为菜单权限,如果是则删除原有记录
            try:
                permission = Permission.objects.get(id=instance.id)
            except Permission.DoesNotExist:
                return
            if not permission.menu and conn.hexists('user_permissions_manage', permission.path):
                permissions = json.loads(conn.hget('user_permissions_manage', permission.path))
                for index, value in enumerate(permissions):
                    if value.get('id') == instance.id:
                        del permissions[index]
                if permissions:
                    conn.hset('user_permissions_manage', permission.path, json.dumps(permissions))
                else:
                    conn.hdel('user_permissions_manage', permission.path)


@receiver(post_save, sender=Permission)
def create_permissions_to_redis(sender, instance, **kwargs):
    """
    Permissions模型,创建时更新redis  #保存后
    :param sender:
    :param instance:
    :param kwargs:
    :return:
    """
    #{"/path1":[{"method":"GET","sign":"guest1","id":1},{"method":"POST","sign":"guest2","id":2}],"/path2":"{}","path3":"{}"}
    #如果不是菜单权限并且是刚创建的权限
    print(instance)
    if not instance.menu and kwargs.get('created'):
        conn = get_redis_connection('user_info')
        #如果该路径的权限信息在redis中存在,则追加
        if conn.exists('user_permissions_manage') and conn.hexists('user_permissions_manage', instance.path):
            permissions = json.loads(conn.hget('user_permissions_manage', instance.path))
            permissions.append({
                'method': instance.method,
                'sign': instance.sign,
                'id': instance.id,
            })
            conn.hset('user_permissions_manage', instance.path, json.dumps(permissions))
         #如果该路径的权限信息在redis中不存在,创建
        else:
            conn.hset('user_permissions_manage', instance.path, json.dumps([{
                'method': instance.method,
                'sign': instance.sign,
                'id': instance.id,
            }]))


@receiver(pre_delete, sender=Permission)
def delete_permissions_from_redis(sender, instance, **kwargs):
    """
    Permissions模型,删除时更新redis
    :param sender:
    :param instance:
    :param kwargs:
    :return:
    """
    if not instance.menu:
        conn = get_redis_connection('user_info')
        #如果该路径的权限信息在redis中存在,则遍历该路径的权限,如果有id匹配的,则删除
        if conn.exists('user_permissions_manage') and conn.hexists('user_permissions_manage', instance.path):
            permissions = json.loads(conn.hget('user_permissions_manage', instance.path))
            for index, permission in enumerate(permissions):
                if permission.get('id') == instance.id:
                    del permissions[index]
            if permissions:
                conn.hset('user_permissions_manage', instance.path, json.dumps(permissions))
            else:
                conn.hdel('user_permissions_manage', instance.path)

在app下的apps文件中导入signals

from django.apps import AppConfig


class RbacConfig(AppConfig):
    name = 'apps.rbac'
    #导入signals
    def ready(self):
        import apps.rbac.signals

8、获取用户信息并更新redis缓存的接口

在用户登录完或者前端跳转页面前,需要请求一下用户的信息,在这个接口里可以将用户权限信息写入redis缓存中

class UserInfoView(APIView):
    """
    get:
    当前用户信息

    当前用户信息, status: 200(成功), return: 用户信息和权限
    """

    def get(self, request):
        user_info = request.user.get_user_info()
        # 将用户信息缓存到redis
        conn = get_redis_connection('user_info')
        # if request.user.is_superuser and 'admin' not in user_info['permissions']:
        #     user_info['permissions'].append('admin')
        #从mysql中获取用户的权限标识列表,然后写入redis
        user_info['permission'] = json.dumps(user_info['permission'])
        conn.hmset('user_info_%s' % request.user.id, user_info)
        conn.expire('user_info_%s' % request.user.id, 60 * 60 * 24 * 30)  # 设置过期时间为30天
        user_info['permission'] = json.loads(user_info['permission'])

        # objs=Permission.objects.all()
        # for obj in objs:
        #     method=obj.method
        #     path=obj.path
        #     sign = "{method}_{path}".format(method=method, path=path)
        #     obj.sign=sign
        #     obj.save()

        return Response(user_info, status=status.HTTP_200_OK)

三、前端展示

1、权限管理
在这里插入图片描述
2、角色管理
在这里插入图片描述

还有些不足的地方,权限上面其实可以再加个菜单,几个权限放在一个菜单里,方便管理,这个后面有空再研究下吧