"""Database queries for permission checks and permission-group management."""

from psycopg import AsyncConnection

from mm_api.models.permission import (
    GroupPermissionAssign,
    PermissionGroupCreate,
    UserGroupAssign,
)


async def can(
    conn: AsyncConnection,
    user_id: int,
    action: str,
    module: str,
    is_admin: bool = False,
) -> bool:
    """Return whether the user holds a permission for ``module`` + ``action``.

    ``is_admin=True`` asks for admin scope (any content); ``False`` asks only
    for the user's own content. If the user belongs to a superuser group the
    result is ``True`` regardless of ``is_admin``. Permissions stored with
    ``scope = FALSE`` are valid in both cases (own content is open to both
    admin and regular users). A stored module or action of ``'*'`` matches
    any requested value.

    Args:
        conn: Open async database connection.
        user_id: Id of the user being checked.
        action: Requested action name.
        module: Requested module name.
        is_admin: Requested scope; compared against ``permissions.scope``.

    Returns:
        ``True`` when a matching group membership exists, else ``False``.
    """
    cur = await conn.execute(
        """
        SELECT EXISTS (
            SELECT 1
            FROM user_groups ug
            JOIN permission_groups pg ON pg.id = ug.group_id
            LEFT JOIN group_permissions gp ON gp.group_id = pg.id
            LEFT JOIN permissions p ON p.id = gp.permission_id
            WHERE ug.user_id = %s
              AND (
                pg.is_superuser = TRUE
                OR (
                    p.module IN (%s, '*')
                    AND p.action IN (%s, '*')
                    AND (p.scope = %s OR p.scope = FALSE)
                )
              )
        )
        """,
        # Placeholder order follows the SQL text: user, module, action, scope.
        (user_id, module, action, is_admin),
    )
    row = await cur.fetchone()
    return row[0] if row else False


async def user_permissions(conn: AsyncConnection, user_id: int) -> list[dict]:
    """Return every permission the user holds through group membership.

    If any of the user's groups is a superuser group, a single wildcard
    entry is returned instead of the individual permissions.
    """
    cur = await conn.execute(
        """
        SELECT DISTINCT p.module, p.action, p.description, pg.is_superuser
        FROM user_groups ug
        JOIN permission_groups pg ON pg.id = ug.group_id
        LEFT JOIN group_permissions gp ON gp.group_id = pg.id
        LEFT JOIN permissions p ON p.id = gp.permission_id
        WHERE ug.user_id = %s
        ORDER BY p.module, p.action
        """,
        (user_id,),
    )
    rows = await cur.fetchall()

    # Member of a superuser group: collapse everything into one wildcard.
    if any(r[3] for r in rows):
        return [
            {
                "module": "*",
                "action": "*",
                "description": "Tüm izinler",
                "is_superuser": True,
            }
        ]

    # The LEFT JOINs yield NULL permission columns for groups without any
    # permissions; those rows have a falsy module and are skipped.
    return [
        {
            "module": r[0],
            "action": r[1],
            "description": r[2],
            "is_superuser": False,
        }
        for r in rows
        if r[0]
    ]


async def list_permissions(conn: AsyncConnection) -> list[dict]:
    """Return all rows of ``permissions`` ordered by module and action."""
    cur = await conn.execute(
        "SELECT id, module, action, description FROM permissions ORDER BY module, action"
    )
    rows = await cur.fetchall()
    return [
        {"id": r[0], "module": r[1], "action": r[2], "description": r[3]}
        for r in rows
    ]


async def list_groups(conn: AsyncConnection) -> list[dict]:
    """Return all permission groups ordered by name."""
    cur = await conn.execute(
        "SELECT id, name, description, is_superuser, created_at "
        "FROM permission_groups ORDER BY name"
    )
    rows = await cur.fetchall()
    return [_group_row(r) for r in rows]


async def get_group(conn: AsyncConnection, group_id: int) -> dict | None:
    """Return the permission group with ``group_id``, or ``None`` if absent."""
    cur = await conn.execute(
        "SELECT id, name, description, is_superuser, created_at "
        "FROM permission_groups WHERE id = %s",
        (group_id,),
    )
    row = await cur.fetchone()
    return _group_row(row) if row else None


async def create_group(conn: AsyncConnection, data: PermissionGroupCreate) -> dict:
    """Insert a new permission group, commit, and return it as a dict.

    The inserted row is read back through ``RETURNING`` so no second query
    is needed and the result is never ``None``.
    """
    cur = await conn.execute(
        "INSERT INTO permission_groups (name, description, is_superuser) "
        "VALUES (%s, %s, %s) "
        "RETURNING id, name, description, is_superuser, created_at",
        (data.name, data.description, data.is_superuser),
    )
    row = await cur.fetchone()
    await conn.commit()
    return _group_row(row)


async def set_group_permissions(
    conn: AsyncConnection, group_id: int, data: GroupPermissionAssign
) -> None:
    """Replace the group's permissions with ``data.permission_ids``.

    Existing rows for the group are deleted and the new ids inserted, then
    the change is committed. If any statement fails the transaction is
    rolled back and the error re-raised, so the pending DELETE cannot be
    committed later by an unrelated commit on the same connection.
    """
    try:
        await conn.execute(
            "DELETE FROM group_permissions WHERE group_id = %s", (group_id,)
        )
        for permission_id in data.permission_ids:
            await conn.execute(
                "INSERT INTO group_permissions (group_id, permission_id) "
                "VALUES (%s, %s) ON CONFLICT DO NOTHING",
                (group_id, permission_id),
            )
    except Exception:
        # Discard the half-applied replacement before propagating the error.
        await conn.rollback()
        raise
    await conn.commit()


async def group_permissions(conn: AsyncConnection, group_id: int) -> list[dict]:
    """Return the permissions assigned to a group, ordered by module, action."""
    cur = await conn.execute(
        """SELECT p.id, p.module, p.action, p.description
           FROM group_permissions gp
           JOIN permissions p ON p.id = gp.permission_id
           WHERE gp.group_id = %s
           ORDER BY p.module, p.action""",
        (group_id,),
    )
    rows = await cur.fetchall()
    return [
        {"id": r[0], "module": r[1], "action": r[2], "description": r[3]}
        for r in rows
    ]


async def assign_user_to_group(
    conn: AsyncConnection, data: UserGroupAssign, granted_by: int
) -> None:
    """Add a user to a group and commit; an existing membership is left as is.

    ``granted_by`` is stored in the ``granted_by`` column of the new row.
    """
    await conn.execute(
        "INSERT INTO user_groups (user_id, group_id, granted_by) "
        "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
        (data.user_id, data.group_id, granted_by),
    )
    await conn.commit()


async def remove_user_from_group(
    conn: AsyncConnection, user_id: int, group_id: int
) -> None:
    """Delete the user's membership in the group (if any) and commit."""
    await conn.execute(
        "DELETE FROM user_groups WHERE user_id = %s AND group_id = %s",
        (user_id, group_id),
    )
    await conn.commit()


async def user_groups(conn: AsyncConnection, user_id: int) -> list[dict]:
    """Return the groups a user belongs to, ordered by group name."""
    cur = await conn.execute(
        """SELECT pg.id, pg.name, pg.is_superuser, ug.granted_at
           FROM user_groups ug
           JOIN permission_groups pg ON pg.id = ug.group_id
           WHERE ug.user_id = %s
           ORDER BY pg.name""",
        (user_id,),
    )
    rows = await cur.fetchall()
    return [
        {"id": r[0], "name": r[1], "is_superuser": r[2], "granted_at": r[3]}
        for r in rows
    ]


def _group_row(r) -> dict:
    """Map a ``permission_groups`` row to a dict.

    Expects the column order used by the queries in this module:
    id, name, description, is_superuser, created_at.
    """
    return {
        "id": r[0],
        "name": r[1],
        "description": r[2],
        "is_superuser": r[3],
        "created_at": r[4],
    }