infoxtractor/scripts/e2e_smoke.py
Dirk Riemann d0648fe01d
All checks were successful
tests / test (push) Successful in 1m11s
tests / test (pull_request) Successful in 2m14s
feat(e2e): scripts/e2e_smoke.py — live deploy gate
Runs from the Mac after every `git push server main`.

Flow: starts a tiny HTTP server on the Mac's LAN IP serving
tests/fixtures/synthetic_giro.pdf → POST /jobs with bank_statement_header
+ Paperless-style texts so text_agreement has something to check against →
poll GET /jobs/{id} until terminal → assert status=done, bank_name
non-empty, closing_balance.provenance_verified=True, text_agreement=True,
elapsed < 60 s. Non-zero exit blocks the deploy.

Uses only stdlib (http.server, urllib) — no extra deps on the Mac-side,
no test framework overhead.

Task 5.4 of MVP plan.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:18:07 +02:00

210 lines
7 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""End-to-end smoke test against the deployed infoxtractor service.
Uploads a synthetic bank-statement fixture, polls for completion, and asserts
the provenance flags per spec §12 E2E. Intended to run from the Mac after
every `git push server main` as the deploy gate.
Prerequisites:
- The service is running and reachable at --base-url (default
http://192.168.68.42:8994).
- The fixture `tests/fixtures/synthetic_giro.pdf` is present.
- The Mac and the server are on the same LAN (the server must be able to
reach the Mac to download the fixture).
Exit codes:
0 all assertions passed within the timeout
1 at least one assertion failed
2 the job never reached a terminal state in time
3 the service was unreachable or returned an unexpected error
Usage:
python scripts/e2e_smoke.py
python scripts/e2e_smoke.py --base-url http://localhost:8994
"""
from __future__ import annotations
import argparse
import http.server
import json
import socket
import socketserver
import sys
import threading
import time
import urllib.error
import urllib.request
import uuid
from pathlib import Path
# Service address used when --base-url is not supplied on the command line.
DEFAULT_BASE_URL = "http://192.168.68.42:8994"

# Synthetic statement PDF, resolved relative to this script's parent directory.
FIXTURE = Path(__file__).parent.parent.joinpath(
    "tests", "fixtures", "synthetic_giro.pdf"
)

# Default polling budget in seconds; --timeout overrides it.
TIMEOUT_SECONDS = 120

# Pause between two consecutive job-status polls, in seconds.
POLL_INTERVAL_SECONDS = 2
def find_lan_ip(server_host: str = "192.168.68.42", probe_port: int = 80) -> str:
    """Return the local IP address the OS would use to reach ``server_host``.

    The function connects a UDP socket towards the host and reads back the
    local address the kernel selected for it, i.e. the address of the NIC
    whose route leads to that host.

    Args:
        server_host: Host to route towards. Defaults to the deployment
            server, so existing no-argument callers behave as before.
        probe_port: Destination port used for the routing probe.

    Returns:
        The local IPv4 address as a string.

    Raises:
        OSError: If the host cannot be resolved or no route to it exists.
    """
    # The context manager closes the socket even when connect() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        probe.connect((server_host, probe_port))
        return probe.getsockname()[0]
def serve_fixture_in_background(fixture: Path) -> tuple[str, threading.Event]:
    """Expose ``fixture`` over HTTP from a background thread.

    The fixture's parent directory is served on this machine's LAN address
    (as reported by ``find_lan_ip()``) on an OS-assigned free port.

    Args:
        fixture: Path of the file the remote service should download.

    Returns:
        A ``(url, stop)`` pair: the URL of the fixture and an event that,
        once set, makes the serving thread close the server and finish.

    Exits the process with status 3 when ``fixture`` does not exist.
    """
    if not fixture.exists():
        print(f"FIXTURE MISSING: {fixture}", file=sys.stderr)
        sys.exit(3)

    served_dir = fixture.parent
    bind_ip = find_lan_ip()

    class QuietFixtureHandler(http.server.SimpleHTTPRequestHandler):
        """Static-file handler rooted at the fixture directory, without logging."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, directory=str(served_dir), **kwargs)

        def log_message(self, format: str, *args) -> None:
            # Deliberately silent: keep per-request lines out of the smoke output.
            return None

    # Port 0 asks the OS for any free port; read the real one back afterwards.
    server = socketserver.TCPServer((bind_ip, 0), QuietFixtureHandler)
    # A short timeout makes handle_request() return regularly so the loop
    # below can notice that the stop event has been set.
    server.timeout = 0.5
    assigned_port = server.server_address[1]
    stop_event = threading.Event()

    def _pump_requests():
        try:
            while not stop_event.is_set():
                server.handle_request()
        finally:
            server.server_close()

    threading.Thread(target=_pump_requests, daemon=True).start()
    return f"http://{bind_ip}:{assigned_port}/{fixture.name}", stop_event
def post_job(base_url: str, file_url: str, client_id: str, request_id: str) -> dict:
    """Submit a ``bank_statement_header`` job and return the decoded JSON reply.

    Args:
        base_url: Root URL of the service; ``/jobs`` is appended to it.
        file_url: URL from which the service can download the fixture.
        client_id: Value sent as ``ix_client_id``.
        request_id: Value sent as ``request_id``.

    Returns:
        The service's response body parsed as JSON.
    """
    # A Paperless-style OCR rendition of the fixture travels along as
    # context.texts so the text_agreement cross-check has a second source
    # to compare against.
    paperless_text = (
        "DKB\n"
        "DE89370400440532013000\n"
        "Statement period: 01.03.2026 31.03.2026\n"
        "Opening balance: 1234.56 EUR\n"
        "Closing balance: 1450.22 EUR\n"
        "31.03.2026\n"
    )
    context = {"files": [file_url], "texts": [paperless_text]}
    payload = {
        "use_case": "bank_statement_header",
        "ix_client_id": client_id,
        "request_id": request_id,
        "context": context,
    }
    encoded_body = json.dumps(payload).encode("utf-8")
    submit_request = urllib.request.Request(
        f"{base_url}/jobs",
        data=encoded_body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(submit_request, timeout=10) as reply:
        raw_reply = reply.read()
    return json.loads(raw_reply.decode("utf-8"))
def get_job(base_url: str, job_id: str) -> dict:
    """Fetch ``{base_url}/jobs/{job_id}`` and return the decoded JSON body."""
    status_url = f"{base_url}/jobs/{job_id}"
    status_request = urllib.request.Request(status_url)
    with urllib.request.urlopen(status_request, timeout=10) as reply:
        raw_reply = reply.read()
    return json.loads(raw_reply.decode("utf-8"))
def _as_dict(value: object) -> dict:
    """Return ``value`` unchanged when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _check_job(
    job: dict, elapsed: float, max_elapsed: float = 60.0
) -> tuple[dict, list[str]]:
    """Evaluate the deploy-gate assertions against a terminal job payload.

    Args:
        job: Decoded job payload returned by the service.
        elapsed: Seconds spent between submission and the terminal state.
        max_elapsed: Elapsed-time budget; reaching it counts as a failure.

    Returns:
        ``(result, failures)``: the extracted result dict (empty when absent)
        and one human-readable message per failed assertion.
    """
    failures: list[str] = []

    status = job.get("status")
    if status != "done":
        failures.append(f"status={status!r} (want 'done')")

    # Malformed or missing nested sections degrade to {} so that they show
    # up as failed assertions instead of crashing with AttributeError.
    response = _as_dict(job.get("response"))
    if response.get("error"):
        failures.append(f"response.error={response['error']!r}")

    result = _as_dict(_as_dict(response.get("ix_result")).get("result"))
    bank = result.get("bank_name")
    if not isinstance(bank, str) or not bank.strip():
        failures.append(f"bank_name={bank!r} (want non-empty string)")

    fields = _as_dict(_as_dict(response.get("provenance")).get("fields"))
    closing = _as_dict(fields.get("result.closing_balance"))

    # Both flags must be exactly True; a merely truthy value does not pass.
    verified = closing.get("provenance_verified")
    if verified is not True:
        failures.append(f"closing_balance.provenance_verified={verified!r}")
    agreement = closing.get("text_agreement")
    if agreement is not True:
        failures.append(
            f"closing_balance.text_agreement={agreement!r} (Paperless-style text submitted)"
        )

    if elapsed >= max_elapsed:
        failures.append(f"elapsed={elapsed:.1f}s (≥ {max_elapsed:g}s; slow path)")

    return result, failures


def main() -> int:
    """Run the smoke test and return the process exit code.

    Flow: check ``/healthz``, serve the fixture over HTTP, submit a job, poll
    it until it reports ``done`` or ``error``, then evaluate the assertions.

    Returns:
        0 when every assertion passed, 1 when at least one failed, 2 when the
        job did not reach a terminal state within ``--timeout`` seconds, and
        3 when the service was unreachable or answered unexpectedly.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's line layout in --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--timeout", type=int, default=TIMEOUT_SECONDS)
    args = parser.parse_args()

    # Tolerate a trailing slash so request paths never become "//jobs".
    base_url = args.base_url.rstrip("/")

    # Sanity-check the service is up. URLError/HTTPError and socket timeouts
    # are OSError subclasses; undecodable or non-JSON bodies raise ValueError.
    try:
        with urllib.request.urlopen(f"{base_url}/healthz", timeout=5) as resp:
            health = json.loads(resp.read().decode("utf-8"))
    except (OSError, ValueError) as e:
        print(f"service unreachable: {e}", file=sys.stderr)
        return 3
    print(f"healthz: {health}")

    try:
        fixture_url, stop_server = serve_fixture_in_background(FIXTURE)
    except OSError as e:
        # For example: no route for the LAN-IP probe, or the bind failed.
        print(f"cannot serve fixture: {e}", file=sys.stderr)
        return 3
    print(f"serving fixture at {fixture_url}")

    try:
        client_id = "e2e_smoke"
        request_id = f"smoke-{uuid.uuid4().hex[:8]}"
        try:
            submit = post_job(base_url, fixture_url, client_id, request_id)
        except (OSError, ValueError) as e:
            print(f"job submission failed: {e}", file=sys.stderr)
            return 3
        job_id = submit.get("job_id") if isinstance(submit, dict) else None
        if not job_id:
            print(f"unexpected submit response: {submit!r}", file=sys.stderr)
            return 3
        print(f"submitted job_id={job_id}")

        started = time.monotonic()
        last_status = None
        job = None
        while time.monotonic() - started < args.timeout:
            try:
                job = get_job(base_url, job_id)
            except (OSError, ValueError) as e:
                print(f"polling failed: {e}", file=sys.stderr)
                return 3
            status = job.get("status") if isinstance(job, dict) else None
            if status is None:
                print(f"unexpected job payload: {job!r}", file=sys.stderr)
                return 3
            if status != last_status:
                print(f"[{time.monotonic() - started:5.1f}s] status={status}")
                last_status = status
            if status in ("done", "error"):
                break
            time.sleep(POLL_INTERVAL_SECONDS)
        else:
            # The loop ended without break: no terminal state was observed.
            print(f"FAIL: timed out after {args.timeout}s", file=sys.stderr)
            return 2

        elapsed = time.monotonic() - started
        result, failed = _check_job(job, elapsed)

        print(json.dumps(result, indent=2, default=str))
        if failed:
            print("\n".join(f"FAIL: {f}" for f in failed), file=sys.stderr)
            return 1
        print(f"\nPASS in {elapsed:.1f}s")
        return 0
    finally:
        # Always release the background fixture server, whatever the outcome.
        stop_server.set()
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())