memleketmeselesi/mm_api/services/permission.py
Mukan Erkin 2498e75594 init: memleketmeselesi platform — API + migrations
FastAPI + PostgreSQL 16. KYC, issue sistemi, permission/group yönetimi,
session yönetimi, API client auth (kışla kapısı), officials/persons CRUD.
Migration 0001–0013 dahil.
2026-04-27 23:06:59 +03:00

135 lines
5.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from psycopg import AsyncConnection
from mm_api.models.permission import PermissionGroupCreate, GroupPermissionAssign, UserGroupAssign
async def can(conn: AsyncConnection, user_id: int, action: str, module: str, is_admin: bool = False) -> bool:
"""
Kullanıcının belirtilen modül+eylem için yetkisi var mı?
is_admin=True → admin kapsamı (herhangi bir içerik); False → yalnızca kendi içeriği.
Süper kullanıcı grubundaysa is_admin değerinden bağımsız direkt True.
scope=FALSE olan izinler her iki durumda da geçerlidir (kendi içeriği hem admin hem normal kullanıcıya açık).
"""
row = await (await conn.execute(
"""
SELECT EXISTS (
SELECT 1
FROM user_groups ug
JOIN permission_groups pg ON pg.id = ug.group_id
LEFT JOIN group_permissions gp ON gp.group_id = pg.id
LEFT JOIN permissions p ON p.id = gp.permission_id
WHERE ug.user_id = %s
AND (
pg.is_superuser = TRUE
OR (
p.module IN (%s, '*')
AND p.action IN (%s, '*')
AND (p.scope = %s OR p.scope = FALSE)
)
)
)
""",
(user_id, module, action, is_admin)
)).fetchone()
return row[0] if row else False
async def user_permissions(conn: AsyncConnection, user_id: int) -> list[dict]:
"""Kullanıcının sahip olduğu tüm izinleri döner."""
rows = await (await conn.execute(
"""
SELECT DISTINCT p.module, p.action, p.description, pg.is_superuser
FROM user_groups ug
JOIN permission_groups pg ON pg.id = ug.group_id
LEFT JOIN group_permissions gp ON gp.group_id = pg.id
LEFT JOIN permissions p ON p.id = gp.permission_id
WHERE ug.user_id = %s
ORDER BY p.module, p.action
""",
(user_id,)
)).fetchall()
if any(r[3] for r in rows): # süper kullanıcı grubundaysa
return [{"module": "*", "action": "*", "description": "Tüm izinler", "is_superuser": True}]
return [{"module": r[0], "action": r[1], "description": r[2], "is_superuser": False} for r in rows if r[0]]
async def list_permissions(conn: AsyncConnection) -> list[dict]:
rows = await (await conn.execute(
"SELECT id, module, action, description FROM permissions ORDER BY module, action"
)).fetchall()
return [{"id": r[0], "module": r[1], "action": r[2], "description": r[3]} for r in rows]
async def list_groups(conn: AsyncConnection) -> list[dict]:
rows = await (await conn.execute(
"SELECT id, name, description, is_superuser, created_at FROM permission_groups ORDER BY name"
)).fetchall()
return [_group_row(r) for r in rows]
async def get_group(conn: AsyncConnection, group_id: int) -> dict | None:
row = await (await conn.execute(
"SELECT id, name, description, is_superuser, created_at FROM permission_groups WHERE id = %s",
(group_id,)
)).fetchone()
return _group_row(row) if row else None
async def create_group(conn: AsyncConnection, data: PermissionGroupCreate) -> dict:
row = await (await conn.execute(
"INSERT INTO permission_groups (name, description, is_superuser) VALUES (%s, %s, %s) RETURNING id",
(data.name, data.description, data.is_superuser)
)).fetchone()
await conn.commit()
return await get_group(conn, row[0])
async def set_group_permissions(conn: AsyncConnection, group_id: int, data: GroupPermissionAssign):
await conn.execute("DELETE FROM group_permissions WHERE group_id = %s", (group_id,))
for pid in data.permission_ids:
await conn.execute(
"INSERT INTO group_permissions (group_id, permission_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
(group_id, pid)
)
await conn.commit()
async def group_permissions(conn: AsyncConnection, group_id: int) -> list[dict]:
rows = await (await conn.execute(
"""SELECT p.id, p.module, p.action, p.description
FROM group_permissions gp
JOIN permissions p ON p.id = gp.permission_id
WHERE gp.group_id = %s ORDER BY p.module, p.action""",
(group_id,)
)).fetchall()
return [{"id": r[0], "module": r[1], "action": r[2], "description": r[3]} for r in rows]
async def assign_user_to_group(conn: AsyncConnection, data: UserGroupAssign, granted_by: int):
await conn.execute(
"INSERT INTO user_groups (user_id, group_id, granted_by) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
(data.user_id, data.group_id, granted_by)
)
await conn.commit()
async def remove_user_from_group(conn: AsyncConnection, user_id: int, group_id: int):
await conn.execute(
"DELETE FROM user_groups WHERE user_id = %s AND group_id = %s",
(user_id, group_id)
)
await conn.commit()
async def user_groups(conn: AsyncConnection, user_id: int) -> list[dict]:
rows = await (await conn.execute(
"""SELECT pg.id, pg.name, pg.is_superuser, ug.granted_at
FROM user_groups ug
JOIN permission_groups pg ON pg.id = ug.group_id
WHERE ug.user_id = %s ORDER BY pg.name""",
(user_id,)
)).fetchall()
return [{"id": r[0], "name": r[1], "is_superuser": r[2], "granted_at": r[3]} for r in rows]
def _group_row(r) -> dict:
return {"id": r[0], "name": r[1], "description": r[2], "is_superuser": r[3], "created_at": r[4]}