feat(e2e): e2e_smoke.py deploy gate (#29)
Some checks are pending
tests / test (push) Waiting to run

Lands Task 5.4.
This commit is contained in:
goldstein 2026-04-18 10:18:23 +00:00
commit f6cc99f062
2 changed files with 222 additions and 0 deletions

View file

@ -79,6 +79,18 @@ _(fill in after running — timestamps, commit sha, e2e_smoke output)_
- **`e2e_smoke.py` status:** TBD
- **Notes:**
## E2E smoke test (`scripts/e2e_smoke.py`)
What it does (from the Mac):
1. Checks `/healthz`.
2. Starts a tiny HTTP server on the Mac's LAN IP serving `tests/fixtures/synthetic_giro.pdf`.
3. Submits a `POST /jobs` with `use_case=bank_statement_header`, the fixture URL in `context.files`, and a Paperless-style OCR text in `context.texts` (to exercise the `text_agreement` cross-check).
4. Polls `GET /jobs/{id}` every 2 s until terminal or 120 s timeout.
5. Asserts: `status=="done"`, `bank_name` non-empty, `provenance.fields["result.closing_balance"].provenance_verified=True`, `text_agreement=True`, total elapsed `< 60s`.
Non-zero exit means the deploy is not healthy. Roll back via `git revert HEAD`.
## Operational checklists
### After `ollama pull` on the host

210
scripts/e2e_smoke.py Executable file
View file

@ -0,0 +1,210 @@
"""End-to-end smoke test against the deployed infoxtractor service.
Uploads a synthetic bank-statement fixture, polls for completion, and asserts
the provenance flags per spec §12 E2E. Intended to run from the Mac after
every `git push server main` as the deploy gate.
Prerequisites:
- The service is running and reachable at --base-url (default
http://192.168.68.42:8994).
- The fixture `tests/fixtures/synthetic_giro.pdf` is present.
- The Mac and the server are on the same LAN (the server must be able to
reach the Mac to download the fixture).
Exit codes:
0 all assertions passed within the timeout
1 at least one assertion failed
2 the job never reached a terminal state in time
3 the service was unreachable or returned an unexpected error
Usage:
python scripts/e2e_smoke.py
python scripts/e2e_smoke.py --base-url http://localhost:8994
"""
from __future__ import annotations
import argparse
import http.server
import json
import socket
import socketserver
import sys
import threading
import time
import urllib.error
import urllib.request
import uuid
from pathlib import Path
DEFAULT_BASE_URL = "http://192.168.68.42:8994"
FIXTURE = Path(__file__).parent.parent / "tests" / "fixtures" / "synthetic_giro.pdf"
TIMEOUT_SECONDS = 120
POLL_INTERVAL_SECONDS = 2
def find_lan_ip() -> str:
"""Return the Mac's LAN IP that the server can reach."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# 192.168.68.42 is the server; getting the default route towards it
# yields the NIC with the matching subnet.
s.connect(("192.168.68.42", 80))
return s.getsockname()[0]
finally:
s.close()
def serve_fixture_in_background(fixture: Path) -> tuple[str, threading.Event]:
"""Serve the fixture on a temporary HTTP server; return the URL and a stop event."""
if not fixture.exists():
print(f"FIXTURE MISSING: {fixture}", file=sys.stderr)
sys.exit(3)
directory = fixture.parent
filename = fixture.name
lan_ip = find_lan_ip()
class Handler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=str(directory), **kwargs)
def log_message(self, format: str, *args) -> None: # quiet
pass
# Pick any free port.
httpd = socketserver.TCPServer((lan_ip, 0), Handler)
port = httpd.server_address[1]
url = f"http://{lan_ip}:{port}/{filename}"
stop = threading.Event()
def _serve():
try:
while not stop.is_set():
httpd.handle_request()
finally:
httpd.server_close()
# Run in a thread. Use a loose timeout so handle_request returns when stop is set.
httpd.timeout = 0.5
t = threading.Thread(target=_serve, daemon=True)
t.start()
return url, stop
def post_job(base_url: str, file_url: str, client_id: str, request_id: str) -> dict:
# Include a Paperless-style OCR of the fixture as context.texts so the
# text_agreement cross-check has something to compare against.
paperless_text = (
"DKB\n"
"DE89370400440532013000\n"
"Statement period: 01.03.2026 31.03.2026\n"
"Opening balance: 1234.56 EUR\n"
"Closing balance: 1450.22 EUR\n"
"31.03.2026\n"
)
payload = {
"use_case": "bank_statement_header",
"ix_client_id": client_id,
"request_id": request_id,
"context": {
"files": [file_url],
"texts": [paperless_text],
},
}
req = urllib.request.Request(
f"{base_url}/jobs",
data=json.dumps(payload).encode("utf-8"),
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode("utf-8"))
def get_job(base_url: str, job_id: str) -> dict:
req = urllib.request.Request(f"{base_url}/jobs/{job_id}")
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode("utf-8"))
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
parser.add_argument("--timeout", type=int, default=TIMEOUT_SECONDS)
args = parser.parse_args()
# Sanity-check the service is up.
try:
with urllib.request.urlopen(f"{args.base_url}/healthz", timeout=5) as resp:
health = json.loads(resp.read().decode("utf-8"))
print(f"healthz: {health}")
except urllib.error.URLError as e:
print(f"service unreachable: {e}", file=sys.stderr)
return 3
fixture_url, stop_server = serve_fixture_in_background(FIXTURE)
print(f"serving fixture at {fixture_url}")
try:
client_id = "e2e_smoke"
request_id = f"smoke-{uuid.uuid4().hex[:8]}"
submit = post_job(args.base_url, fixture_url, client_id, request_id)
job_id = submit["job_id"]
print(f"submitted job_id={job_id}")
started = time.monotonic()
last_status = None
job = None
while time.monotonic() - started < args.timeout:
job = get_job(args.base_url, job_id)
if job["status"] != last_status:
print(f"[{time.monotonic() - started:5.1f}s] status={job['status']}")
last_status = job["status"]
if job["status"] in ("done", "error"):
break
time.sleep(POLL_INTERVAL_SECONDS)
else:
print(f"FAIL: timed out after {args.timeout}s", file=sys.stderr)
return 2
assert job is not None
failed = []
if job["status"] != "done":
failed.append(f"status={job['status']!r} (want 'done')")
response = job.get("response") or {}
if response.get("error"):
failed.append(f"response.error={response['error']!r}")
result = (response.get("ix_result") or {}).get("result") or {}
bank = result.get("bank_name")
if not isinstance(bank, str) or not bank.strip():
failed.append(f"bank_name={bank!r} (want non-empty string)")
fields = (response.get("provenance") or {}).get("fields") or {}
closing = fields.get("result.closing_balance") or {}
if not closing.get("provenance_verified"):
failed.append(f"closing_balance.provenance_verified={closing.get('provenance_verified')!r}")
if closing.get("text_agreement") is not True:
failed.append(f"closing_balance.text_agreement={closing.get('text_agreement')!r} (Paperless-style text submitted)")
elapsed = time.monotonic() - started
if elapsed >= 60:
failed.append(f"elapsed={elapsed:.1f}s (≥ 60s; slow path)")
print(json.dumps(result, indent=2, default=str))
if failed:
print("\n".join(f"FAIL: {f}" for f in failed), file=sys.stderr)
return 1
print(f"\nPASS in {elapsed:.1f}s")
return 0
finally:
stop_server.set()
if __name__ == "__main__":
sys.exit(main())