The home server's Ollama doesn't have gpt-oss:20b pulled; qwen3:14b is already there and is what mammon's chat agent uses. Switching the default now so the first deploy passes the /healthz ollama probe without an extra `ollama pull` step. The spec lists gpt-oss:20b as a concrete example; qwen3:14b is equally on-prem and Ollama-structured-output-compatible. Touched: AppConfig default, BankStatementHeader Request.default_model, .env.example, setup_server.sh ollama-list check, AGENTS.md, deployment.md, live tests. Unit tests that hard-coded the old model string but don't assert the default were left alone. Also: replaced the en-dash with an ASCII hyphen in the e2e_smoke.py Paperless-style text (ruff RUF001). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
210 lines
7 KiB
Python
Executable file
210 lines
7 KiB
Python
Executable file
"""End-to-end smoke test against the deployed infoxtractor service.
|
|
|
|
Uploads a synthetic bank-statement fixture, polls for completion, and asserts
|
|
the provenance flags per spec §12 E2E. Intended to run from the Mac after
|
|
every `git push server main` as the deploy gate.
|
|
|
|
Prerequisites:
|
|
- The service is running and reachable at --base-url (default
|
|
http://192.168.68.42:8994).
|
|
- The fixture `tests/fixtures/synthetic_giro.pdf` is present.
|
|
- The Mac and the server are on the same LAN (the server must be able to
|
|
reach the Mac to download the fixture).
|
|
|
|
Exit codes:
|
|
0 all assertions passed within the timeout
|
|
1 at least one assertion failed
|
|
2 the job never reached a terminal state in time
|
|
3 the service was unreachable or returned an unexpected error
|
|
|
|
Usage:
|
|
python scripts/e2e_smoke.py
|
|
python scripts/e2e_smoke.py --base-url http://localhost:8994
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import http.server
|
|
import json
|
|
import socket
|
|
import socketserver
|
|
import sys
|
|
import threading
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
# Address of the deployed service; used when --base-url is not given.
DEFAULT_BASE_URL = "http://192.168.68.42:8994"
# Synthetic bank statement that the smoke test uploads, located relative to
# this script's parent directory.
FIXTURE = Path(__file__).parent.parent.joinpath("tests", "fixtures", "synthetic_giro.pdf")
# Default for --timeout: how many seconds to keep polling the job.
TIMEOUT_SECONDS = 120
# Pause, in seconds, between two consecutive job status polls.
POLL_INTERVAL_SECONDS = 2
|
|
|
|
|
|
def find_lan_ip(target_host: str = "192.168.68.42") -> str:
    """Return the local IP address of the interface that routes to *target_host*.

    The server downloads the fixture from this machine, so the address
    returned here has to be one the server can reach.

    Args:
        target_host: Host whose route selects the outgoing interface.
            Defaults to the deployed server's LAN address.

    Returns:
        The local address the OS bound the probe socket to.

    Raises:
        OSError: If *target_host* cannot be resolved or no route to it exists.
    """
    # Connecting a datagram socket only fixes its peer address; the OS picks
    # the outgoing interface, and getsockname() then reports that interface's
    # address. The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        probe.connect((target_host, 80))
        return probe.getsockname()[0]
|
|
|
|
|
|
def serve_fixture_in_background(fixture: Path) -> tuple[str, threading.Event]:
    """Expose *fixture* over HTTP from a daemon thread.

    The fixture's directory is served on the address returned by
    ``find_lan_ip()`` and on a port chosen by the OS. If the fixture file does
    not exist, a message is written to stderr and the process exits with
    status 3.

    Args:
        fixture: Path of the file the server should be able to download.

    Returns:
        A ``(url, stop)`` pair: the URL under which the fixture can be fetched
        and an event that, once set, makes the serving thread shut down.
    """
    if not fixture.exists():
        print(f"FIXTURE MISSING: {fixture}", file=sys.stderr)
        sys.exit(3)

    serve_root = str(fixture.parent)
    bind_ip = find_lan_ip()

    class _QuietFixtureHandler(http.server.SimpleHTTPRequestHandler):
        """Serve files from the fixture's directory without request logging."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, directory=serve_root, **kwargs)

        def log_message(self, format: str, *args) -> None:
            # Suppress the per-request log line.
            return None

    # Port 0 lets the OS pick any free port; read back the one it assigned.
    server = socketserver.TCPServer((bind_ip, 0), _QuietFixtureHandler)
    # A short timeout makes handle_request() return regularly so the loop
    # below can notice that the stop event was set.
    server.timeout = 0.5
    assigned_port = server.server_address[1]
    stop_requested = threading.Event()

    def _serve_until_stopped():
        try:
            while not stop_requested.is_set():
                server.handle_request()
        finally:
            server.server_close()

    threading.Thread(target=_serve_until_stopped, daemon=True).start()
    return f"http://{bind_ip}:{assigned_port}/{fixture.name}", stop_requested
|
|
|
|
|
|
def post_job(base_url: str, file_url: str, client_id: str, request_id: str) -> dict:
    """Submit a ``bank_statement_header`` job and return the decoded JSON reply.

    Args:
        base_url: Root URL of the service; ``/jobs`` is appended to it.
        file_url: URL of the fixture, sent as the only entry of ``context.files``.
        client_id: Value sent as ``ix_client_id``.
        request_id: Value sent as ``request_id``.

    Returns:
        The service's response body parsed from JSON.
    """
    # Paperless-style OCR of the fixture, sent as context.texts so the
    # text_agreement cross-check has something to compare against.
    ocr_lines = [
        "DKB",
        "DE89370400440532013000",
        "Statement period: 01.03.2026 - 31.03.2026",
        "Opening balance: 1234.56 EUR",
        "Closing balance: 1450.22 EUR",
        "31.03.2026",
    ]
    paperless_text = "".join(f"{line}\n" for line in ocr_lines)

    body = json.dumps(
        {
            "use_case": "bank_statement_header",
            "ix_client_id": client_id,
            "request_id": request_id,
            "context": {
                "files": [file_url],
                "texts": [paperless_text],
            },
        }
    ).encode("utf-8")

    request = urllib.request.Request(
        f"{base_url}/jobs",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as reply:
        raw = reply.read()
    return json.loads(raw.decode("utf-8"))
|
|
|
|
|
|
def get_job(base_url: str, job_id: str) -> dict:
    """Fetch ``{base_url}/jobs/{job_id}`` and return the decoded JSON body."""
    job_url = f"{base_url}/jobs/{job_id}"
    with urllib.request.urlopen(urllib.request.Request(job_url), timeout=10) as reply:
        raw = reply.read()
    return json.loads(raw.decode("utf-8"))
|
|
|
|
|
|
def main() -> int:
    """Run the smoke test and return the process exit code.

    Probes ``/healthz``, serves the fixture over HTTP, submits a job and polls
    it until its status is ``done`` or ``error`` (or ``--timeout`` seconds have
    passed). It then checks that the status is ``done``, that the response
    carries no error, that ``bank_name`` is a non-empty string, that
    ``closing_balance`` is provenance-verified with ``text_agreement`` true,
    and that polling took less than 60 seconds.

    Returns:
        0 when every check passed, 1 when at least one check failed, 2 when
        the job did not reach a terminal state in time, 3 when the service was
        unreachable or answered with something unusable (HTTP error, timeout,
        invalid JSON, missing key) at any point of the run.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--timeout", type=int, default=TIMEOUT_SECONDS)
    args = parser.parse_args()
    # Tolerate a trailing slash so the paths built below never contain "//".
    base_url = args.base_url.rstrip("/")

    # Sanity-check the service is up. OSError covers URLError/HTTPError as
    # well as raw socket timeouts; ValueError covers undecodable or non-JSON
    # bodies.
    try:
        with urllib.request.urlopen(f"{base_url}/healthz", timeout=5) as resp:
            health = json.loads(resp.read().decode("utf-8"))
    except (OSError, ValueError) as e:
        print(f"service unreachable: {e}", file=sys.stderr)
        return 3
    print(f"healthz: {health}")

    fixture_url, stop_server = serve_fixture_in_background(FIXTURE)
    print(f"serving fixture at {fixture_url}")

    try:
        client_id = "e2e_smoke"
        request_id = f"smoke-{uuid.uuid4().hex[:8]}"
        last_status = None
        job = None

        # Any transport or protocol failure while submitting or polling is a
        # service problem (exit 3), not an assertion failure.
        try:
            submit = post_job(base_url, fixture_url, client_id, request_id)
            job_id = submit["job_id"]
            print(f"submitted job_id={job_id}")

            started = time.monotonic()
            while time.monotonic() - started < args.timeout:
                job = get_job(base_url, job_id)
                if job["status"] != last_status:
                    print(f"[{time.monotonic() - started:5.1f}s] status={job['status']}")
                    last_status = job["status"]
                if job["status"] in ("done", "error"):
                    break
                time.sleep(POLL_INTERVAL_SECONDS)
            else:
                # The loop ran out of time without hitting a terminal status.
                print(f"FAIL: timed out after {args.timeout}s", file=sys.stderr)
                return 2
        except (OSError, ValueError, KeyError) as e:
            print(f"service error: {e!r}", file=sys.stderr)
            return 3

        # The loop can only be left via break after at least one poll.
        assert job is not None
        failed = []

        if job["status"] != "done":
            failed.append(f"status={job['status']!r} (want 'done')")

        response = job.get("response") or {}
        if response.get("error"):
            failed.append(f"response.error={response['error']!r}")

        result = (response.get("ix_result") or {}).get("result") or {}
        bank = result.get("bank_name")
        if not isinstance(bank, str) or not bank.strip():
            failed.append(f"bank_name={bank!r} (want non-empty string)")

        fields = (response.get("provenance") or {}).get("fields") or {}
        closing = fields.get("result.closing_balance") or {}
        if not closing.get("provenance_verified"):
            failed.append(
                f"closing_balance.provenance_verified={closing.get('provenance_verified')!r}"
            )
        if closing.get("text_agreement") is not True:
            failed.append(
                f"closing_balance.text_agreement={closing.get('text_agreement')!r}"
                " (Paperless-style text submitted)"
            )

        elapsed = time.monotonic() - started
        if elapsed >= 60:
            failed.append(f"elapsed={elapsed:.1f}s (≥ 60s; slow path)")

        print(json.dumps(result, indent=2, default=str))

        if failed:
            print("\n".join(f"FAIL: {f}" for f in failed), file=sys.stderr)
            return 1

        print(f"\nPASS in {elapsed:.1f}s")
        return 0
    finally:
        # Always tell the fixture server thread to shut down.
        stop_server.set()
|
|
|
|
|
|
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    raise SystemExit(main())
|