"""Integration tests for the `/ui` router (spec §PR 2). Covers the full round-trip through `POST /ui/jobs` — the handler parses multipart form data into a `RequestIX` and hands it to `ix.store.jobs_repo.insert_pending`, the same entry point the REST adapter uses. Tests assert the job row exists with the right client/request ids and that custom-use-case forms produce a `use_case_inline` block in the stored request JSON. The DB-touching tests depend on the shared integration conftest which spins up migrations against the configured Postgres; the pure-template tests (`GET /ui` and the fragment renderer) still need a factory but won't actually query — they're cheap. """ from __future__ import annotations import json from collections.abc import Iterator from pathlib import Path from uuid import uuid4 import pytest from fastapi.testclient import TestClient from sqlalchemy import select from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from ix.adapters.rest.routes import Probes, get_probes, get_session_factory_dep from ix.app import create_app from ix.store.models import IxJob FIXTURE_DIR = Path(__file__).resolve().parents[1] / "fixtures" FIXTURE_PDF = FIXTURE_DIR / "synthetic_giro.pdf" def _factory_for_url(postgres_url: str): # type: ignore[no-untyped-def] def _factory(): # type: ignore[no-untyped-def] eng = create_async_engine(postgres_url, pool_pre_ping=True) return async_sessionmaker(eng, expire_on_commit=False) return _factory @pytest.fixture def app(postgres_url: str) -> Iterator[TestClient]: app_obj = create_app(spawn_worker=False) app_obj.dependency_overrides[get_session_factory_dep] = _factory_for_url( postgres_url ) app_obj.dependency_overrides[get_probes] = lambda: Probes( ollama=lambda: "ok", ocr=lambda: "ok" ) with TestClient(app_obj) as client: yield client class TestIndexPage: def test_index_returns_html(self, app: TestClient) -> None: resp = app.get("/ui") assert resp.status_code == 200 assert "text/html" in resp.headers["content-type"] body = resp.text # Dropdown prefilled with the registered use case. assert "bank_statement_header" in body # Marker for the submission form. assert ' None: # StaticFiles returns 404 for the keepfile; the mount itself must # exist so asset URLs resolve. We probe the directory root instead. resp = app.get("/ui/static/.gitkeep") # .gitkeep exists in the repo — expect 200 (or at minimum not a 404 # due to missing mount). A 405/403 would also indicate the mount is # wired; we assert the response is *not* a 404 from a missing route. assert resp.status_code != 404 class TestSubmitJobRegistered: def test_post_registered_use_case_creates_row( self, app: TestClient, postgres_url: str, ) -> None: request_id = f"ui-reg-{uuid4().hex[:8]}" with FIXTURE_PDF.open("rb") as fh: resp = app.post( "/ui/jobs", data={ "use_case_mode": "registered", "use_case_name": "bank_statement_header", "ix_client_id": "ui-test", "request_id": request_id, "texts": "", "use_ocr": "on", "include_provenance": "on", "max_sources_per_field": "10", }, files={"pdf": ("sample.pdf", fh, "application/pdf")}, follow_redirects=False, ) assert resp.status_code in (200, 303), resp.text # Assert the row exists in the DB. job_row = _find_job(postgres_url, "ui-test", request_id) assert job_row is not None assert job_row.status == "pending" assert job_row.request["use_case"] == "bank_statement_header" # Context.files must reference a local file:// path. files = job_row.request["context"]["files"] assert len(files) == 1 entry = files[0] url = entry if isinstance(entry, str) else entry["url"] assert url.startswith("file://") def test_htmx_submit_uses_hx_redirect_header( self, app: TestClient, ) -> None: request_id = f"ui-htmx-{uuid4().hex[:8]}" with FIXTURE_PDF.open("rb") as fh: resp = app.post( "/ui/jobs", data={ "use_case_mode": "registered", "use_case_name": "bank_statement_header", "ix_client_id": "ui-test", "request_id": request_id, }, files={"pdf": ("sample.pdf", fh, "application/pdf")}, headers={"HX-Request": "true"}, follow_redirects=False, ) assert resp.status_code == 200 assert "HX-Redirect" in resp.headers class TestSubmitJobCustom: def test_post_custom_use_case_stores_inline( self, app: TestClient, postgres_url: str, ) -> None: request_id = f"ui-cust-{uuid4().hex[:8]}" fields_json = json.dumps( [ {"name": "vendor", "type": "str", "required": True}, {"name": "total", "type": "decimal"}, ] ) with FIXTURE_PDF.open("rb") as fh: resp = app.post( "/ui/jobs", data={ "use_case_mode": "custom", "use_case_name": "invoice_adhoc", "ix_client_id": "ui-test", "request_id": request_id, "system_prompt": "Extract vendor and total.", "default_model": "qwen3:14b", "fields_json": fields_json, }, files={"pdf": ("sample.pdf", fh, "application/pdf")}, follow_redirects=False, ) assert resp.status_code in (200, 303), resp.text job_row = _find_job(postgres_url, "ui-test", request_id) assert job_row is not None stored = job_row.request["use_case_inline"] assert stored is not None assert stored["use_case_name"] == "invoice_adhoc" assert stored["system_prompt"] == "Extract vendor and total." names = [f["name"] for f in stored["fields"]] assert names == ["vendor", "total"] def test_post_malformed_fields_json_rejected( self, app: TestClient, postgres_url: str, ) -> None: request_id = f"ui-bad-{uuid4().hex[:8]}" with FIXTURE_PDF.open("rb") as fh: resp = app.post( "/ui/jobs", data={ "use_case_mode": "custom", "use_case_name": "adhoc_bad", "ix_client_id": "ui-test", "request_id": request_id, "system_prompt": "p", "fields_json": "this is not json", }, files={"pdf": ("sample.pdf", fh, "application/pdf")}, follow_redirects=False, ) # Either re-rendered form (422 / 200 with error) — what matters is # that no row was inserted. assert resp.status_code in (200, 400, 422) job_row = _find_job(postgres_url, "ui-test", request_id) assert job_row is None # A helpful error should appear somewhere in the body. assert ( "error" in resp.text.lower() or "invalid" in resp.text.lower() or "json" in resp.text.lower() ) class TestFragment: def test_fragment_pending_has_trigger( self, app: TestClient, postgres_url: str, ) -> None: request_id = f"ui-frag-p-{uuid4().hex[:8]}" with FIXTURE_PDF.open("rb") as fh: app.post( "/ui/jobs", data={ "use_case_mode": "registered", "use_case_name": "bank_statement_header", "ix_client_id": "ui-test", "request_id": request_id, }, files={"pdf": ("sample.pdf", fh, "application/pdf")}, follow_redirects=False, ) job_row = _find_job(postgres_url, "ui-test", request_id) assert job_row is not None resp = app.get(f"/ui/jobs/{job_row.job_id}/fragment") assert resp.status_code == 200 body = resp.text # Pending → auto-refresh every 2s. assert "hx-trigger" in body assert "2s" in body assert "pending" in body.lower() or "running" in body.lower() def test_fragment_done_shows_pretty_json( self, app: TestClient, postgres_url: str, ) -> None: request_id = f"ui-frag-d-{uuid4().hex[:8]}" with FIXTURE_PDF.open("rb") as fh: app.post( "/ui/jobs", data={ "use_case_mode": "registered", "use_case_name": "bank_statement_header", "ix_client_id": "ui-test", "request_id": request_id, }, files={"pdf": ("sample.pdf", fh, "application/pdf")}, follow_redirects=False, ) job_row = _find_job(postgres_url, "ui-test", request_id) assert job_row is not None # Hand-tick the row to done with a fake response. _force_done( postgres_url, job_row.job_id, response_body={ "use_case": "bank_statement_header", "ix_result": {"result": {"bank_name": "UBS AG", "currency": "CHF"}}, }, ) resp = app.get(f"/ui/jobs/{job_row.job_id}/fragment") assert resp.status_code == 200 body = resp.text # Terminal → no auto-refresh. assert "every 2s" not in body and "every 2s" not in body # JSON present. assert "UBS AG" in body assert "CHF" in body def _find_job(postgres_url: str, client_id: str, request_id: str): # type: ignore[no-untyped-def] """Look up an ``ix_jobs`` row via the async engine, wrapping the coroutine for test convenience.""" import asyncio import json as _json async def _go(): # type: ignore[no-untyped-def] eng = create_async_engine(postgres_url) sf = async_sessionmaker(eng, expire_on_commit=False) try: async with sf() as session: r = await session.scalar( select(IxJob).where( IxJob.client_id == client_id, IxJob.request_id == request_id, ) ) if r is None: return None class _JobRow: pass out = _JobRow() out.job_id = r.job_id out.client_id = r.client_id out.request_id = r.request_id out.status = r.status if isinstance(r.request, str): out.request = _json.loads(r.request) else: out.request = r.request return out finally: await eng.dispose() return asyncio.run(_go()) def _force_done( postgres_url: str, job_id, # type: ignore[no-untyped-def] response_body: dict, ) -> None: """Flip a pending job to ``done`` with the given response payload.""" import asyncio from datetime import UTC, datetime from sqlalchemy import text async def _go(): # type: ignore[no-untyped-def] eng = create_async_engine(postgres_url) try: async with eng.begin() as conn: await conn.execute( text( "UPDATE ix_jobs SET status='done', " "response=CAST(:resp AS JSONB), finished_at=:now " "WHERE job_id=:jid" ), { "resp": json.dumps(response_body), "now": datetime.now(UTC), "jid": str(job_id), }, ) finally: await eng.dispose() asyncio.run(_go())