PgQueueListener:
- Dedicated asyncpg connection outside the SQLAlchemy pool (LISTEN
needs a persistent connection; pooled connections check in/out).
- Exposes wait_for_work(timeout) — resolves on NOTIFY or timeout,
whichever fires first. The worker treats both wakes identically.
- asyncpg_dsn_from_sqlalchemy_url strips the +asyncpg driver segment
and percent-decodes the password so the same URL in IX_POSTGRES_URL
works for both SQLAlchemy and raw asyncpg.
app.py lifespan now also spawns the listener alongside the worker;
both are gated on spawn_worker=True so REST-only tests stay fast.
2 new integration tests: NOTIFY path (wake within 2 s despite 60 s
poll) + missed-NOTIFY path (fallback poll recovers within 5 s). 33
integration tests total, 209 unit. Forgejo Actions trigger is flaky;
local verification is the gate.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Worker:
- Startup: sweep_orphans(now, max_running_seconds) rescues rows stuck
in 'running' from a crashed prior process.
- Loop: claim_next_pending → build pipeline via injected factory → run
→ mark_done/mark_error → deliver callback if set → record outcome.
- Non-IX exceptions from the pipeline collapse to IX_002_000 so callers
see a stable error code.
- Sleep loop uses a cancellable wait so the stop event reacts
immediately; the wait_for_work hook is ready for Task 3.6 to plug in
the LISTEN-driven event without the worker knowing about NOTIFY.
Callback:
- One-shot POST, 2xx → delivered, anything else (incl. connect/timeout
exceptions) → failed. No retries.
- Callback record never reverts the job's terminal state — GET /jobs/{id}
stays the authoritative fallback.
7 integration tests: happy path, pipeline-raise → error, callback 2xx,
callback 5xx, orphan sweep on startup, no-callback rows stay
callback_status=None (x2 via parametrize). Unit suite still 209.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Routes:
- POST /jobs: 201 on first insert, 200 on idempotent re-submit.
- GET /jobs/{id}: full Job envelope or 404.
- GET /jobs?client_id=&request_id=: correlation lookup or 404.
- GET /healthz: {postgres, ollama, ocr}; 200 iff all ok (degraded counts
as non-200 per spec). Postgres probe guarded by a 2 s wait_for.
- GET /metrics: pending/running counts + 24h done/error counters +
per-use-case avg seconds. Plain JSON, no Prometheus.
create_app(spawn_worker=bool) parameterises worker spawning so tests that
only need REST pass False. Worker spawn is tolerant of the loop module not
being importable yet (Task 3.5 fills it in).
Probes are a DI bundle — production wiring swaps them in at startup
(Chunk 4); tests inject canned ok/fail callables. Session factory is also
DI'd so tests can point at a per-loop engine and sidestep the async-pg
cross-loop future issue that bit the jobs_repo fixture.
9 new integration tests; unit suite unchanged. Forgejo Actions trigger is
flaky; local verification is the gate (unit + integration green locally).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
JobsRepo covers the full job-lifecycle surface:
- insert_pending: idempotent on (client_id, request_id) via ON CONFLICT
DO NOTHING + re-select; assigns a 16-hex ix_id.
- claim_next_pending: FOR UPDATE SKIP LOCKED so concurrent workers never
double-dispatch a row.
- get / get_by_correlation: hydrates JSONB back through Pydantic.
- mark_done: done iff response.error is None, else error.
- mark_error: explicit convenience wrapper.
- update_callback_status: delivered | failed (no status transition).
- sweep_orphans: time-based rescue of stuck running rows; attempts++.
Integration fixtures (tests/integration/conftest.py):
- Skip cleanly when neither IX_TEST_DATABASE_URL nor IX_POSTGRES_URL is
set (unit suite stays runnable on a bare laptop).
- Alembic upgrade/downgrade runs in a subprocess so its internal
asyncio.run() doesn't collide with pytest-asyncio's loop.
- Per-test engine + truncate so loops never cross and tests start clean.
15 integration tests against a live postgres:16, including SKIP LOCKED
concurrency + orphan sweep.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>