from psycopg import AsyncConnection from mm_api.models.admin_unit import AdminUnitCreate, AdminUnitAssign, AdminUnitClose async def list_types(conn: AsyncConnection) -> list[dict]: rows = await (await conn.execute( "SELECT id, name, slug, description FROM administrative_unit_types ORDER BY name" )).fetchall() return [{"id": r[0], "name": r[1], "slug": r[2], "description": r[3]} for r in rows] async def list_units(conn: AsyncConnection, type_id: int | None, active_only: bool) -> list[dict]: where = [] params = [] if type_id: where.append("u.type_id = %s") params.append(type_id) if active_only: where.append("u.is_active = TRUE") clause = ("WHERE " + " AND ".join(where)) if where else "" rows = await (await conn.execute( f"""SELECT u.id, u.type_id, t.name, u.name, u.established_at, u.abolished_at, u.is_active FROM administrative_units u JOIN administrative_unit_types t ON t.id = u.type_id {clause} ORDER BY u.name""", params )).fetchall() return [_row(r) for r in rows] async def get_unit(conn: AsyncConnection, unit_id: int) -> dict | None: row = await (await conn.execute( """SELECT u.id, u.type_id, t.name, u.name, u.established_at, u.abolished_at, u.is_active FROM administrative_units u JOIN administrative_unit_types t ON t.id = u.type_id WHERE u.id = %s""", (unit_id,) )).fetchone() return _row(row) if row else None async def create_unit(conn: AsyncConnection, data: AdminUnitCreate) -> dict: row = await (await conn.execute( "INSERT INTO administrative_units (type_id, name, established_at) VALUES (%s, %s, %s) RETURNING id", (data.type_id, data.name, data.established_at) )).fetchone() await conn.commit() return await get_unit(conn, row[0]) async def close_unit(conn: AsyncConnection, unit_id: int, data: AdminUnitClose): """Birimi kapat: abolished_at yaz, bağlı lokasyonların valid_until'ını güncelle.""" await conn.execute( "UPDATE administrative_units SET abolished_at = %s, is_active = FALSE WHERE id = %s", (data.abolished_at, unit_id) ) await conn.execute( "UPDATE location_administrative_units SET valid_until = %s " "WHERE unit_id = %s AND valid_until IS NULL", (data.valid_until, unit_id) ) await conn.commit() async def assign_to_location(conn: AsyncConnection, location_id: int, data: AdminUnitAssign): """Lokasyona idari birim ata.""" await conn.execute( "INSERT INTO location_administrative_units (location_id, unit_id, valid_from, valid_until) " "VALUES (%s, %s, %s, %s) ON CONFLICT (location_id, unit_id, valid_from) DO NOTHING", (location_id, data.unit_id, data.valid_from, data.valid_until) ) await conn.commit() async def unassign_from_location(conn: AsyncConnection, location_id: int, unit_id: int, valid_until: str): """Lokasyon-birim bağlantısını kapat.""" await conn.execute( "UPDATE location_administrative_units SET valid_until = %s " "WHERE location_id = %s AND unit_id = %s AND valid_until IS NULL", (valid_until, location_id, unit_id) ) await conn.commit() async def location_units(conn: AsyncConnection, location_id: int, active_only: bool) -> list[dict]: """Bir lokasyona bağlı idari birimleri listele.""" where = "WHERE lau.location_id = %s" if active_only: where += " AND (lau.valid_until IS NULL OR lau.valid_until > NOW())" rows = await (await conn.execute( f"""SELECT u.id, u.type_id, t.name, u.name, u.established_at, u.abolished_at, u.is_active, lau.valid_from, lau.valid_until FROM location_administrative_units lau JOIN administrative_units u ON u.id = lau.unit_id JOIN administrative_unit_types t ON t.id = u.type_id {where} ORDER BY lau.valid_from DESC""", (location_id,) )).fetchall() return [ {**_row(r[:7]), "valid_from": r[7], "valid_until": r[8]} for r in rows ] def _row(r) -> dict: return { "id": r[0], "type_id": r[1], "type_name": r[2], "name": r[3], "established_at": r[4], "abolished_at": r[5], "is_active": r[6], }