memleketmeselesi/mm_api/services/official.py
Mukan Erkin 2498e75594 init: memleketmeselesi platform — API + migrations
FastAPI + PostgreSQL 16. KYC, issue sistemi, permission/group yönetimi,
session yönetimi, API client auth (kışla kapısı), officials/persons CRUD.
Migration 0001–0013 dahil.
2026-04-27 23:06:59 +03:00

188 lines
7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from psycopg import AsyncConnection
# --- Persons ---
async def list_persons(conn: AsyncConnection, q: str | None = None, limit: int = 20, offset: int = 0) -> list[dict]:
if q:
rows = await (await conn.execute(
"""SELECT id, first_name, last_name, birth_year, user_id
FROM persons
WHERE first_name ILIKE %s OR last_name ILIKE %s
ORDER BY last_name, first_name LIMIT %s OFFSET %s""",
(f"%{q}%", f"%{q}%", limit, offset)
)).fetchall()
else:
rows = await (await conn.execute(
"""SELECT id, first_name, last_name, birth_year, user_id
FROM persons ORDER BY last_name, first_name LIMIT %s OFFSET %s""",
(limit, offset)
)).fetchall()
return [_person_row(r) for r in rows]
async def get_person(conn: AsyncConnection, person_id: int) -> dict | None:
row = await (await conn.execute(
"SELECT id, first_name, last_name, birth_year, user_id FROM persons WHERE id = %s",
(person_id,)
)).fetchone()
return _person_row(row) if row else None
async def create_person(conn: AsyncConnection, first_name: str, last_name: str, birth_year: int | None) -> dict:
    """Insert a new person, commit, and return the stored record.

    Args:
        conn: Open async database connection; the insert is committed on it.
        first_name: Given name to store.
        last_name: Family name to store.
        birth_year: Year of birth, or ``None`` to store NULL.

    Returns:
        The inserted person as a dict built by ``_person_row``.
    """
    # RETURNING hands back the complete row from the INSERT itself, so no
    # second SELECT is needed and the result can never be None.
    cur = await conn.execute(
        """INSERT INTO persons (first_name, last_name, birth_year)
           VALUES (%s, %s, %s)
           RETURNING id, first_name, last_name, birth_year, user_id""",
        (first_name, last_name, birth_year),
    )
    row = await cur.fetchone()
    await conn.commit()
    return _person_row(row)
async def update_person(conn: AsyncConnection, person_id: int, first_name: str | None, last_name: str | None, birth_year: int | None) -> dict:
updates, params = [], []
if first_name is not None:
updates.append("first_name = %s"); params.append(first_name)
if last_name is not None:
updates.append("last_name = %s"); params.append(last_name)
if birth_year is not None:
updates.append("birth_year = %s"); params.append(birth_year)
if not updates:
raise ValueError("Güncellenecek alan yok")
params.append(person_id)
await conn.execute(f"UPDATE persons SET {', '.join(updates)} WHERE id = %s", params)
await conn.commit()
return await get_person(conn, person_id)
# --- Elections ---
async def list_elections(conn: AsyncConnection) -> list[dict]:
    """Return every election, most recently held first.

    Each entry is a dict with the keys ``id``, ``name``, ``held_at`` and
    ``type``.
    """
    cursor = await conn.execute(
        "SELECT id, name, held_at, type FROM elections ORDER BY held_at DESC"
    )
    elections = []
    for election_id, name, held_at, kind in await cursor.fetchall():
        elections.append({"id": election_id, "name": name, "held_at": held_at, "type": kind})
    return elections
async def create_election(conn: AsyncConnection, name: str, held_at: str, type_: str) -> dict:
    """Insert an election, commit, and return the row the database stored.

    The result is a dict with the keys ``id``, ``name``, ``held_at`` and
    ``type``, taken from the statement's RETURNING clause.
    """
    cursor = await conn.execute(
        "INSERT INTO elections (name, held_at, type) VALUES (%s, %s, %s) RETURNING id, name, held_at, type",
        (name, held_at, type_)
    )
    inserted = await cursor.fetchone()
    await conn.commit()
    return dict(zip(("id", "name", "held_at", "type"), inserted))
# --- Officials ---
async def list_officials(
conn: AsyncConnection,
person_id: int | None = None,
unit_id: int | None = None,
active_only: bool = False,
limit: int = 20,
offset: int = 0,
) -> list[dict]:
filters, params = [], []
if person_id:
filters.append("o.person_id = %s"); params.append(person_id)
if unit_id:
filters.append("o.unit_id = %s"); params.append(unit_id)
if active_only:
filters.append("o.ended_at IS NULL")
where = ("WHERE " + " AND ".join(filters)) if filters else ""
params += [limit, offset]
rows = await (await conn.execute(
f"""SELECT o.id, o.person_id, p.first_name, p.last_name,
o.unit_id, au.name AS unit_name,
o.title, o.started_at, o.ended_at, o.election_id
FROM officials o
JOIN persons p ON p.id = o.person_id
JOIN administrative_units au ON au.id = o.unit_id
{where}
ORDER BY o.started_at DESC LIMIT %s OFFSET %s""",
params
)).fetchall()
return [_official_row(r) for r in rows]
async def get_official(conn: AsyncConnection, official_id: int) -> dict | None:
row = await (await conn.execute(
"""SELECT o.id, o.person_id, p.first_name, p.last_name,
o.unit_id, au.name AS unit_name,
o.title, o.started_at, o.ended_at, o.election_id
FROM officials o
JOIN persons p ON p.id = o.person_id
JOIN administrative_units au ON au.id = o.unit_id
WHERE o.id = %s""",
(official_id,)
)).fetchone()
return _official_row(row) if row else None
async def create_official(
conn: AsyncConnection,
person_id: int,
unit_id: int,
title: str,
started_at: str,
election_id: int | None,
) -> dict:
person = await (await conn.execute("SELECT id FROM persons WHERE id = %s", (person_id,))).fetchone()
if not person:
raise ValueError("Kişi bulunamadı")
unit = await (await conn.execute("SELECT id FROM administrative_units WHERE id = %s", (unit_id,))).fetchone()
if not unit:
raise ValueError("İdari birim bulunamadı")
row = await (await conn.execute(
"""INSERT INTO officials (person_id, unit_id, title, started_at, election_id)
VALUES (%s, %s, %s, %s, %s) RETURNING id""",
(person_id, unit_id, title, started_at, election_id)
)).fetchone()
await conn.commit()
return await get_official(conn, row[0])
async def close_official(conn: AsyncConnection, official_id: int, ended_at: str) -> dict:
    """Mark an open official record as ended and return the updated record.

    The update is committed on ``conn``.

    Raises:
        ValueError: If no record has the given id, or if the record already
            has an ``ended_at`` value.
    """
    lookup = await conn.execute("SELECT id, ended_at FROM officials WHERE id = %s", (official_id,))
    existing = await lookup.fetchone()
    if not existing:
        raise ValueError("Yetkili kaydı bulunamadı")

    current_end = existing[1]
    if current_end is not None:
        raise ValueError("Bu görev zaten kapatılmış")

    await conn.execute("UPDATE officials SET ended_at = %s WHERE id = %s", (ended_at, official_id))
    await conn.commit()
    return await get_official(conn, official_id)
async def get_person_officials(conn: AsyncConnection, person_id: int, limit: int = 100, offset: int = 0) -> list[dict]:
    """Return the official records held by one person, newest first.

    Thin wrapper around ``list_officials`` filtered by ``person_id``.

    Args:
        conn: Open async database connection.
        person_id: Primary key of the person whose records are listed.
        limit: Maximum number of records to return. Defaults to 100, the
            page size this function previously had hard-coded.
        offset: Number of records to skip, so callers can page past the
            first ``limit`` records instead of losing them.

    Returns:
        Official dicts as produced by ``list_officials``.
    """
    return await list_officials(conn, person_id=person_id, limit=limit, offset=offset)
async def get_official_stats(conn: AsyncConnection, official_id: int) -> list[dict]:
    """Return the stored statistics periods of an official, newest first.

    Args:
        conn: Open async database connection.
        official_id: Primary key of the official record.

    Returns:
        One dict per ``official_stats`` row with the keys ``period_start``,
        ``period_end``, ``open_at_start``, ``new_during``, ``resolved``,
        ``rejected``, ``score`` and ``computed_at``. ``score`` is converted
        to ``float``, or is ``None`` when the stored value is NULL.
    """
    cur = await conn.execute(
        """SELECT period_start, period_end, open_at_start, new_during,
                  resolved, rejected, score, computed_at
           FROM official_stats WHERE official_id = %s ORDER BY period_start DESC""",
        (official_id,),
    )
    rows = await cur.fetchall()
    return [
        {
            "period_start": r[0],
            "period_end": r[1],
            "open_at_start": r[2],
            "new_during": r[3],
            "resolved": r[4],
            "rejected": r[5],
            # Test for None explicitly: a legitimate score of 0 is falsy and
            # would otherwise be reported as if no score had been computed.
            "score": float(r[6]) if r[6] is not None else None,
            "computed_at": r[7],
        }
        for r in rows
    ]
def _person_row(r) -> dict:
return {"id": r[0], "first_name": r[1], "last_name": r[2], "birth_year": r[3], "user_id": r[4]}
def _official_row(r) -> dict:
return {
"id": r[0], "person_id": r[1],
"person_name": f"{r[2]} {r[3]}",
"unit_id": r[4], "unit_name": r[5],
"title": r[6], "started_at": r[7],
"ended_at": r[8], "election_id": r[9],
"is_active": r[8] is None,
}