memleketmeselesi/mm_api/services/location.py
Mukan Erkin 2498e75594 init: memleketmeselesi platform — API + migrations
FastAPI + PostgreSQL 16. KYC, issue sistemi, permission/group yönetimi,
session yönetimi, API client auth (kışla kapısı), officials/persons CRUD.
Migration 0001–0013 dahil.
2026-04-27 23:06:59 +03:00

124 lines
4.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Lokasyon iş mantığı — her operasyon history snapshot'ı atar ve
ilgili değişiklik tablolarını otomatik günceller.
"""
import json
from datetime import datetime
from psycopg import AsyncConnection
from mm_api.models.location import LocationCreate, LocationRetype, LocationReparent, LocationSplitFrom, LocationMerge
async def _snapshot(conn: AsyncConnection, location_id: int, change_reason: str, changed_by: int | None):
row = await (await conn.execute(
"SELECT id, parent_id, name, slug, type, latitude, longitude, is_active, created_at, updated_at "
"FROM locations WHERE id = %s",
(location_id,)
)).fetchone()
if not row:
return
snap = {
"id": row[0], "parent_id": row[1], "name": row[2], "slug": row[3],
"type": row[4], "latitude": row[5], "longitude": row[6],
"is_active": row[7],
}
await conn.execute(
"INSERT INTO location_history (location_id, snapshot, change_reason, changed_by) VALUES (%s, %s, %s, %s)",
(location_id, json.dumps(snap), change_reason, changed_by)
)
async def create(conn: AsyncConnection, data: LocationCreate) -> dict:
row = await (await conn.execute(
"INSERT INTO locations (parent_id, name, slug, type, latitude, longitude) "
"VALUES (%s, %s, %s, %s, %s, %s) RETURNING *",
(data.parent_id, data.name, data.slug, data.type.value, data.latitude, data.longitude)
)).fetchone()
await conn.commit()
return _row_to_dict(row)
async def get(conn: AsyncConnection, location_id: int) -> dict | None:
row = await (await conn.execute(
"SELECT * FROM locations WHERE id = %s", (location_id,)
)).fetchone()
return _row_to_dict(row) if row else None
async def list_children(conn: AsyncConnection, parent_id: int | None) -> list[dict]:
if parent_id is None:
rows = await (await conn.execute(
"SELECT * FROM locations WHERE parent_id IS NULL ORDER BY name"
)).fetchall()
else:
rows = await (await conn.execute(
"SELECT * FROM locations WHERE parent_id = %s ORDER BY name", (parent_id,)
)).fetchall()
return [_row_to_dict(r) for r in rows]
async def retype(conn: AsyncConnection, location_id: int, data: LocationRetype):
await _snapshot(conn, location_id, data.change_reason, data.changed_by)
await conn.execute(
"UPDATE locations SET type = %s, updated_at = NOW() WHERE id = %s",
(data.new_type.value, location_id)
)
await conn.commit()
async def reparent(conn: AsyncConnection, location_id: int, data: LocationReparent):
await _snapshot(conn, location_id, data.change_reason, data.changed_by)
await conn.execute(
"UPDATE locations SET parent_id = %s, updated_at = NOW() WHERE id = %s",
(data.new_parent_id, location_id)
)
await conn.commit()
async def add_split_from(conn: AsyncConnection, location_id: int, data: LocationSplitFrom):
await conn.execute(
"INSERT INTO location_splits (location_id, source_id, effective_at, notes) "
"VALUES (%s, %s, %s, %s) ON CONFLICT DO NOTHING",
(location_id, data.source_id, data.effective_at, data.notes)
)
await conn.commit()
async def merge(conn: AsyncConnection, data: LocationMerge):
"""
source_ids içindeki lokasyonları pasifleştirir, target_id'ye birleşme kaydı ekler,
her kaynak için location_history snapshot'ı atar.
"""
for src_id in data.source_ids:
await _snapshot(conn, src_id, data.change_reason, data.changed_by)
await conn.execute(
"UPDATE locations SET is_active = FALSE, updated_at = NOW() WHERE id = %s",
(src_id,)
)
await conn.execute(
"INSERT INTO location_merges (source_id, target_id, effective_at, notes) "
"VALUES (%s, %s, %s, %s) ON CONFLICT DO NOTHING",
(src_id, data.target_id, data.effective_at, data.notes)
)
await conn.commit()
async def history(conn: AsyncConnection, location_id: int) -> list[dict]:
rows = await (await conn.execute(
"SELECT id, location_id, snapshot, change_reason, changed_by, changed_at "
"FROM location_history WHERE location_id = %s ORDER BY changed_at",
(location_id,)
)).fetchall()
return [
{"id": r[0], "location_id": r[1], "snapshot": r[2],
"change_reason": r[3], "changed_by": r[4], "changed_at": r[5]}
for r in rows
]
def _row_to_dict(row) -> dict:
return {
"id": row[0], "parent_id": row[1], "name": row[2], "slug": row[3],
"type": row[4], "latitude": row[5], "longitude": row[6],
"is_active": row[7], "created_at": row[8], "updated_at": row[9],
}