阅读:60
参考,主要参考第一个链接的,然后根据自己的场景做了些改动
drf_admin(权限RBAC)后台管理系统(RBAC权限篇)
Django实战【六】—权限管理系统rbac组件实现
CRM【第一篇】: 权限组件之权限控制
Users用户表
Roles角色表
Permissions权限表
Users-Roles(用户角色关联表)
Roles-Permissions(角色权限关联表)
class User(AbstractUser):
"""
用户
"""
name = models.CharField(max_length=20, default='', blank=True, verbose_name='真实姓名')
mobile = models.CharField(max_length=11, unique=False, null=True, blank=True, default=None, verbose_name='手机号码')
role = models.ManyToManyField('Role', through='UserRole', blank=True,verbose_name='角色')
class Meta:
db_table = 'oauth_user'
verbose_name = '用户'
verbose_name_plural = verbose_name
ordering = ['id']
def __str__(self):
return self.username
def _get_user_permission(self):
# 获取用户权限
permission = list(filter(None, set(self.role.values_list('permission__sign', flat=True))))
if 'admin' in self.role.values_list('name', flat=True):
permission.append('admin')
return permission
def get_user_info(self):
# 获取用户信息
user_info = {
'id': self.pk,
'username': self.username,
'name': self.name,
'email': self.email,
'permission': self._get_user_permission(),
'mobile': '' if self.mobile is None else self.mobile
}
return user_info
class Role(BaseModel):
"""
角色
"""
name = models.CharField(max_length=32, unique=True, verbose_name='角色')
permission = models.ManyToManyField('Permission', through='RolePermission',blank=True, verbose_name='权限')
desc = models.CharField(max_length=50, blank=True, default='', verbose_name='描述')
objects = models.Manager()
def __str__(self):
return self.name
class Meta:
db_table = 'system_role'
verbose_name = '角色'
verbose_name_plural = verbose_name
ordering = ['id']
class Permission(BaseModel):
"""
权限
"""
method_choices = (
(u'POST', u'增'),
(u'DELETE', u'删'),
(u'PUT', u'改'),
(u'PATCH', u'局部改'),
(u'GET', u'查')
)
name = models.CharField(max_length=255, verbose_name='权限名')
sign = models.CharField(max_length=255, unique=True, verbose_name='权限标识')
menu = models.BooleanField(verbose_name='是否为菜单',default=False) # True为菜单,False为接口
method = models.CharField(max_length=8, blank=True, default='', choices=method_choices, verbose_name='方法')
path = models.CharField(max_length=200, blank=True, default='', verbose_name='请求路径正则')
pid = models.ForeignKey('self', null=True, blank=True, on_delete=models.CASCADE, verbose_name='父权限')
desc = models.CharField(max_length=30, blank=True, default='', verbose_name='权限描述')
objects = models.Manager()
def __str__(self):
return self.name
class Meta:
db_table = 'system_permission'
verbose_name = '权限'
verbose_name_plural = verbose_name
ordering = ['id']
前面的用户表和角色表中,多对多关系使用through='UserRole'
和through='RolePermission'
指定多对多关系的中间实体,方便自己自定义一些配置。
class UserRole(models.Model):
'''用户角色(中间实体)'''
user = models.ForeignKey('User', on_delete=models.CASCADE,null=False)
role = models.ForeignKey('Role', on_delete=models.CASCADE,null=False)
class Meta:
db_table = 'oauth_user_to_system_role'
unique_together = (('user', 'role'),)
class RolePermission(models.Model):
'''角色权限(中间实体)'''
role = models.ForeignKey('Role', on_delete=models.CASCADE, null=False)
permission = models.ForeignKey('Permission', on_delete=models.CASCADE,null=False)
class Meta:
db_table = 'system_role_to_system_permission'
unique_together = (('role', 'permission'), )
class UserLock(APIException):
status_code = status.HTTP_400_BAD_REQUEST
default_detail = '用户已被锁定,请联系管理员'
default_code = 'not_authenticated'
class RbacPermission(BasePermission):
"""
自定义权限认证
"""
@staticmethod
def pro_uri(uri):
'''
让返回的路径为/path1/path2/path3/格式,确保路径开头和末尾都有/,使用/+将连续的去重
:param uri:
:return:
'''
#base_api = settings.BASE_API
#uri = '/' + base_api + '/' + uri + '/'
uri='/' + uri + '/'
#print(re.sub('/+', '/', uri))
return re.sub('/+', '/', uri) #去除多余的/号,让返回的路径符合标准格式
def has_permission(self, request, view):
if not request.user.is_active:
raise UserLock()
request_url = request.path
# 如果请求url在白名单,放行
for safe_url in settings.WHITE_LIST:
#print(safe_url)
#print(request_url)
if re.match(settings.REGEX_URL.format(url=self.pro_uri(safe_url)), request_url):
#print("白名单放行")
return True
#从mysql中读取数据
# permissionlist=Permission.objects.values_list("path",flat=True)
# #print(permissionlist)
# if request_url not in permissionlist:
# print("请求路径不在权限控制中,通过")
# return True
#遍历用户所属角色,再遍历每个角色拥有的权限的路径和方法,如果匹配则通过
# for roleitem in request.user.role.all().prefetch_related('permission'):
# for permissionitem in roleitem.permission.all():
# #print(permissionitem.path)
# #print(permissionitem.name)
# #print(permissionitem.method)
# if permissionitem.method == request.method and request.path.startswith(permissionitem.path):
# #print(True)
# return True
# return False
# 从redis中读取信息
conn = get_redis_connection('user_info')
if not conn.exists('user_permissions_manage'):
redis_storage_permissions(conn)
if conn.exists('user_info_%s' % request.user.id):
user_permissions = json.loads(conn.hget('user_info_%s' % request.user.id, 'permission').decode())
# if 'admin' in user_permissions:
# return True
else:
user_permissions = []
# if 'admin' in request.user.roles.values_list('name', flat=True):
# return True
# RBAC权限验证
# Step 1
request_method = request.method
#获取全部的路径列表 [b'/api/v1/rbac/test2/', b'/api/v1/rbac/test4/', b'/', b'/api/v1/rbac/test/', b'/TEST']
url_keys = conn.hkeys('user_permissions_manage')
permissions=[]
#[{'method': 'GET', 'sign': 'GET_/', 'id': 1}, {'method': 'POST', 'sign': 'POST_/', 'id': 3}, {'method': 'DELETE', 'sign': 'DELETE_/', 'id': 4}, {'method': 'PUT', 'sign': 'PUT_/', 'id': 9}, {'method': 'POST', 'sign': '123456789', 'id': 11}, {'method': 'GET', 'sign': 'GET_/api/v1/rbac/test/', 'id': 14}, {'method': 'POST', 'sign': 'POST_/api/v1/rbac/test/', 'id': 8}]
#遍历权限中的所有路径,如果有匹配请求路径开头的,则获取该路径的所有权限放入permissions列表中
for url_key in url_keys:
if self.pro_uri(request_url).startswith(url_key.decode()):
redis_key=url_key.decode()
permissions.extend(json.loads(conn.hget('user_permissions_manage', redis_key).decode()))
#print(permissions)
#遍历匹配到的路径的全部权限,如果方法和标识都匹配,则通过
for permission in permissions:
if permission.get('method') == request_method and permission.get('sign') in user_permissions:
return True
return False
# 指定自定义的用户模型
AUTH_USER_MODEL = 'rbac.User'
#白名单
WHITE_LIST=['/api/v1/rbac/whitetest','/api/v1/rbac/userinfo/']
# DRF权限配置
REST_FRAMEWORK = {
'DEFAULT_PERMISSION_CLASSES':
(
'rest_framework.permissions.IsAuthenticated', # 登录验证
'drf_admin.utils.permissions.RbacPermission', # 自定义权限认证
),
}
这里的redis缓存里存储权限信息哈希表和用户信息哈希表
如下图,以权限的路径作为key,value里存储相同该路径,不同方法的权限信息
key:/api/v1/rbac/roles/
value:[{"method": "GET", "sign": "GET_/api/v1/rbac/roles/", "id": 25}, {"method": "POST", "sign": "POST_/api/v1/rbac/roles/", "id": 26}, {"method": "PUT", "sign": "PUT_/api/v1/rbac/roles/", "id": 27}, {"method": "DELETE", "sign": "DELETE_/api/v1/rbac/roles/", "id": 28}]
如下图,每个用户一张哈希表,比如user_info_16表示id为16的用户的权限信息表。主要存放用户的邮箱还有拥有的权限的标识列表,例如["GET_/api/v1/rbac/permissions/", "DELETE_/", "GET_/api/v1/rbac/roles/", "GET_/api/v1/rbac/users/", "PUT_/", "POST_/", "GET_/", "admin"]
,每个权限会有一个自己的sign标识。
在rbac认证的app下新建signals文件,为权限Permission模型设置增删改的信号函数
import json
import logging
from django.db.models.signals import pre_save, pre_delete, post_save
from django.dispatch import receiver
from django_redis import get_redis_connection
from apps.rbac.models import Permission
@receiver(pre_save, sender=Permission)
def update_permissions_to_redis(sender, instance, **kwargs):
"""
保存前
Permissions模型,更新时更新redis
:param sender:
:param instance:
:param kwargs:
:return:
"""
conn = get_redis_connection('user_info')
if instance.id:
if not instance.menu:
# 接口权限,判断权限path的变化,更新redis
try:
#获取未修改前的权限信息
permission = Permission.objects.get(id=instance.id)
except Permission.DoesNotExist:
return
#{"/path1":[{"method":"GET","sign":"guest1","id":1},{"method":"POST","sign":"guest2","id":2}],"/path2":"{}","path3":"{}"}
if permission.path != instance.path:
# 路径更改,删除原有记录并新增一条权限记录
if conn.hexists('user_permissions_manage', permission.path):
#遍历该路径所拥有的权限列表(比如get,post等),如果权限id为当前修改的,则删除
permissions = json.loads(conn.hget('user_permissions_manage', permission.path))
for index, value in enumerate(permissions):
if value.get('id') == instance.id:
del permissions[index]
#如果删除了路径更改的权限后,还有剩余的其他权限,则重新写入,如果没有则删这个路径的field
if permissions:
conn.hset('user_permissions_manage', permission.path, json.dumps(permissions))
else:
conn.hdel('user_permissions_manage', permission.path)
if conn.hexists('user_permissions_manage', instance.path):
# 如存在路径记录, 添加
permissions = json.loads(conn.hget('user_permissions_manage', instance.path))
permissions.append({
'method': instance.method,
'sign': instance.sign,
'id': instance.id,
})
conn.hset('user_permissions_manage', instance.path, json.dumps(permissions))
else:
# 否则新增
conn.hset('user_permissions_manage', instance.path, json.dumps([{
'method': instance.method,
'sign': instance.sign,
'id': instance.id,
}]))
else:
# 路径未变,更改原有权限记录
if permission.method != instance.method or permission.sign != instance.sign:
permissions = json.loads(conn.hget('user_permissions_manage', instance.path))
for permission in permissions:
if permission.get('id') == instance.id:
permission['method'] = instance.method
permission['sign'] = instance.sign
conn.hset('user_permissions_manage', instance.path, json.dumps(permissions))
else:
# 菜单权限,判断是否由接口权限改为菜单权限,如果是则删除原有记录
try:
permission = Permission.objects.get(id=instance.id)
except Permission.DoesNotExist:
return
if not permission.menu and conn.hexists('user_permissions_manage', permission.path):
permissions = json.loads(conn.hget('user_permissions_manage', permission.path))
for index, value in enumerate(permissions):
if value.get('id') == instance.id:
del permissions[index]
if permissions:
conn.hset('user_permissions_manage', permission.path, json.dumps(permissions))
else:
conn.hdel('user_permissions_manage', permission.path)
@receiver(post_save, sender=Permission)
def create_permissions_to_redis(sender, instance, **kwargs):
"""
Permissions模型,创建时更新redis #保存后
:param sender:
:param instance:
:param kwargs:
:return:
"""
#{"/path1":[{"method":"GET","sign":"guest1","id":1},{"method":"POST","sign":"guest2","id":2}],"/path2":"{}","path3":"{}"}
#如果不是菜单权限并且是刚创建的权限
print(instance)
if not instance.menu and kwargs.get('created'):
conn = get_redis_connection('user_info')
#如果该路径的权限信息在redis中存在,则追加
if conn.exists('user_permissions_manage') and conn.hexists('user_permissions_manage', instance.path):
permissions = json.loads(conn.hget('user_permissions_manage', instance.path))
permissions.append({
'method': instance.method,
'sign': instance.sign,
'id': instance.id,
})
conn.hset('user_permissions_manage', instance.path, json.dumps(permissions))
#如果该路径的权限信息在redis中不存在,创建
else:
conn.hset('user_permissions_manage', instance.path, json.dumps([{
'method': instance.method,
'sign': instance.sign,
'id': instance.id,
}]))
@receiver(pre_delete, sender=Permission)
def delete_permissions_from_redis(sender, instance, **kwargs):
"""
Permissions模型,删除时更新redis
:param sender:
:param instance:
:param kwargs:
:return:
"""
if not instance.menu:
conn = get_redis_connection('user_info')
#如果该路径的权限信息在redis中存在,则遍历该路径的权限,如果有id匹配的,则删除
if conn.exists('user_permissions_manage') and conn.hexists('user_permissions_manage', instance.path):
permissions = json.loads(conn.hget('user_permissions_manage', instance.path))
for index, permission in enumerate(permissions):
if permission.get('id') == instance.id:
del permissions[index]
if permissions:
conn.hset('user_permissions_manage', instance.path, json.dumps(permissions))
else:
conn.hdel('user_permissions_manage', instance.path)
在app下的apps文件中导入signals
from django.apps import AppConfig
class RbacConfig(AppConfig):
name = 'apps.rbac'
#导入signals
def ready(self):
import apps.rbac.signals
在用户登录完或者前端跳转页面前,需要请求一下用户的信息,在这个接口里可以将用户权限信息写入redis缓存中
class UserInfoView(APIView):
"""
get:
当前用户信息
当前用户信息, status: 200(成功), return: 用户信息和权限
"""
def get(self, request):
user_info = request.user.get_user_info()
# 将用户信息缓存到redis
conn = get_redis_connection('user_info')
# if request.user.is_superuser and 'admin' not in user_info['permissions']:
# user_info['permissions'].append('admin')
#从mysql中获取用户的权限标识列表,然后写入redis
user_info['permission'] = json.dumps(user_info['permission'])
conn.hmset('user_info_%s' % request.user.id, user_info)
conn.expire('user_info_%s' % request.user.id, 60 * 60 * 24 * 30) # 设置过期时间为30天
user_info['permission'] = json.loads(user_info['permission'])
# objs=Permission.objects.all()
# for obj in objs:
# method=obj.method
# path=obj.path
# sign = "{method}_{path}".format(method=method, path=path)
# obj.sign=sign
# obj.save()
return Response(user_info, status=status.HTTP_200_OK)
1、权限管理
2、角色管理
还有些不足的地方,权限上面其实可以再加个菜单,几个权限放在一个菜单里,方便管理,这个后面有空再研究下吧