283 lines
11 KiB
Python
283 lines
11 KiB
Python
import base64
|
|
import json
|
|
import logging
|
|
from datetime import date
|
|
from flask import Blueprint, jsonify, request
|
|
from druppie_sdk import DruppieClient
|
|
from app.database import get_db
|
|
from app.models import Permit
|
|
|
|
# Logger named after this module.
logger = logging.getLogger(__name__)

# Blueprint on which every route in this module is registered.
api = Blueprint("api", __name__)

# Shared Druppie SDK client; upload_permit uses it for the "vision"/"ocr"
# and "llm"/"chat" calls.
druppie = DruppieClient()
|
|
|
|
# File extensions (lower-case, without the dot) accepted by the upload route.
ALLOWED_EXTENSIONS = {
    "bmp",
    "gif",
    "jpeg",
    "jpg",
    "pdf",
    "png",
    "tiff",
    "webp",
}

# Largest accepted upload, in bytes (25 MiB).
MAX_FILE_SIZE = 25 * 1024**2
|
|
|
|
# Simplified lookup based on the "Selectielijst waterschappen 2012".
# Maps a normalised permit type to its archive nomination ("vernietigen" or
# "bewaren") and the retention period in years ("jaren"); the period is None
# for every "bewaren" entry.
SELECTIELIJST = {
    "watervergunning_lozing": dict(nominatie="vernietigen", jaren=20),
    "watervergunning_onttrekking": dict(nominatie="vernietigen", jaren=20),
    "keurvergunning": dict(nominatie="vernietigen", jaren=10),
    "omgevingsvergunning": dict(nominatie="bewaren", jaren=None),
    "lozingsvergunning": dict(nominatie="vernietigen", jaren=20),
    "onttrekkingsvergunning": dict(nominatie="vernietigen", jaren=15),
    "projectplan": dict(nominatie="bewaren", jaren=None),
    "peilbesluit": dict(nominatie="bewaren", jaren=None),
    "leggerwijziging": dict(nominatie="bewaren", jaren=None),
}
|
|
|
|
# Prompt prefix for the metadata-extraction LLM call; upload_permit appends the
# OCR text directly after the trailing "Document tekst:" line.
# The stray "|" separator lines that had ended up inside the literal are
# removed so they are no longer sent to the model as part of the prompt.
# NOTE(review): the JSON field lines are kept unindented exactly as found;
# confirm against the canonical file whether they were originally indented.
METADATA_EXTRACTION_PROMPT = """Analyseer het volgende vergunningdocument en extraheer de metadata.
Antwoord ALLEEN met een JSON object met deze velden (laat leeg als niet gevonden):

{
"permit_number": "vergunningnummer (K-xxxx, L-xxxx, etc.)",
"applicant_name": "naam aanvrager",
"permit_holder_name": "naam vergunninghouder",
"issuer_name": "naam uitgever/verlener",
"location": "locatie/adres",
"issue_date": "uitgiftedatum (YYYY-MM-DD)",
"expiry_date": "geldigheidsdatum (YYYY-MM-DD)",
"applicable_law": "toepasselijke wet of regeling",
"work_type": "type werk/activiteit",
"water_type": "type oppervlaktewater",
"embankment_type": "type waterkering",
"permit_type": "type vergunning (watervergunning_lozing, keurvergunning, etc.)",
"source_system": "bronsysteem indien herkenbaar"
}

Document tekst:
"""
|
|
|
|
|
|
def _allowed_file(fn):
    """Tell whether the file name *fn* carries an accepted extension.

    The extension is the text after the last dot, compared case-insensitively
    against ALLOWED_EXTENSIONS. A name without any dot is rejected.
    """
    _, dot, extension = fn.rpartition(".")
    return bool(dot) and extension.lower() in ALLOWED_EXTENSIONS
|
|
|
|
|
|
def _compute_archive_status(permit_type, issue_date_str):
    """Compute archive nomination, retention, and status per selectielijst.

    Args:
        permit_type: Permit type as free text; it is stripped, lower-cased and
            has spaces replaced by underscores before the SELECTIELIJST lookup.
            ``None`` or an unknown type yields the nomination "onbekend".
        issue_date_str: Issue date as an ISO ``YYYY-MM-DD`` string, or a falsy
            value when the date is unknown.

    Returns:
        A ``(nominatie, jaren, status)`` tuple where ``status`` is one of
        "te bewaren (oneindig)", "te vernietigen",
        "te bewaren tot <date>", "te bewaren (<n> jaar na uitgifte)" or
        "onbekend".
    """
    key = (permit_type or "").strip().lower().replace(" ", "_")
    info = SELECTIELIJST.get(key, {"nominatie": "onbekend", "jaren": None})
    nominatie = info["nominatie"]
    jaren = info["jaren"]

    if nominatie == "bewaren":
        return nominatie, jaren, "te bewaren (oneindig)"
    if nominatie != "vernietigen" or not jaren:
        return nominatie, jaren, "onbekend"

    # Used whenever the exact destruction date cannot be determined.
    undated_status = f"te bewaren ({jaren} jaar na uitgifte)"
    if not issue_date_str:
        return nominatie, jaren, undated_status

    try:
        issue = date.fromisoformat(issue_date_str)
        try:
            destroy_date = issue.replace(year=issue.year + jaren)
        except ValueError:
            # 29 February has no counterpart in a non-leap target year; clamp
            # to 28 February instead of treating a valid date as unparseable.
            if (issue.month, issue.day) != (2, 29):
                raise
            destroy_date = issue.replace(year=issue.year + jaren, day=28)
    except (ValueError, OverflowError):
        # Malformed date string or a target year outside the supported range.
        return nominatie, jaren, undated_status

    if date.today() >= destroy_date:
        return nominatie, jaren, "te vernietigen"
    return nominatie, jaren, f"te bewaren tot {destroy_date.isoformat()}"
|
|
|
|
|
|
@api.route("/info")
def info():
    """Return the configured application name as a JSON object."""
    # The settings object is imported at call time, inside the function.
    from app.config import settings

    payload = {"app_name": settings.app_name}
    return jsonify(payload)
|
|
|
|
|
|
# MIME type for every extension in ALLOWED_EXTENSIONS (gif and bmp included),
# used to build the data URI that is sent to the OCR service.
_MIME_TYPES = {
    "png": "image/png",
    "jpg": "image/jpeg",
    "jpeg": "image/jpeg",
    "gif": "image/gif",
    "bmp": "image/bmp",
    "tiff": "image/tiff",
    "webp": "image/webp",
    "pdf": "application/pdf",
}

# Metadata keys that are always taken from the LLM answer (no user override).
_LLM_ONLY_FIELDS = (
    "permit_holder_name",
    "issuer_name",
    "location",
    "applicable_law",
    "work_type",
    "water_type",
    "embankment_type",
)


def _parse_llm_json(raw_answer):
    """Return the JSON object in *raw_answer*, or ``{}`` if none can be parsed.

    A Markdown code fence (optionally tagged ``json``) around the object is
    stripped first. Answers that are not a JSON object are rejected.
    """
    json_str = raw_answer
    if "```" in json_str:
        json_str = json_str.split("```")[1]
        if json_str.startswith("json"):
            json_str = json_str[4:]
    json_str = json_str.strip()

    try:
        meta = json.loads(json_str)
    except json.JSONDecodeError:
        logger.warning("Failed to parse metadata JSON: %s", raw_answer[:200])
        return {}
    if not isinstance(meta, dict):
        # A bare list/string/number would break the .get() lookups downstream.
        logger.warning("Metadata JSON is not an object: %s", raw_answer[:200])
        return {}
    return meta


def _apply_metadata(permit, meta):
    """Copy LLM-extracted metadata onto *permit*; user-supplied values win."""
    permit.permit_number = permit.permit_number or meta.get("permit_number") or None
    permit.applicant_name = permit.applicant_name or meta.get("applicant_name") or None
    for field in _LLM_ONLY_FIELDS:
        setattr(permit, field, meta.get(field) or None)
    permit.permit_type = meta.get("permit_type") or "onbekend"
    permit.source_system = permit.source_system or meta.get("source_system") or "upload"

    for field in ("issue_date", "expiry_date"):
        val = meta.get(field)
        if not val:
            continue
        try:
            setattr(permit, field, date.fromisoformat(val))
        except (ValueError, TypeError):
            # Best effort: an unusable date leaves the field unset.
            logger.warning("Ignoring unparseable %s in metadata: %r", field, val)


@api.route("/permits/upload", methods=["POST"])
def upload_permit():
    """Upload permit document — OCR + metadata extraction + archive classification.

    Expects a multipart form with a ``file`` part and optional
    ``permit_number``, ``applicant_name`` and ``source_system`` fields.

    Returns:
        A 400 response with an ``error`` message when the file is missing,
        has a disallowed extension, is empty, or exceeds MAX_FILE_SIZE.
        Otherwise the stored permit as JSON. A failure during OCR/LLM
        processing does not change the HTTP status; the permit is saved with
        ``status="error"`` and a truncated ``error_message``.
    """
    if "file" not in request.files:
        return jsonify(error="Geen bestand geüpload."), 400
    file = request.files["file"]
    if not file.filename or not _allowed_file(file.filename):
        return jsonify(error="Ongeldig bestandstype. Upload JPG, PNG of PDF."), 400

    # One byte past the limit is enough to detect an oversized upload without
    # copying the whole payload into memory first.
    file_bytes = file.read(MAX_FILE_SIZE + 1)
    if len(file_bytes) > MAX_FILE_SIZE:
        return jsonify(error="Bestand te groot (max 25 MB)."), 400
    if not file_bytes:
        return jsonify(error="Leeg bestand."), 400

    # NOTE(review): get_db() is advanced once and never explicitly closed
    # here; confirm how the session is meant to be released.
    db = next(get_db())
    permit = Permit(
        permit_number=request.form.get("permit_number", "").strip() or None,
        applicant_name=request.form.get("applicant_name", "").strip() or None,
        source_file=file.filename,
        source_system=request.form.get("source_system", "upload").strip(),
        status="processing",
    )
    # Persist first so the record exists (and has an id) even if processing fails.
    db.add(permit)
    db.commit()
    db.refresh(permit)

    ext = file.filename.rsplit(".", 1)[1].lower()
    mime = _MIME_TYPES.get(ext, "application/octet-stream")
    data_uri = f"data:{mime};base64,{base64.b64encode(file_bytes).decode()}"

    try:
        # Step 1: OCR — extract text from document
        ocr_result = druppie.call("vision", "ocr", {
            "image_source": data_uri,
            "prompt": "Extraheer alle tekst uit dit vergunningdocument. Bewaar de structuur.",
        })
        # Guard against an explicit null so the slice below cannot fail.
        extracted_text = ocr_result.get("text") or ""
        permit.extracted_text = extracted_text

        # Step 2: LLM — extract structured metadata from text
        # Only the first 4000 characters of the OCR text are sent.
        meta_result = druppie.call("llm", "chat", {
            "prompt": METADATA_EXTRACTION_PROMPT + extracted_text[:4000],
            "system": "Je bent een metadata-extractie specialist voor Nederlandse watervergunningen. Antwoord ALLEEN met valid JSON.",
        })
        meta = _parse_llm_json(meta_result.get("answer") or "{}")

        # Apply extracted metadata (LLM fills gaps, user input takes priority)
        _apply_metadata(permit, meta)

        # Step 3: Compute archive status (BR-01 to BR-06)
        nominatie, jaren, arch_status = _compute_archive_status(
            permit.permit_type,
            permit.issue_date.isoformat() if permit.issue_date else None,
        )
        permit.archive_nomination = nominatie
        permit.retention_years = jaren
        permit.archive_status = arch_status
        permit.status = "processed"

    except Exception as e:
        # Request boundary: record the failure on the permit instead of
        # failing the upload, and keep the traceback in the log.
        logger.exception("Processing failed for permit %s: %s", permit.id, e)
        permit.status = "error"
        permit.error_message = str(e)[:500]

    db.commit()
    db.refresh(permit)
    return jsonify(_permit_to_dict(permit))
|
|
|
|
|
|
@api.route("/permits")
def list_permits():
    """Return the summaries of all permits as a JSON list, newest first.

    "Newest" is determined by ``Permit.created_at`` in descending order.
    """
    db = next(get_db())
    newest_first = Permit.created_at.desc()
    rows = db.query(Permit).order_by(newest_first).all()
    summaries = list(map(_permit_summary, rows))
    return jsonify(summaries)
|
|
|
|
|
|
@api.route("/permits/<int:permit_id>")
def get_permit(permit_id):
    """Return the full JSON representation of one permit.

    Responds with a 404 and an ``error`` message when no permit has the
    given id.
    """
    db = next(get_db())
    matching = db.query(Permit).filter(Permit.id == permit_id)
    permit = matching.first()
    if permit:
        return jsonify(_permit_to_dict(permit))
    return jsonify(error="Niet gevonden"), 404
|
|
|
|
|
|
@api.route("/permits/<int:permit_id>", methods=["DELETE"])
def delete_permit(permit_id):
    """Delete one permit and confirm with ``{"ok": true}``.

    Responds with a 404 and an ``error`` message when no permit has the
    given id.
    """
    db = next(get_db())
    matching = db.query(Permit).filter(Permit.id == permit_id)
    permit = matching.first()
    if permit:
        db.delete(permit)
        db.commit()
        return jsonify(ok=True)
    return jsonify(error="Niet gevonden"), 404
|
|
|
|
|
|
@api.route("/permits/search")
def search_permits():
    """Search across all metadata fields (FR-01).

    Reads the ``q`` query parameter and returns, newest first, the summaries
    of permits for which any searched column contains ``q`` as a
    case-insensitive substring. An empty ``q`` returns every permit.

    The searched columns are the ones listed in the body; ``issuer_name`` and
    ``embankment_type`` are not among them.
    """
    q = request.args.get("q", "").strip()
    db = next(get_db())
    if not q:
        everything = db.query(Permit).order_by(Permit.created_at.desc()).all()
        return jsonify([_permit_summary(p) for p in everything])

    # Escape LIKE wildcards so that "%" and "_" typed by the user are matched
    # literally instead of matching arbitrary text. The escape character
    # itself is doubled first.
    escaped = q.replace("/", "//").replace("%", "/%").replace("_", "/_")
    like = f"%{escaped}%"

    columns = (
        Permit.permit_number,
        Permit.applicant_name,
        Permit.permit_holder_name,
        Permit.location,
        Permit.permit_type,
        Permit.applicable_law,
        Permit.work_type,
        Permit.water_type,
        Permit.extracted_text,
    )
    # OR the per-column conditions together.
    condition = columns[0].ilike(like, escape="/")
    for column in columns[1:]:
        condition = condition | column.ilike(like, escape="/")

    results = (
        db.query(Permit)
        .filter(condition)
        .order_by(Permit.created_at.desc())
        .all()
    )
    return jsonify([_permit_summary(p) for p in results])
|
|
|
|
|
|
@api.route("/permits/stats")
def permit_stats():
    """Dashboard stats.

    Returns a JSON object with ``total`` (number of permits), ``by_type``
    (count per lower-cased permit type) and ``by_archive`` (count per archive
    status). Missing values are counted under "onbekend".
    """
    from collections import Counter

    db = next(get_db())
    # One query fetching only the two columns that are counted, instead of a
    # COUNT plus two full-table loads of complete Permit rows.
    rows = db.query(Permit.permit_type, Permit.archive_status).all()

    by_type = Counter()
    by_archive = Counter()
    for permit_type, archive_status in rows:
        by_type[(permit_type or "onbekend").lower()] += 1
        by_archive[archive_status or "onbekend"] += 1

    return jsonify(
        total=len(rows),
        by_type=dict(by_type),
        by_archive=dict(by_archive),
    )
|
|
|
|
|
|
def _permit_summary(p):
    """Build the compact dict representation of permit *p* used in listings.

    Plain attributes are copied as they are; ``issue_date`` and
    ``upload_date`` are rendered with ``isoformat()``, or ``None`` when unset.
    """
    plain_fields = (
        "id",
        "permit_number",
        "applicant_name",
        "permit_type",
        "location",
        "source_file",
        "source_system",
        "status",
        "archive_status",
    )
    summary = {name: getattr(p, name) for name in plain_fields}
    for name in ("issue_date", "upload_date"):
        value = getattr(p, name)
        summary[name] = value.isoformat() if value else None
    return summary
|
|
|
|
|
|
def _permit_to_dict(p):
    """Build the full dict representation of permit *p*.

    Starts from the summary produced by ``_permit_summary`` and adds the
    detail fields; ``expiry_date`` is rendered with ``isoformat()``, or
    ``None`` when unset.
    """
    expiry = p.expiry_date.isoformat() if p.expiry_date else None
    details = _permit_summary(p)
    details.update({
        "permit_holder_name": p.permit_holder_name,
        "issuer_name": p.issuer_name,
        "expiry_date": expiry,
        "applicable_law": p.applicable_law,
        "work_type": p.work_type,
        "water_type": p.water_type,
        "embankment_type": p.embankment_type,
        "archive_nomination": p.archive_nomination,
        "retention_years": p.retention_years,
        "extracted_text": p.extracted_text,
        "error_message": p.error_message,
    })
    return details
|