FastAPI + PostgreSQL 16. KYC, issue sistemi, permission/group yönetimi, session yönetimi, API client auth (kışla kapısı), officials/persons CRUD. Migration 0001–0013 dahil.
111 lines
4.3 KiB
Python
111 lines
4.3 KiB
Python
from psycopg import AsyncConnection
|
||
from mm_api.models.admin_unit import AdminUnitCreate, AdminUnitAssign, AdminUnitClose
|
||
|
||
|
||
async def list_types(conn: AsyncConnection) -> list[dict]:
|
||
rows = await (await conn.execute(
|
||
"SELECT id, name, slug, description FROM administrative_unit_types ORDER BY name"
|
||
)).fetchall()
|
||
return [{"id": r[0], "name": r[1], "slug": r[2], "description": r[3]} for r in rows]
|
||
|
||
|
||
async def list_units(conn: AsyncConnection, type_id: int | None, active_only: bool) -> list[dict]:
|
||
where = []
|
||
params = []
|
||
if type_id:
|
||
where.append("u.type_id = %s")
|
||
params.append(type_id)
|
||
if active_only:
|
||
where.append("u.is_active = TRUE")
|
||
clause = ("WHERE " + " AND ".join(where)) if where else ""
|
||
rows = await (await conn.execute(
|
||
f"""SELECT u.id, u.type_id, t.name, u.name, u.established_at, u.abolished_at, u.is_active
|
||
FROM administrative_units u
|
||
JOIN administrative_unit_types t ON t.id = u.type_id
|
||
{clause}
|
||
ORDER BY u.name""",
|
||
params
|
||
)).fetchall()
|
||
return [_row(r) for r in rows]
|
||
|
||
|
||
async def get_unit(conn: AsyncConnection, unit_id: int) -> dict | None:
|
||
row = await (await conn.execute(
|
||
"""SELECT u.id, u.type_id, t.name, u.name, u.established_at, u.abolished_at, u.is_active
|
||
FROM administrative_units u
|
||
JOIN administrative_unit_types t ON t.id = u.type_id
|
||
WHERE u.id = %s""",
|
||
(unit_id,)
|
||
)).fetchone()
|
||
return _row(row) if row else None
|
||
|
||
|
||
async def create_unit(conn: AsyncConnection, data: AdminUnitCreate) -> dict:
|
||
row = await (await conn.execute(
|
||
"INSERT INTO administrative_units (type_id, name, established_at) VALUES (%s, %s, %s) RETURNING id",
|
||
(data.type_id, data.name, data.established_at)
|
||
)).fetchone()
|
||
await conn.commit()
|
||
return await get_unit(conn, row[0])
|
||
|
||
|
||
async def close_unit(conn: AsyncConnection, unit_id: int, data: AdminUnitClose):
|
||
"""Birimi kapat: abolished_at yaz, bağlı lokasyonların valid_until'ını güncelle."""
|
||
await conn.execute(
|
||
"UPDATE administrative_units SET abolished_at = %s, is_active = FALSE WHERE id = %s",
|
||
(data.abolished_at, unit_id)
|
||
)
|
||
await conn.execute(
|
||
"UPDATE location_administrative_units SET valid_until = %s "
|
||
"WHERE unit_id = %s AND valid_until IS NULL",
|
||
(data.valid_until, unit_id)
|
||
)
|
||
await conn.commit()
|
||
|
||
|
||
async def assign_to_location(conn: AsyncConnection, location_id: int, data: AdminUnitAssign):
|
||
"""Lokasyona idari birim ata."""
|
||
await conn.execute(
|
||
"INSERT INTO location_administrative_units (location_id, unit_id, valid_from, valid_until) "
|
||
"VALUES (%s, %s, %s, %s) ON CONFLICT (location_id, unit_id, valid_from) DO NOTHING",
|
||
(location_id, data.unit_id, data.valid_from, data.valid_until)
|
||
)
|
||
await conn.commit()
|
||
|
||
|
||
async def unassign_from_location(conn: AsyncConnection, location_id: int, unit_id: int, valid_until: str):
|
||
"""Lokasyon-birim bağlantısını kapat."""
|
||
await conn.execute(
|
||
"UPDATE location_administrative_units SET valid_until = %s "
|
||
"WHERE location_id = %s AND unit_id = %s AND valid_until IS NULL",
|
||
(valid_until, location_id, unit_id)
|
||
)
|
||
await conn.commit()
|
||
|
||
|
||
async def location_units(conn: AsyncConnection, location_id: int, active_only: bool) -> list[dict]:
|
||
"""Bir lokasyona bağlı idari birimleri listele."""
|
||
where = "WHERE lau.location_id = %s"
|
||
if active_only:
|
||
where += " AND (lau.valid_until IS NULL OR lau.valid_until > NOW())"
|
||
rows = await (await conn.execute(
|
||
f"""SELECT u.id, u.type_id, t.name, u.name, u.established_at, u.abolished_at, u.is_active,
|
||
lau.valid_from, lau.valid_until
|
||
FROM location_administrative_units lau
|
||
JOIN administrative_units u ON u.id = lau.unit_id
|
||
JOIN administrative_unit_types t ON t.id = u.type_id
|
||
{where}
|
||
ORDER BY lau.valid_from DESC""",
|
||
(location_id,)
|
||
)).fetchall()
|
||
return [
|
||
{**_row(r[:7]), "valid_from": r[7], "valid_until": r[8]}
|
||
for r in rows
|
||
]
|
||
|
||
|
||
def _row(r) -> dict:
|
||
return {
|
||
"id": r[0], "type_id": r[1], "type_name": r[2], "name": r[3],
|
||
"established_at": r[4], "abolished_at": r[5], "is_active": r[6],
|
||
}
|