Your LLM makes the same mistake twice.
AUA makes sure it doesn't make it three times.
Most frameworks give you a model call. AUA gives you a control layer around that call: routing, scoring, correction, and policy enforcement that runs on every query and gets smarter over time.
Here's the problem this framework exists to solve. You deploy an LLM. It gives a wrong answer on Tuesday. You notice on Thursday. You add it to the system prompt on Friday. Next Tuesday: different user, same wrong answer. The prompt didn't stick, or the context window dropped it, or a slightly different phrasing triggered a different path. The error lives on.
AUA closes that loop without waiting for a new model release:
Routes to the right specialist. Scores the response with a utility function. Injects prior verified corrections into the context so past mistakes don't repeat. Enforces your policy, blocking bad output and retrying before the user ever sees it.
Accumulates what the model consistently gets wrong. Tracks which sessions followed your policy perfectly. Exports those gold-standard sessions as DPO training pairs, ready to fine-tune the model so the corrections become permanent.
Write a Policy. Say what must never appear (BLOCKING). Say what you want to see rewarded (INFO, with an E-score bonus). The framework enforces it on every call, tracks adherence over every session, and uses your policy as a curriculum for the next fine-tuning cycle.
It's designed like Django: you get a working system in five commands, and you can customise routing thresholds, utility weights, arbiter behaviour, correction stores, model backends, hooks, middleware, and deployment policy without touching framework internals. The quickstart below takes ten minutes. Parts 10–12 show how to teach the framework what good output looks like and watch it improve over time.
5-minute quickstart
Mac / Apple Silicon prerequisites. The macbook tier uses Ollama to serve models locally. Install and start it before running aua serve:
brew install ollama
ollama serve &                  # start in background
ollama pull qwen2.5-coder:7b    # ~4 GB, main coding specialist
ollama pull qwen2.5:7b          # ~4 GB, math specialist
ollama pull qwen2.5:3b          # ~2 GB, arbiter
ollama list                     # confirm all three are present
aua doctor will detect a missing Ollama and tell you exactly what to install. aua serve does not install Ollama automatically.
Five commands. A live routing endpoint. No GPU required to start.
# 1. Install
pip install adaptive-utility-agent

# 2. Scaffold (Mac/CPU uses Ollama; swap --tier for GPU)
aua init my-aua-project --preset coding --tier macbook
cd my-aua-project

# 3. Validate setup
aua doctor

# 4. Start
aua serve

# 5. First query (new terminal)
# Auth is disabled by default; the Authorization header is optional until you enable it
curl -s -X POST http://localhost:8000/query \
  -H "Content-Type: application/json" \
  -d '{"query": "Write binary search in Python", "session_id": "qs-demo"}' \
  | python3 -m json.tool

What is now running: A multi-specialist LLM router with utility scoring, contradiction detection, assertions store, rate limiting, structured logging, and a Prometheus metrics endpoint, all from one command. Read on to understand each piece.
Concepts The mental model ~10 min
Five minutes here saves an hour of confusion later. If you've built with FastAPI or Django, AUA will feel familiar: it's a config-driven framework with a request pipeline, swappable components, and an extension system. The difference: instead of routing HTTP paths to view functions, AUA routes queries to language models, scores what comes back, and learns from the result.
What happens on every query
──────────────── the router process (aua serve) ────────────────

query ──▶ middleware.before_query
  │         └─ rewrite / redact / reject the query
  ▼
field classifier ──▶ {"software_engineering": 0.92, "mathematics": 0.08}
  │         └─ which domain is this? (swappable via plugin)
  ▼
routing decision
  │   ├─ top prob ≥ single_domain_threshold (0.75) ──▶ SINGLE: one specialist
  │   ├─ 2+ domains ≥ fanout_threshold (0.30) ───────▶ FANOUT: race them, arbiter
  │   │                                                 judges, VCG picks winner
  │   └─ nothing confident ──────────────────────────▶ ARBITER: fallback model
  ▼
correction injection
  │   └─ verified facts from past mistakes go into the system prompt
  ▼
specialist call(s) ──▶ Ollama / vLLM / your ModelBackend plugin
  ▼
utility scoring ──▶ U = w_e·E + w_c·C + w_k·K (swappable via plugin)
  │   └─ plus contradiction detection, policy assertions, retries
  ▼
middleware.after_response ──▶ hooks fire ──▶ audit log + metrics
  │
  └──▶ response + full metadata (U score, domain, routing mode, trace IDs) ──▶ you

The vocabulary: eight terms carry the whole tutorial
| Term | What it is |
|---|---|
| Specialist | A model assigned to a domain. One entry in specialists: a name, a model, an endpoint, and a field. "swe runs qwen2.5-coder:7b on Ollama and handles software_engineering." |
| Field | A domain with scoring rules attached: the utility weights, the minimum confidence to answer (c_min), and the penalty multiplier for being wrong. surgery demands 0.95 confidence and punishes errors 10×; creative_writing barely cares about confidence at all. 11 fields are built in (Part 2 lists them). |
| Router | The FastAPI process you start with aua serve. Owns classification, routing, scoring, corrections, persistence, and the whole REST API. |
| Arbiter | A small, cheap model with two jobs: judge between specialists when fanout routing races them, and catch queries nothing else is confident about. |
| Utility function | The score every response gets: U = w_e·E + w_c·C + w_k·K. E (efficacy) is output quality: did it run, was it complete, did it earn policy bonuses. C (confidence) is correctness probability, updated by test signals and contradiction detection. K (curiosity) is a small exploration bonus for under-sampled specialists so routing doesn't tunnel-vision. The per-field weights are why a 0.6-confidence answer is fine for brainstorming and disqualifying for aviation. |
| Corrections | Verified facts stored when the system (or you) catches a mistake. Injected into future prompts in the same domain. This is the "doesn't make it three times" machinery, and Part 4 + How-to 18 cover its full lifecycle. |
| Blue-green | How model upgrades ship safely: the current model (BLUE) keeps serving while a candidate (GREEN) is scored on the same traffic; promotion happens only when GREEN's mean U beats BLUE by delta over T_min queries. |
| DPO export | The exit ramp: sessions that followed your policy perfectly become preference pairs (aua dpo export) so corrections can be fine-tuned into the model permanently. |
If you know Django, you know AUA
| Django | AUA | Notes |
|---|---|---|
| settings.py | aua_config.yaml | One file, strictly validated: a typo'd key fails at startup with the list of valid keys, never silently. |
| manage.py | aua CLI | aua init ≈ startproject, aua doctor ≈ check, aua serve ≈ runserver. |
| Apps | Specialists | Units of capability you compose in config. |
| URL dispatcher | Field classifier | Decides who handles the request, and it's a plugin, so you can replace it (How-to 13). |
| Middleware | middleware: | Same idea, same ordering semantics: before_query top-down, after_response bottom-up. |
| Signals | Hooks | 11 named points (pre_query, on_correction, on_promotion, ...) instead of post_save. |
| ORM backends | Plugins | Swap the classifier, scorer, correction store, or model backend by import path; no framework edits. |
| South/migrations | Blue-green deploys | Schema changes vs. model changes: both get a safe, reversible upgrade path. |
Your path through this tutorial. Parts 1–12 are sequential: each builds on the last and ends with the system visibly improving itself. How-to 13–18 are standalone recipes; jump straight to the one you need. Want the expert tour in one sitting? Do Parts 1–4, then How-to 13 (plugin system), then How-to 18 (operations toolkit). Everything below was verified against a live router; every command shown is runnable as written.
Part 1 Install & scaffold ~8 min
1.1 Python version
Python 3.10, 3.11, or 3.12 required. If you use pyenv: pyenv local 3.11.10 before installing, or aua may not appear in your PATH.
# Runtime only (Ollama / CPU)
pip install adaptive-utility-agent

# GPU backend (Linux + CUDA required)
pip install "adaptive-utility-agent[vllm]"

# Dev tools: tests, linting, type checks
pip install "adaptive-utility-agent[dev]"

# Verify
aua --version    # prints: aua, version 1.0.0
1.2 Scaffold a project
Pick the tier that matches your hardware. Pick the preset that matches your domain. Together they set models, field weights, routing thresholds, and observability defaults.
| Tier | Hardware | Backend | Notes |
|---|---|---|---|
| macbook | Mac M-series / Intel | Ollama | Best starting point |
| single-4090 | 1× RTX 4090 24 GB | vLLM AWQ | Production-grade |
| quad-4090 | 4× RTX 4090 | vLLM AWQ | One GPU per specialist |
| a100-cluster | 1× A100 80 GB | vLLM fp16 | Highest accuracy |
| Preset | Fields configured | Use for |
|---|---|---|
| coding | software_engineering | Code generation, dev tools |
| math | mathematics | Proofs, computation |
| research | general, mathematics | Research assistance |
| medical-safe | medicine (c_min=0.95) | Medical Q&A with abstention |
| legal-safe | law (c_min=0.85) | Legal Q&A with abstention |
| generalist | software_engineering, mathematics, general | Multi-domain assistant |
aua init my-aua-project --preset coding --tier macbook
cd my-aua-project
# See what init created
aua config expand
1.3 Validate and start
Auth behavior. By default, auth is disabled: the Authorization header is optional and all endpoints are open. This is fine for local development. To enable auth:
aua token create --scope aua:admin --expires 30d
export AUA_TOKEN="aua_tk_..."
# then include in curl: -H "Authorization: Bearer $AUA_TOKEN"
Examples throughout this tutorial show Authorization: Bearer $AUA_TOKEN. On a local dev install without auth enabled, you can omit that header entirely.
aua doctor             # PASS / FAIL / WARN per check, with fixes
aua doctor --strict    # warnings as failures; use in CI
aua doctor --json      # machine-readable output
aua serve              # start specialists + router + arbiter
aua serve --with-ui    # also start Chat UI at :3001 (see note below)
aua serve --dry-run    # print commands without executing
What you can build with this
- A working multi-model AI system running locally in under ten minutes: one command to scaffold, one to serve.
- A project that's ready to customise: config, eval folder, and .gitignore all in place.
- A pre-flight check (aua doctor) you can drop into CI to catch config problems before they reach production.
Part 2 shows how to wire in any model, from a frontier API to a 1.5B model on a laptop, and tell the framework what you want it to optimize for.
Part 2 Models & fields ~12 min
2.1 What's in aua_config.yaml
aua:
  version: "0.5"    # version field generated by aua init; do not edit manually
  mode: local
  backend: ollama
specialists:
  - name: swe
    model: qwen-coder-7b-awq    # registry alias → full model ID
    port: 11434
    field: software_engineering
    gpu: 0
arbiter:
  model: qwen2.5:3b
  port: 11434
router:
  port: 8000
  single_domain_threshold: 0.75
  fanout_threshold: 0.30
security:
  cors_origins: ["http://localhost:3001"]    # Chat UI port; Grafana is on :3000
state:
  backend: sqlite
  path: .aua/state/aua.db
2.2 Model registry β inspect aliases
aua models list

Role        Name  Model             Field                 Port   Status
specialist  swe   qwen2.5-coder:7b  software_engineering  11434  ready
arbiter     -     qwen2.5:3b        -                     11434  ready

# Status comes from a live check against the model server; "not pulled"
# means Ollama is up but the tag isn't downloaded yet

aua models inspect qwen-coder-7b-awq    # expand any registry alias to its full ID + requirements
2.3 Field registry β weights and thresholds
aua fields list
aua fields inspect software_engineering
| Field | w_e | w_c | w_k | c_min | Penalty |
|---|---|---|---|---|---|
| surgery | 0.20 | 0.70 | 0.10 | 0.95 | 10× |
| aviation | 0.20 | 0.70 | 0.10 | 0.95 | 10× |
| law | 0.30 | 0.60 | 0.10 | 0.85 | 5× |
| mathematics | 0.50 | 0.40 | 0.10 | 0.75 | 3× |
| software_engineering | 0.55 | 0.35 | 0.10 | 0.70 | 2× |
| creative_writing | 0.80 | 0.05 | 0.15 | 0.05 | 1× |
2.4 Bring your own model β end to end
The registry aliases are conveniences, not a gate. For the Ollama backend, model: takes any Ollama tag directly: if ollama run accepts it, AUA can serve it. Here's the complete loop for adding a reasoning specialist that isn't in any registry:

ollama pull deepseek-r1:8b    # any tag from ollama.com/library, or your own Modelfile build
ollama list                   # confirm the tag is present; this exact string goes in config

specialists:
  - name: swe                  # existing entry stays
    model: qwen2.5-coder:7b
    port: 11434
    field: software_engineering
    gpu: 0
  - name: reasoning            # NEW: your model
    model: deepseek-r1:8b      # the raw Ollama tag, exactly as `ollama list` shows it
    port: 11434                # all Ollama models share the one Ollama server port
    field: mathematics         # must be one of the 11 built-in fields (see 2.5)
    gpu: 0

aua config validate    # typo'd keys or an unknown field fail here, not at runtime
aua doctor             # confirms Ollama is up and the tag is pulled
aua serve

# Force-route to the new specialist to test it in isolation:
curl -s -X POST localhost:8000/query -H "Content-Type: application/json" \
  -d '{"query": "Prove there are infinitely many primes.", "force_domain": "mathematics"}' \
  | python3 -m json.tool

# Then drop force_domain and confirm the classifier routes math queries there naturally;
# the response's "primary_domain" and "specialist_responses" show who answered.

Three notes that save debugging time.
- One port, many models: Ollama serves every pulled model from :11434; each specialist's model: tag selects which one per request, so adding specialists doesn't mean adding servers.
- vLLM differs: with backend: vllm, each specialist is its own server process on its own port, and model: takes a HuggingFace ID (e.g. Qwen/Qwen2.5-Coder-7B-Instruct-AWQ); aua models inspect shows the full ID behind each alias.
- Frontier APIs: to route to OpenAI/Anthropic-style endpoints, write a ModelBackend plugin (How-to 13, interface 6), about twenty lines.
2.5 The fields you can target β and custom taxonomies
Fields are built into the framework because each carries calibrated scoring weights. The full list of 11 follows; every specialist's field: must be one of these, and aua config validate enforces it:
art creative_writing education general law mathematics software_engineering stem_research structural_engineering surgery aviation
"But my domains are insurance-claims and customer-support." You don't need new fields; you need a custom classifier that maps your taxonomy onto the built-in scoring profiles. Pick the field whose risk profile matches each of your domains (claims adjudication scores like law; support chat scores like general), then write a ten-line FieldClassifierPlugin (How-to 13, interface 1) that recognizes your queries and returns those fields. Your routing logic is fully yours; the scoring calibration stays sound. For dynamic taxonomies, How-to 18.8 shows how the domain ontology grows sub-domains under these roots automatically from production traffic.
What you can build with this
- Swap any model in or out without changing application code: frontier API, 7B local, or a tiny 1.5B model for fast low-stakes queries.
- Tell the framework what matters for your domain: accuracy (w_c), output quality (w_e), or exploration (w_k).
- Set how fast domain knowledge decays (fast for security practices, slow for physics principles) so the system stays calibrated over time.
- Turn off exploration entirely for safety-critical domains so routing is always consistent and predictable.
Part 3 shows how the routing decision actually gets made and gives you the knobs to control how aggressively the system compares specialists.
Part 3 Routing & utility ~15 min
3.1 The routing pipeline
Every query follows this path: middleware → session lookup → correction retrieval → field classifier → routing decision → specialist calls → utility scoring → arbiter (if needed) → hooks → response.
| Mode | Trigger | What happens |
|---|---|---|
| single | One field above single_domain_threshold | One specialist call, utility scored |
| fanout | Two+ fields above fanout_threshold | All qualifying specialists called; best U wins |
| arbiter | Fanout returned contradictory answers | Arbiter resolves; correction stored; both models updated |
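The threshold rules are easy to reason about once written down. The sketch below is an illustration, not the router's actual code: `decide_route` is a hypothetical helper, the defaults are the config values shown in 2.1, and the fall-through branch follows the Concepts diagram ("nothing confident" goes to the arbiter model). Contradiction-triggered arbitration happens after specialists respond and is not modelled here.

```python
def decide_route(probs, single_domain_threshold=0.75, fanout_threshold=0.30):
    """Return (mode, fields) for a classifier probability distribution.

    Hypothetical helper: mirrors the documented thresholds only.
    """
    if not probs:
        return "arbiter", []
    # Rank fields from most to least probable.
    ranked = sorted(probs.items(), key=lambda kv: kv[1], reverse=True)
    top_field, top_prob = ranked[0]
    if top_prob >= single_domain_threshold:
        return "single", [top_field]
    qualifying = [field for field, p in ranked if p >= fanout_threshold]
    if len(qualifying) >= 2:
        return "fanout", qualifying
    # Nothing confident enough: fall back to the arbiter model.
    return "arbiter", []


print(decide_route({"software_engineering": 0.92, "mathematics": 0.08}))
print(decide_route({"software_engineering": 0.55, "mathematics": 0.40, "general": 0.05}))
print(decide_route({"software_engineering": 0.60, "mathematics": 0.20, "general": 0.20}))
```

Raising `fanout_threshold` makes the router commit to one specialist more often; lowering it makes comparisons more frequent.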
3.2 The utility function
Every candidate response gets a single utility score before it reaches the user. The score combines three things:
- How useful the answer appears: does it correctly address the query for this domain?
- How consistent it is with prior verified knowledge: does it contradict things the system already knows?
- Whether exploring this area is valuable: is this a domain where the system has low confidence and should weight novelty?
In practice, you rarely touch the formula directly. The defaults work well for most domains. You tune it when you want stricter answer quality (raise w_e), more caution (raise w_c), or more exploration (raise w_k).
The formal expression:
U = w_e(f)·E + w_c(f)·C + w_k(f)·K
- E (Efficacy): Mann-Whitney dominance probability over prior outputs [0, 1]
- C (Confidence): Kalman-filtered internal consistency, penalized per contradiction [0, 1]
- K (Curiosity): UCB exploration bonus for novel domains [capped at 50% of U]
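To see how the per-field weights change the outcome, here is a minimal sketch of the weighted sum using three rows from the field table in 2.3. `FieldProfile`, `utility`, and `meets_c_min` are hypothetical names for illustration; the curiosity cap and the penalty multiplier are deliberately left out, and E, C, K are taken as given inputs rather than computed.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FieldProfile:
    w_e: float    # efficacy weight
    w_c: float    # confidence weight
    w_k: float    # curiosity weight
    c_min: float  # minimum confidence to answer


# Weights copied from the field table in 2.3.
FIELDS = {
    "surgery": FieldProfile(0.20, 0.70, 0.10, 0.95),
    "software_engineering": FieldProfile(0.55, 0.35, 0.10, 0.70),
    "creative_writing": FieldProfile(0.80, 0.05, 0.15, 0.05),
}


def utility(field, efficacy, confidence, curiosity):
    """U = w_e*E + w_c*C + w_k*K for the given field."""
    p = FIELDS[field]
    return p.w_e * efficacy + p.w_c * confidence + p.w_k * curiosity


def meets_c_min(field, confidence):
    """True when confidence clears the field's minimum-confidence bar."""
    return confidence >= FIELDS[field].c_min


# Same answer quality, scored under three different fields.
for name in FIELDS:
    u = utility(name, efficacy=0.7, confidence=0.6, curiosity=0.2)
    print(f"{name:<22} U = {u:.3f}  clears c_min: {meets_c_min(name, 0.6)}")
```

The same 0.6-confidence answer clears the bar only for creative_writing, which is the behaviour the field table is designed to produce.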
3.3 A full query response
curl -s -X POST http://localhost:8000/query \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $AUA_TOKEN" \
-d '{
"query": "Write binary search in Python. State the time complexity.",
"session_id": "demo-session"
}' | python3 -m json.tool
{
"session_id": "demo-session",
"trace_id": "01HXYZ...",
"request_id": "req_abc123",
"routing_mode": "single",
"primary_field": "software_engineering",
"response": "...",
"u_score": 0.641,
"confidence": 0.76,
"contradictions_detected": 0,
"corrections_injected": 1,
"latency_ms": 287.4,
"cost_estimate_usd": 0.00012
}
3.4 Live status and U scores
aua status           # auto-refreshing terminal UI
aua status --once    # single snapshot
aua status --json    # machine-readable
curl http://localhost:8000/status | python3 -m json.tool
What you can build with this
- A system that routes each question to the right specialist, scores every answer with a real number, and shows you exactly why. No vibes.
- Tune how aggressively the system compares multiple specialists vs. committing fast to a single one.
- Force a specific specialist for known query types, which is useful for dedicated deployments where you already know the domain.
- Read U scores in API responses to build your own routing analytics; low scores on a domain are an early signal the specialist needs attention.
Part 4 adds persistent memory: the framework learns from its mistakes and stops making the same error twice.
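As a starting point for that kind of analytics, the sketch below averages `u_score` per `primary_field` over a batch of saved responses. The key names follow the response shown in 3.3; `mean_u_by_field`, `needs_attention`, and the 0.5 floor are hypothetical choices for illustration, not framework defaults.

```python
from collections import defaultdict
from statistics import mean


def mean_u_by_field(responses):
    """Group saved /query responses by primary_field and average their u_score."""
    buckets = defaultdict(list)
    for r in responses:
        buckets[r["primary_field"]].append(r["u_score"])
    return {field: mean(scores) for field, scores in buckets.items()}


def needs_attention(responses, floor=0.5):
    """Fields whose mean U falls below an illustrative floor."""
    return sorted(f for f, u in mean_u_by_field(responses).items() if u < floor)


# Made-up sample data in the shape of the 3.3 response envelope.
saved = [
    {"primary_field": "software_engineering", "u_score": 0.641},
    {"primary_field": "software_engineering", "u_score": 0.700},
    {"primary_field": "mathematics", "u_score": 0.420},
]
print(mean_u_by_field(saved))
print(needs_attention(saved))
```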
Part 4 Arbiter & corrections ~15 min
When fanout routing produces contradictory responses, the Arbiter runs four checks, issues a verdict, injects correction signals, and stores the verified claim in the assertions store.
4.1 The four arbitration checks
| Check | Weight | What it detects |
|---|---|---|
| Logical | 0.30 | Output contradicts its own premises |
| Mathematical | 0.40 | Complexity or numerical claims provably wrong |
| Cross-session | 0.20 | Contradicts a prior verified assertion |
| Empirical | 0.10 | External ground truth check: SymPy (maths), arXiv (SWE/STEM), PubMed (medicine/surgery) |
ArbiterAgent is the live arbitration path. When two specialists disagree (fanout routing), AUA runs all four checks via the ArbiterAgent pipeline: logical, mathematical, cross-session, and empirical. The empirical check (SymPy, arXiv, PubMed) fires automatically based on domain; no configuration required.
Simplified LLM-only arbitration is available for scenarios where you want lower latency and don't need the formal checks: creative domains where all four checks are always inconclusive, or deployments where p99 latency < 500ms matters more than structured evidence chains. Switch with one config line:
router:
  arbitration_mode: "llm"    # simplified: one LLM call, VERDICT: A/B/BOTH_WRONG
                             # default: "pairwise" (uses ArbiterAgent)
Or switch at runtime without restarting: PATCH /config with body {"arbitration_mode": "llm"}. The LLM path is also the automatic fallback if ArbiterAgent raises an exception.
4.2 The four verdict cases
| Case | Meaning | Action |
|---|---|---|
| Case 1 | A correct, B wrong | Correct B, reinforce A, store claim |
| Case 2 | B correct, A wrong | Correct A, reinforce B, store claim |
| Case 3 | Both wrong | Correct both + open curiosity gap bonus |
| Case 4 | Inconclusive | Flag for external escalation, hedge response |
4.3 Using the Arbiter directly
# ArbiterAgent runs automatically inside aua serve.
# Use directly in Python for testing or custom workflows:
from aua import ArbiterAgent, AssertionsStore
store = AssertionsStore()
arbiter = ArbiterAgent(store)
verdict = arbiter.arbitrate(  # sync; call directly
    subject="bubble_sort_complexity",
    domain="software_engineering",
    output_A="Bubble sort is O(n) average case.",
    output_B="Bubble sort is O(n²) average case.",
    field_penalty_multiplier=2.0,
)
print(verdict.case.value)      # "case_2" (B correct, A wrong)
print(verdict.verified_claim)  # "Bubble sort is O(n²) average case."
print(verdict.external_response) # safe to return to user
4.4 The assertions store β decay classes
| Class | Decay | Used for |
|---|---|---|
| A | Never | Mathematical proofs, algorithm complexity |
| B | 10 years | Classical physics, structural engineering |
| C | 3 years | Medicine, law, architecture |
| D | 6 months | Security CVEs, clinical guidelines, ML benchmarks |
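One possible reading of the table is that an assertion older than its class window is due for re-verification. The sketch below encodes only that reading; whether AUA applies a hard cutoff or a gradual confidence decay is not stated here, and `DECAY_WINDOWS` and `is_stale` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone

# Windows taken from the decay-class table; None means "never decays".
DECAY_WINDOWS = {
    "A": None,
    "B": timedelta(days=365 * 10),
    "C": timedelta(days=365 * 3),
    "D": timedelta(days=182),  # roughly six months
}


def is_stale(decay_class, stored_at, now=None):
    """True when an assertion is older than its class window (assumed hard cutoff)."""
    window = DECAY_WINDOWS[decay_class]
    if window is None:
        return False
    now = now or datetime.now(timezone.utc)
    return now - stored_at > window


stored = datetime(2024, 1, 1, tzinfo=timezone.utc)
today = datetime(2025, 1, 1, tzinfo=timezone.utc)
for cls in "ABCD":
    print(cls, is_stale(cls, stored, now=today))
```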
4.5 Manual corrections via REST
curl -X POST http://localhost:8000/corrections \
-H "Authorization: Bearer $AUA_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"subject": "heapsort_complexity",
"domain": "software_engineering",
"claim": "Heapsort is O(n log n) worst-case, O(1) extra space.",
"confidence": 0.99
}'
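The same call from Python, as a sketch using only the standard library. `build_correction_request` is a hypothetical helper; the endpoint, payload keys, and the `AUA_TOKEN` variable are the ones used in the curl example. The request is built but not sent, so the snippet runs without a live router.

```python
import json
import os
import urllib.request


def build_correction_request(subject, domain, claim, confidence,
                             base_url="http://localhost:8000"):
    """Build (but do not send) a POST /corrections request."""
    payload = {
        "subject": subject,
        "domain": domain,
        "claim": claim,
        "confidence": confidence,
    }
    headers = {"Content-Type": "application/json"}
    token = os.environ.get("AUA_TOKEN")
    if token:  # header is optional while auth is disabled
        headers["Authorization"] = f"Bearer {token}"
    return urllib.request.Request(
        f"{base_url}/corrections",
        data=json.dumps(payload).encode("utf-8"),
        headers=headers,
        method="POST",
    )


req = build_correction_request(
    subject="heapsort_complexity",
    domain="software_engineering",
    claim="Heapsort is O(n log n) worst-case, O(1) extra space.",
    confidence=0.99,
)
print(req.get_method(), req.full_url)

# To send it against a running router:
#   with urllib.request.urlopen(req) as resp:
#       print(resp.read().decode())
```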
Corrections are global in v1.0. A correction stored via POST /corrections is a verified fact about the world: it is injected into every future query on that subject, across all users and sessions. For internal tools, dedicated agents, and single-tenant deployments this is the intended behaviour. For multi-tenant products where users need isolated correction contexts, per-user scoping is a v1.1 roadmap item.
4.6 VCG welfare maximization β picking the best specialist
The default arbitration mode runs the 4-check arbiter in full verdict mode between two specialists. VCG mode replaces pairwise selection with welfare maximization: after all specialists in the fanout respond, the router computes a welfare score for each one and picks the highest, regardless of how many specialists are competing. The 4-check arbiter still runs, but only for contradiction detection and DPO pair accumulation; it does not override the VCG winner.
This is the mechanism tested in the RTX 4090 hardware pilot (Appendix A.1): Arm D (VCG routing) vs Arm A (no routing) showed a +43.4 pp correctness gain (95 % CI [19.5, 61.2] pp; Fisher exact p = 0.0009; Cohen's d = 1.02; n = 30/arm). The direction of the effect is robust; the interval is wide at this sample size, so treat it as a strong directional signal rather than a precise point estimate.
How the welfare score is computed
The formula has two layers. The outer layer is a multi-domain convex combination: because a query can straddle domains, the welfare score integrates over the classifier's probability distribution rather than committing to a single domain:

W_i(q) = Σ_j p(j|q) · effective_u(i, j)

i                 specialist (e.g. "swe", "math")
j                 domain (e.g. "software_engineering", "mathematics")
p(j|q)            field classifier probability for domain j on query q;
                  only domains with p(j|q) ≥ 0.05 contribute (below that
                  the term is negligible and the DB lookup is wasteful)
effective_u(i,j)  shrinkage-corrected win-rate (see below)

Winner            argmax W_i(q) across all specialists in the fanout
Tie-breaking      raw confidence returned by the specialist, then P(top domain)
The inner layer is the shrinkage-corrected win-rate (Efron-Morris estimator, Lemma B.8.1). When a specialist has seen many queries in domain j the raw win-rate is reliable; when it has seen only a few, the estimate is pulled toward the global cross-domain prior to prevent early flukes from dominating:

effective_u(i, j) = (n_ij · û_ij + N_cliff · ū) / (n_ij + N_cliff)

n_ij     number of past queries where specialist i answered in domain j
û_ij     raw win-rate: fraction of those queries where i was the VCG winner
N_cliff  Efron-Morris pseudo-count (default: 10)
ū        global cross-domain prior (default: 0.65)

Cold start (n_ij = 0):   effective_u = 0.65 (pure prior)
Converging (n_ij = 10):  effective_u ≈ halfway between û and 0.65
Converged  (n_ij → ∞):   effective_u → û (pure observed win-rate)

Data source: model_runs table (vcg_winner flag, specialist and domain columns).
Falls back to ū = 0.65 if the DB lookup fails.

Why shrinkage? Without it, a specialist that wins its very first query in a new domain gets effective_u = 1.0, instantly beating every established specialist. The Efron-Morris estimator prevents this: N_cliff = 10 means a new specialist needs roughly 10 wins before its estimate clearly separates from the prior. The prior ū = 0.65 matches the empirical cross-domain baseline from the hardware pilot (Arm B matched-routing accuracy).
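Both layers fit in a few lines of Python. This is a sketch of the two formulas as stated, not the router's implementation: `effective_u` and `welfare` are hypothetical function names, the win/run counts are illustrative, and history is passed in as a plain dict rather than read from the model_runs table.

```python
N_CLIFF = 10         # pseudo-count (documented default)
GLOBAL_PRIOR = 0.65  # global cross-domain prior (documented default)


def effective_u(n, wins, n_cliff=N_CLIFF, prior=GLOBAL_PRIOR):
    """Shrinkage-corrected win-rate: (n*u_hat + N_cliff*prior) / (n + N_cliff).

    n * u_hat equals the raw win count, so wins is used directly.
    """
    return (wins + n_cliff * prior) / (n + n_cliff)


def welfare(domain_probs, history, min_prob=0.05):
    """W_i(q) = sum_j p(j|q) * effective_u(i, j) over domains with p >= min_prob.

    history maps domain -> (runs, wins) for one specialist;
    a missing domain is treated as a cold start.
    """
    total = 0.0
    for domain, p in domain_probs.items():
        if p < min_prob:
            continue
        n, wins = history.get(domain, (0, 0))
        total += p * effective_u(n, wins)
    return total


print(effective_u(0, 0))        # cold start: the prior itself
print(effective_u(10, 9))       # n = N_cliff: halfway between 0.9 and 0.65
print(effective_u(1000, 900))   # large n: close to the raw win-rate 0.9

probs = {"software_engineering": 0.72, "mathematics": 0.28}
swe_history = {"software_engineering": (18, 14), "mathematics": (2, 1)}
print(round(welfare(probs, swe_history), 3))
```

Passing counts instead of a precomputed win-rate avoids rounding the rate before it is shrunk.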
A worked example
Query: "Is this gradient descent implementation correct? It claims O(n) but loops over all weights." The classifier splits probability mass across two domains.
p("software_engineering" | q) = 0.72
p("mathematics" | q) = 0.28
(all other domains below 0.05 threshold: skipped)

                       swe specialist             math specialist
software_engineering   n=18, wins=14              n=3, wins=2
                       û = 14/18 = 0.778          û = 2/3 = 0.667
                       effective_u                effective_u
                         = (18·0.778 + 10·0.65)     = (3·0.667 + 10·0.65)
                           / (18 + 10)                / (3 + 10)
                         = 0.732                    = 0.654

mathematics            n=2, wins=1                n=31, wins=24
                       û = 1/2 = 0.500            û = 24/31 = 0.774
                       effective_u                effective_u
                         = (2·0.500 + 10·0.65)      = (31·0.774 + 10·0.65)
                           / (2 + 10)                 / (31 + 10)
                         = 0.625                    = 0.744

W_swe  = 0.72 · 0.732 + 0.28 · 0.625 = 0.527 + 0.175 = 0.702
W_math = 0.72 · 0.654 + 0.28 · 0.744 = 0.471 + 0.208 = 0.679

Winner: swe specialist (W = 0.702 > 0.679)

The math specialist has the longer and stronger record in mathematics (24 wins in 31 runs), but that domain carries only 28 % of the probability mass, and its 3 runs in software_engineering leave that estimate close to the prior. The margin is narrow: with p(software_engineering) below about 0.60, the math specialist would win even though software_engineering is still the most probable domain. This is the key property VCG mode adds over "always route to the most probable domain".
How the prior builds up over time
Every VCG query writes one row per specialist to the model_runs table with vcg_winner (boolean), vcg_welfare_score, specialist, and domain. The next query reads those rows to compute effective_u. There is no separate training phase; the prior accumulates automatically from production traffic.
{
  "run_id": "uuid-...",
  "specialist": "math",
  "domain": "mathematics",
  "round": "answer",
  "vcg_winner": true,            ← used to compute û_ij on future queries
  "vcg_welfare_score": 0.725,    ← W_i for this query
  "confidence_score": 0.81,
  "utility_score": 0.744,
  "conversation_id": "uuid-...",
  "latency_ms": 312.4
}
After about 10–15 queries per specialist-domain pair (the N_cliff = 10 convergence point) the effective_u estimates stabilise and VCG routing becomes meaningfully better than the cold-start prior. In the hardware pilot this threshold was crossed within the first 30 queries per arm.
Reading the prior from the analytics API
# Current snapshot: win rates and average welfare per specialist
curl http://localhost:8000/analytics/specialists

{
  "specialists": {
    "swe": {
      "total_runs": 42,
      "vcg_wins": 31,
      "win_rate": 0.738,
      "avg_welfare_score": 0.701,
      "avg_utility_score": 0.731
    },
    "math": {
      "total_runs": 38,
      "vcg_wins": 27,
      "win_rate": 0.711,
      "avg_welfare_score": 0.724,
      "avg_utility_score": 0.748
    }
  }
}

# Welfare trajectory over time (useful for spotting drift)
curl http://localhost:8000/analytics/welfare-trajectory
import requests
r = requests.get("http://localhost:8000/analytics/specialists")
stats = r.json()["specialists"]
N_CLIFF = 10 # effective_u convergence threshold
print(f"{'Specialist':<12} {'Runs':>6} {'Win%':>6} {'Avg W':>7} {'Status':>12}")
print("-" * 48)
for name, s in sorted(stats.items()):
    status = "converged" if s["total_runs"] >= N_CLIFF else "warming up"
    print(
        f"{name:<12} {s['total_runs']:>6} "
        f"{s['win_rate']*100:>5.1f}% {s['avg_welfare_score']:>7.4f} "
        f"{status:>12}"
    )
# After 40+ queries per specialist:
# Specialist Runs Win% Avg W Status
# ------------------------------------------------
# math 38 71.1% 0.7240 converged
# swe 42 73.8% 0.7010 converged
Activating VCG
router:
arbitration_mode: vcg # "pairwise" (default) | "vcg"
aua serve --arbitration-mode vcg
# Enable VCG
curl -X PATCH http://localhost:8000/config \
  -H "Content-Type: application/json" \
  -d '{"arbitration_mode": "vcg", "persist": true}'

{"patched": {"arbitration_mode": "vcg"}, "persisted": true}

# Revert to pairwise
curl -X PATCH http://localhost:8000/config \
  -H "Content-Type: application/json" \
  -d '{"arbitration_mode": "pairwise", "persist": true}'
What changes in the response envelope
curl -s -X POST http://localhost:8000/query \
  -H "Content-Type: application/json" \
  -d '{"query": "Is gradient descent O(n) per step?"}'
{
  "routing_mode": "vcg",           ← "fanout" in pairwise mode
  "primary_domain": "mathematics",
  "response": "Per-step complexity is O(n) where n = number of params...",
  "u_score": 0.744,
  "welfare_scores": {              ← present only in VCG mode
    "math": 0.7250,                ← winner
    "swe": 0.7020
  }
}
import requests

r = requests.post("http://localhost:8000/query", json={
    "query": "Is the gradient descent update O(n) per step?"
})
data = r.json()

# welfare_scores is present only when the query was routed in VCG mode
if data["routing_mode"] == "vcg":
    scores = data["welfare_scores"]
    winner = max(scores, key=scores.get)
    print(f"VCG selected: {winner}")
    for name, w in sorted(scores.items(), key=lambda x: -x[1]):
        marker = " ← winner" if name == winner else ""
        print(f"  {name}: W = {w:.4f}{marker}")
When to use VCG vs pairwise
| Scenario | Recommended mode | Why |
|---|---|---|
| Single specialist, queries rarely trigger fanout | Pairwise (default) | VCG fires only when fanout does; no benefit from switching |
| 2–3 specialists, traffic just starting | Pairwise first, then VCG | Wait until ~10 queries per specialist per domain; switch via REST without restart |
| 2+ specialists with 10+ queries of history | VCG | Shrinkage estimates are reliable; track record differentiates specialists the classifier treats as equally likely |
| 3+ specialists, overlapping domains | VCG | Multi-domain convex combination handles ambiguous queries; pairwise can only compare two |
| Safety-critical domain, need full 4-check verdict | Pairwise | Pairwise runs the arbiter in full verdict mode (case_1 to case_4); VCG uses it for contradiction detection only |
Gradual rollout pattern. Start with pairwise to accumulate model_runs history. After ~50 queries across your specialists, switch to VCG via PATCH /config: no restart, no disruption. Watch /analytics/specialists for win-rate divergence; once one specialist is clearly ahead, VCG's welfare scores will reflect that on every query.
What VCG does not do. VCG selects the best specialist from those that responded; it does not re-route if all specialists perform poorly. If every welfare score is near the prior (0.65), the specialists haven't accumulated enough history yet; check total_runs in /analytics/specialists. VCG also does not guarantee the incentive-theoretic properties of Theorems S1–S3 in deployment; those hold for the idealized mechanism where specialists report explicit utility bids, whereas here welfare is inferred from historical win-rates (see Appendix A.1.2 and §B.8 remark).
What you can build with this
- Self-improving routing. Every VCG query makes the next one slightly better. A specialist with a bad run accumulates losses in model_runs; its effective_u falls and the router naturally routes away from it without any manual intervention.
- Interpretable decisions. Every response includes per-specialist welfare scores. Log them and you get a complete audit trail of exactly why each query went where it did.
- Graceful cold starts. The shrinkage prior (0.65) means a brand-new specialist competes at a fair baseline, not zero. It won't dominate until it earns it, but it won't be shut out on day one either.
- Hot A/B testing. Run pairwise on one process and VCG on another; compare U scores in their respective /analytics endpoints before committing to either mode in production.
Part 5 shows how U scores gate model promotion: the model_runs welfare trajectory VCG builds is an input to the blue-green promotion decision.
Part 5 Blue-green deployment ~20 min
BLUE is in production. GREEN is the candidate evaluated via shadow mode or a synthetic eval run. When GREEN's U score exceeds BLUE by delta and at least T_min queries have been evaluated, the promotion gate opens. Promotion is triggered manually via POST /deploy/green or through shadow mode accumulation; see How-to 20 for the full workflow.
5.1 Promotion thresholds
blue_green:
  swe:
    delta: 0.025   # GREEN must beat BLUE by at least +2.5% U
    T_min: 10      # minimum queries before promotion is considered (gate)
router:
  tau: 1.0         # routing softmax temperature: 1.0=off, <1=sharper, >1=softer
T_min is a minimum-sample gate. If you trigger POST /deploy/green before accumulating T_min shadow queries, the router returns promoted: false with a "PROMOTION DEFERRED - T_min gate" message. Accumulate more shadow traffic first.
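The two gate conditions can be sketched as a small pure function. This is an illustration under stated assumptions: `promotion_gate` and `GateResult` are hypothetical names, and delta is treated as an absolute difference in mean U, which the config comment does not state explicitly.

```python
from dataclasses import dataclass


@dataclass
class GateResult:
    promoted: bool
    reason: str


def promotion_gate(green_u: float, blue_u: float, n_queries: int,
                   delta: float = 0.025, t_min: int = 10) -> GateResult:
    """Open the gate only when both the sample and margin conditions hold.

    ASSUMPTION: delta is compared as an absolute difference in mean U.
    """
    # Sample gate first: too few queries defers the decision entirely.
    if n_queries < t_min:
        return GateResult(False, f"deferred: {n_queries} < T_min={t_min}")
    margin = green_u - blue_u
    if margin < delta:
        return GateResult(False, f"rejected: margin {margin:+.3f} < delta {delta}")
    return GateResult(True, f"promoted: margin {margin:+.3f} >= delta {delta}")


if __name__ == "__main__":
    print(promotion_gate(0.70, 0.60, n_queries=5))
    print(promotion_gate(0.64, 0.60, n_queries=20))
```

Checking the sample gate before the margin keeps a lucky early streak from promoting a candidate on a handful of queries.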
tau (in the router: block, not blue_green:) is a softmax temperature applied to the field classifier's probability distribution before routing thresholds are checked:
- tau < 1.0: sharpens routing. The highest-probability domain gets boosted, making single-specialist routing more likely.
- tau > 1.0: softens routing. Probabilities spread more evenly, increasing fanout and arbiter traffic.
- tau = 1.0: no effect (default).
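The effect of tau can be seen in a short sketch. It assumes the classifier exposes probabilities rather than logits, in which case applying a softmax temperature is equivalent to raising each probability to the power 1/tau and renormalising. The function name `apply_temperature` and the example domains are illustrative, not part of the AUA API.

```python
def apply_temperature(probs: dict[str, float], tau: float = 1.0) -> dict[str, float]:
    """Rescale a probability distribution with softmax temperature tau.

    Equivalent to softmax(log(p) / tau): p_i ** (1 / tau), renormalised.
    """
    if tau <= 0:
        raise ValueError("tau must be positive")
    powered = {name: p ** (1.0 / tau) for name, p in probs.items()}
    total = sum(powered.values())
    return {name: value / total for name, value in powered.items()}


if __name__ == "__main__":
    classifier = {"software_engineering": 0.6, "mathematics": 0.3, "general": 0.1}
    for tau in (0.5, 1.0, 2.0):
        scaled = apply_temperature(classifier, tau)
        print(tau, {k: round(v, 3) for k, v in scaled.items()})
```

With tau below 1 the top domain's share grows, so it is more likely to clear a single-specialist threshold; with tau above 1 the distribution flattens and more queries fall into fanout.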
5.2 Promote and rollback
# Trigger blue-green promotion check via REST API
curl -X POST http://localhost:8000/deploy/green \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $AUA_TOKEN" \
  -d '{"specialist": "swe", "new_model": "qwen2.5-coder:14b"}'

# Check status
aua status --once

# Roll back to previous BLUE
aua rollback --specialist swe
aua rollback --specialist swe --yes   # skip confirmation
aua rollback --no-restart             # update config only, no restart
aua rollback --all --yes              # roll back all specialists
5.3 Using BlueGreenDeployment in Python
import asyncio

from aua import BlueGreenDeployment
from aua.config import load_config

config = load_config("aua_config.yaml")
bg = BlueGreenDeployment(config, specialist_name="swe")
bg.register_green("models/swe-green-v2/")

# evaluate() is async; use asyncio.run() from a sync context
summary = asyncio.run(bg.evaluate(n_queries=10))
print(f"GREEN U mean: {summary.green_u_mean:.3f}")
print(f"BLUE U mean: {summary.blue_u_mean:.3f}")

if bg.should_promote(summary):
    bg.promote()
    print("GREEN promoted to BLUE")
5.4 The promotions log
Every promotion is recorded atomically to .aua/state/promotions.jsonl with a UUID, timestamp, and U scores. File-locked to prevent concurrent corruption.
What you can build with this
- Upgrade your AI models the way you upgrade software: safely, with a rollback button and a promotion gate.
- Test a new model on real traffic before it touches production: deploy as GREEN, evaluate, promote only if U score delta passes the threshold.
- Revert a bad upgrade in one command, with no redeployment and no config archaeology.
- Set different promotion thresholds per specialist: conservative for customer-facing models, aggressive for internal experimental ones.
Part 6 makes the whole system easier to operate, with config changes that take effect in seconds without restarting anything.
Part 6 Config system ~15 min
6.1 Config commands
aua config validate          # strict schema check: catches typos, dupe ports, bad ranges
aua config expand            # full resolved config with all defaults filled in (secrets redacted)
aua presets list             # list all built-in presets
aua presets inspect coding   # full preset config
6.2 Hot reload: no restart needed
Not all config changes take effect without restarting. The rule:
| Hot-reloadable (aua config reload) | Requires restart (aua serve) |
|---|---|
| Routing thresholds | Model path or name |
| Utility weights | Specialist port |
| Arbiter thresholds | Backend (vllm ↔ ollama) |
| Promotion delta / T_min / tau | GPU assignment |
| Logging level | mTLS certificate paths |
| Rate limits | New specialist added/removed |
| CORS origins (cors_origins) | |

aua config reload                       # sends SIGHUP to the running router
kill -HUP $(cat .aua/pids/router.pid)   # same effect
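Before reloading, it can help to check whether a pending config change is hot-reloadable at all. The sketch below diffs two config dicts and sorts the changed keys into the two columns of the table. The dotted key names and the `RESTART_REQUIRED_PREFIXES` list are illustrative assumptions, not AUA's real schema; adjust them to match your own config layout.

```python
# ASSUMPTION: dotted key prefixes below are illustrative, not AUA's schema.
RESTART_REQUIRED_PREFIXES = (
    "specialists.",     # model name/path, port, backend, GPU, add/remove
    "security.mtls.",   # mTLS certificate paths
)


def flatten(cfg: dict, prefix: str = "") -> dict:
    """Flatten nested dicts into {'a.b.c': value} form."""
    flat = {}
    for key, value in cfg.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, path + "."))
        else:
            flat[path] = value
    return flat


def classify_changes(old: dict, new: dict) -> dict[str, list[str]]:
    """Split changed keys into hot-reloadable and restart-required."""
    a, b = flatten(old), flatten(new)
    changed = sorted(k for k in a.keys() | b.keys() if a.get(k) != b.get(k))
    result: dict[str, list[str]] = {"hot_reload": [], "restart": []}
    for key in changed:
        needs_restart = key.startswith(RESTART_REQUIRED_PREFIXES)
        result["restart" if needs_restart else "hot_reload"].append(key)
    return result


if __name__ == "__main__":
    old = {"router": {"tau": 1.0}, "specialists": {"swe": {"port": 8001}}}
    new = {"router": {"tau": 0.8}, "specialists": {"swe": {"port": 8002}}}
    print(classify_changes(old, new))
```

If the "restart" list is non-empty, a reload alone will not pick up every change.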
6.3 Config versioning and migration
# Config versioning: the aua.version field tracks schema compatibility.
# v1.0 does not provide automatic migration; update config manually if upgrading.
aua config validate   # validates your config against the current schema
aua config expand     # shows full resolved config with all defaults applied
What you can build with this
- Change routing thresholds, utility weights, and CORS settings on a live system, with no downtime and no restarts.
- Version your config in git and use aua config validate as a pre-commit hook so schema errors never reach production.
- See exactly what the running system is doing with aua config expand, so implicit defaults cause no surprises.
- A config-driven system your whole team can modify safely, not one that lives in a single engineer's head.
Part 7 shows how every mistake the system makes in production automatically becomes training material for the next version.
Part 7 Correction loop & DPO export ~15 min
Every contradiction the Arbiter resolves with a conclusive verdict produces a DPO training pair (inconclusive Case 4 outcomes do not; see 7.2). The correction loop accumulates these pairs and exports them for fine-tuning your specialists.
7.1 Export corrections via CLI
# Export all verified corrections as JSONL
aua corrections export --format jsonl

# Export as preference pairs for DPO training
aua dpo export --format preference-pairs

# With redaction (remove PII from prompts)
aua dpo export --format preference-pairs --redact
7.2 DPO pair format
v1.0 DPO pair status. In v1.0, DPO pairs are generated from corrections with a confirmed chosen answer. The rejected side is populated when the Arbiter identifies a clearly wrong response; for corrections injected manually (e.g. via POST /corrections), the rejected field is empty and must be filled before training. Case 4 (inconclusive arbiter outcomes) never produces a pair. Full chosen+rejected pair generation is a v1.1 item.
{
  "query": "bubble_sort_complexity",
  "chosen": "Bubble sort is O(n²) average case.",
  "rejected": "Bubble sort is O(n) average case.",
  "field": "software_engineering",
  "utility_chosen": 0.72,
  "utility_rejected": 0.41,
  "correction_ids": ["corr_abc123"],
  "trace_id": "01HXYZ..."
}
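Because manually injected corrections can have an empty rejected field, it is worth filtering an export before handing it to a trainer. The sketch below assumes the export is JSONL with one object per line using the field names shown above; `trainable_pairs` is a hypothetical helper, not part of the AUA package.

```python
import json


def trainable_pairs(jsonl_text: str) -> list[dict]:
    """Keep only pairs with non-empty 'chosen' and 'rejected' fields."""
    pairs = []
    for line in jsonl_text.splitlines():
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between records
        record = json.loads(line)
        if record.get("chosen") and record.get("rejected"):
            pairs.append(record)
    return pairs


if __name__ == "__main__":
    sample = "\n".join([
        json.dumps({"query": "q1", "chosen": "good", "rejected": "bad"}),
        json.dumps({"query": "q2", "chosen": "good", "rejected": ""}),
    ])
    usable = trainable_pairs(sample)
    print(f"{len(usable)} of 2 pairs are ready for training")
```

Records dropped by this filter are the ones whose rejected side still needs to be filled in by hand.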
7.3 Using CorrectionLoop in Python
import asyncio

from aua import CorrectionLoop
from aua.config import load_config

config = load_config("aua_config.yaml")
loop = CorrectionLoop(config, router_url="http://localhost:8000")


async def main():
    pairs = await loop.collect_pairs(min_confidence=0.8)
    print(f"Collected {len(pairs)} pairs")
    summary = loop.export_pairs(pairs, output_dir="dpo_pairs")
    print(f"Exported to: {summary.output_path}")


asyncio.run(main())
7.4 Using field penalty weights in training
from aua import FIELD_CONFIGS

# `pairs` is the list returned by loop.collect_pairs() in 7.3
for pair in pairs:
    cfg = FIELD_CONFIGS.get(pair.field, FIELD_CONFIGS["general"])
    loss_weight = cfg.penalty_multiplier  # 2× for SWE, 10× for surgery
    # pass loss_weight to your DPO trainer's per-sample weight
What you can build with this
- Every mistake your AI makes in production automatically becomes training data for the next version, without manual labelling.
- A fine-tuning dataset built from real traffic: the things your actual users asked, and what the right answer was.
- Domain-filtered DPO pairs so coding corrections train the coding specialist and don't pollute math training data.
- A closed loop: production error → correction stored → DPO pair exported → model fine-tuned → mistake doesn't recur.
Part 8 adds a quality gate: catch regressions automatically before a new model ever reaches users.
Part 8 Eval harness ~20 min
The eval harness routes YAML test datasets through the live framework, scores outputs with the utility function, detects regressions, and produces structured JSON reports. It's the gate for blue-green promotion and CI.
8.1 Built-in smoke datasets
ls evals/
coding_smoke.yaml      math_smoke.yaml      routing_smoke.yaml
correction_smoke.yaml  arbiter_smoke.yaml   safety_smoke.yaml

# Run the coding smoke suite
aua eval run --dataset evals/coding_smoke.yaml --config aua_config.yaml

# View the report
aua eval report .aua/evals/latest.json

# Compare blue vs green
aua eval compare --baseline blue --candidate green
8.2 Dataset format
Property checks run against the response text. Supported check types in v1.0:
| Property key | Value | What it checks |
|---|---|---|
contains | string | Case-insensitive substring match |
contains_any | [string, ...] | At least one substring present |
not_contains | string | Substring must NOT appear |
min_length | int | Response character count β₯ N |
expected_domain | string | Routing domain must equal this |
expected_domain_any | [string, ...] | Routing domain must be one of these |
Regex, LLM-judge, and custom Python validators are not supported in v1.0.
name: coding_smoke
field: software_engineering
cases:
  - id: binary_search
    prompt: "Implement binary search in Python. State time complexity."
    expected_properties:
      - "O(log n)"
      - "def binary_search"
      - correctness: true
  - id: bubble_sort_complexity
    prompt: "What is the average-case time complexity of bubble sort?"
    expected_properties:
      - "O(n²)"
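The six property checks listed in 8.2 are simple enough to sketch in a few lines, which can be handy for testing a dataset offline before running it through the harness. This is an illustrative reimplementation, not the harness's own code: `check_property` is a hypothetical name, and treating contains_any and not_contains as case-insensitive is an assumption (the table only states case-insensitivity for contains).

```python
def check_property(key: str, value, response: str, domain: str) -> bool:
    """Evaluate one property check against a response and its routing domain."""
    text = response.lower()
    if key == "contains":
        return value.lower() in text
    if key == "contains_any":
        return any(v.lower() in text for v in value)
    if key == "not_contains":
        return value.lower() not in text
    if key == "min_length":
        return len(response) >= value
    if key == "expected_domain":
        return domain == value
    if key == "expected_domain_any":
        return domain in value
    # Regex, LLM-judge and custom validators are out of scope in v1.0.
    raise ValueError(f"unsupported property key: {key}")


if __name__ == "__main__":
    answer = "def binary_search(items, target): ...  # runs in O(log n)"
    print(check_property("contains", "o(log n)", answer, "software_engineering"))
    print(check_property("expected_domain_any", ["mathematics"], answer,
                         "software_engineering"))
```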
8.3 Eval report
aua eval report .aua/evals/latest.json

Eval run: coding_smoke 2026-05-11T14:30:22Z
Cases: 8 total · 7 passed · 1 failed
U mean: 0.638 (baseline: 0.601) ▲ +6.2%
Regressions: 0
FAILED:
  merge_sort_stability: expected "stable" in response
  U score: 0.41 (threshold: 0.45)
8.4 CI integration
- name: Run AUA eval
  run: |
    aua eval run \
      --dataset evals/coding_smoke.yaml \
      --config aua_config.yaml \
      --json > .aua/evals/ci_result.json
    # check exit code: 0 = pass, 1 = failure
What you can build with this
- A quality gate that catches AI regressions the same way unit tests catch code bugs: automatically, on every model change.
- Promote new models with a number, not a feeling: aua eval compare gives you a quantitative diff between baseline and candidate.
- Custom eval datasets for your domain: not generic benchmarks, but the exact questions and quality criteria that matter to your users.
- CI integration so any model change that causes a quality drop fails the pipeline before it touches anyone.
Part 9 gives you a full UI to demo all of this: a private ChatGPT-like product backed entirely by your own models.
Part 9 Chat UI ~15 min
AUA ships a Next.js 14 Chat UI at apps/aua_chat/. It requires Node.js 18+ and runs as a separate process from the AUA router.
9.0 Prerequisites
node --version   # must be 18+
npm --version

# Install Node.js if missing:
brew install node   # macOS
# or download from https://nodejs.org
9.1 Starting the full stack
Package user vs. repo contributor. aua init does not scaffold a Chat UI; the UI lives in the AUA source repo. Package users launch it through the CLI; repo contributors can run the Next.js dev server directly.
Package user: CLI launch (recommended)
Open two terminals:
# Terminal 1
cd my-aua-project
aua serve --tier macbook   # Mac / Apple Silicon + Ollama
# aua serve                # Linux / RTX 4090 + vLLM

# Terminal 2
aua ui                     # starts on http://localhost:3001

# Or combined:
aua serve --tier macbook --with-ui
Repo contributor: Next.js dev server
If you have cloned the source repo and want to edit the UI:
cd Adaptive-Utility-Agent/apps/aua_chat
npm install   # first run only
npm run dev   # starts on http://localhost:3001
Open http://localhost:3001 and sign in with admin / aua-admin.
Local development credentials only. The default admin / aua-admin credentials are for local use. Change them via the AUA_UI_ADMIN_PASSWORD environment variable before exposing the UI beyond localhost. In production, disable the dev login and use token-based auth instead.
Note on aua serve --with-ui. This flag attempts to start the Chat UI automatically in the background. It works when npm is on your system PATH (standard Linux/Docker installs). On macOS with nvm or Homebrew, node may not be on the PATH that background processes see, causing the UI to silently fail to start. If you see no Chat UI after --with-ui, use the two-terminal approach above, which does not depend on the background PATH. The UI log is at .aua/logs/ui.log if you want to diagnose the background start.
9.2 Three-zone layout
| Zone | Contents |
|---|---|
| Left: Session sidebar | All sessions, search, new session |
| Center: Chat window | Messages, streaming responses, send bar |
| Right: Framework Debugger | Routing decision, utility breakdown, arbiter output, latency, cost, trace link |
9.3 AUA Controls drawer
Click AUA Controls (left edge of the screen) to open the configuration drawer. Change routing thresholds, utility weights, arbiter policy, corrections, blue-green status, and observability settings, all without restarting. Uses aua config reload under the hood.
9.4 Chat Session API
# Create a session
curl -X POST http://localhost:8000/sessions \
  -H "Authorization: Bearer $AUA_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"name": "my-coding-session"}'

# Post a message (streaming)
curl -X POST http://localhost:8000/sessions/{id}/stream \
  -H "Authorization: Bearer $AUA_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"query": "Explain quicksort"}'

# List all sessions
curl http://localhost:8000/sessions \
  -H "Authorization: Bearer $AUA_TOKEN"
9.5 SSE streaming event types
| Event | When fired |
|---|---|
| route | Routing decision made: field, mode, specialists |
| specialist_start | Specialist call begins |
| chunk | Each token streamed from specialist |
| specialist_done | U score, latency for this specialist |
| arbiter_done | Verdict case, corrections stored |
| done | Full response + metadata |
| error | AUA_* error code + trace ID |
Framework Debugger tip: Every query in the UI shows the full routing trace: which specialist was called, intermediate U scores, whether the Arbiter fired, and a link to the OTEL trace in Jaeger or Tempo if observability is configured.
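If you consume the stream outside the UI, the event types above can be separated with a small parser. The sketch below assumes the endpoint follows the standard Server-Sent Events wire format (an `event:` line, one or more `data:` lines, then a blank line) and that each payload is JSON. The payload field names in the sample are made up for illustration.

```python
import json
from typing import Iterator


def parse_sse(stream_text: str) -> Iterator[tuple[str, dict]]:
    """Yield (event_name, payload) tuples from raw SSE text.

    ASSUMPTION: every event has JSON in its 'data:' lines.
    """
    event, data_lines = "message", []
    # A trailing empty string flushes the last event if the text lacks one.
    for line in stream_text.splitlines() + [""]:
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
        elif line == "" and data_lines:
            yield event, json.loads("\n".join(data_lines))
            event, data_lines = "message", []


if __name__ == "__main__":
    sample = (
        'event: route\ndata: {"field": "software_engineering"}\n\n'
        'event: chunk\ndata: {"text": "Quick"}\n\n'
        'event: done\ndata: {"u_score": 0.71}\n\n'
    )
    for name, payload in parse_sse(sample):
        print(name, payload)
```

In a real client you would feed this line by line from the HTTP response rather than from a complete string, but the dispatch on event name stays the same.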
9.6 UI screenshots
Five states to know before you start. Each accordion shows a screenshot with annotations.
Screenshot 1: Empty state (three-panel layout)
Sidebar · Chat panel · Framework Debugger (mint)
The three-panel layout on startup. Left: session sidebar (#fafaf8). Centre: chat area. Right: mint-green Framework Debugger showing "Send a message to see routing debug info". The AUA Controls button is in the top-right corner.
Screenshot 2: Single specialist routing
Debugger showing domain, U score, confidence after a coding query
After sending "Write binary search in Python." The debugger shows: Domain (software_engineering), Mode (single), U Score (0.7xxx), Confidence, Latency, and the Classifier Output bar chart. The response appears in the chat panel with assistant formatting.
Screenshot 3: AUA Controls drawer (with VCG toggle)
Controls drawer open showing Arbitration Mode toggle
The amber AUA Controls drawer open. At the top: the Arbitration Mode section with Pairwise / VCG segmented toggle. With a single specialist, VCG is greyed with tooltip "Requires at least 2 specialists in aua_config.yaml". Below: routing threshold sliders, Config Management reload button, and Live Config table showing backend, port, version, and active models.
Screenshot 4: VCG fanout query (indigo debugger)
Indigo Framework Debugger: VCG with welfare scores per specialist
With VCG enabled and 2+ specialists, a cross-domain query triggers fanout routing. The debugger shifts from mint-green to indigo. The header reads "Framework Debugger - VCG" with a purple "vcg" badge. A new "VCG Welfare Scores" section shows W_i per specialist; the winner is highlighted with a box border, bold text, and an indicator icon. Runners-up are shown at reduced opacity. The Utility Breakdown note reads "Winner selected by welfare maximization".
Screenshot 5: Policy active (INFO assertion bonus)
Debugger showing U score boost from INFO assertion firing
With an active policy containing INFO assertions (e.g. AnalogyBonus), a response that uses an analogy fires the positive assertion. The U score in the debugger is higher than the base score because the E bonus is applied. The routing mode remains "single" or "vcg" depending on config. Use aua logs assertions --filter passed=true to see which assertions fired.
Adding your own screenshots: Take screenshots of your running UI at http://localhost:3001, save to docs/screenshots/ (create the folder), then replace the placeholder <div class="ss-placeholder"> blocks with <img src="../docs/screenshots/ui_empty.png" alt="AUA Chat UI empty state"> etc.
What you can build with this
- A complete, private AI product: a chat interface backed entirely by your own models, no data leaving your environment.
- A way to show stakeholders every routing decision in plain language: which specialist answered, what score it got, whether the Arbiter stepped in.
- Adjust routing and config from the UI, with no terminal and no restarts.
- Everything from Parts 1-9 in a single interface: routing, corrections, blue-green status, and U scores, all visible at once.
Part 10 is where the framework starts shaping itself to your definition of good output, and your definition becomes a curriculum.
Part 10 Policies & Assertions: Design your AI over time ~25 min
This is the most powerful section of the tutorial. By the end, you'll understand how to teach the framework what "good output" means, and how it uses that definition to block bad responses in real time, track model reputation over sessions, and automatically identify gold-standard training data for fine-tuning.
The core idea. Instead of writing a long system prompt and hoping the model follows it, you write a Policy: a versioned, portable definition of what good output looks like. The framework enforces it in real time, tracks adherence over every session, and eventually makes the defined behavior permanent through fine-tuning. Your policy becomes the model's curriculum.
10.1 The three assertion levels
Every assertion has a level that determines what happens when it fires:
| Level | What it does | Effect on U score |
|---|---|---|
| BLOCKING | Fails → error injected back into prompt → specialist retried up to max_retries (default 3). User never sees a response that violates this. | U penalty if all retries exhausted |
| SOFT | Fails → logged to assertion_events, response passes through. Use for guardrails you want to track without enforcing. | No U change; logged only |
| INFO | Always passes. When condition fires (returns a message), adds +bonus to the Efficacy (E) score. Use for positive/incentive assertions. | E_final = min(1.0, E_base + bonus) |
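The BLOCKING row describes a retry loop, which can be sketched in plain Python. This is a simplified illustration, not the framework's implementation: `enforce` and the `generate` callable are hypothetical, and counting one initial attempt plus up to max_retries retries is an assumption about how the limit is applied.

```python
from typing import Callable, Optional

Check = Callable[[str], tuple[bool, Optional[str]]]


def enforce(generate: Callable[[str], str], prompt: str,
            blocking: list[Check], max_retries: int = 3) -> tuple[Optional[str], int]:
    """Retry generation until every BLOCKING check passes.

    Returns (response, retries_used); response is None if retries run out.
    ASSUMPTION: one initial attempt plus up to max_retries retries.
    """
    current_prompt = prompt
    for attempt in range(max_retries + 1):
        output = generate(current_prompt)
        results = [check(output) for check in blocking]
        errors = [msg or "check failed" for ok, msg in results if not ok]
        if not errors:
            return output, attempt
        # Feed the failure message back so the next attempt can correct it.
        feedback = "; ".join(errors)
        current_prompt = f"{prompt}\n\nPrevious attempt failed: {feedback}"
    return None, max_retries


if __name__ == "__main__":
    replies = iter(["TODO", "def f(): return 1"])

    def no_todo(output: str) -> tuple[bool, Optional[str]]:
        return ("TODO" not in output, "placeholder left in output")

    print(enforce(lambda p: next(replies), "write f", [no_todo]))
```

A None response is the "all retries exhausted" case from the table, where the U penalty would apply.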
10.2 Writing your first assertion
from aua.guard import assertion, AssertionLevel


# Guardrail: block syntax errors from ever reaching the user
@assertion(name="PythonSyntaxCheck", level=AssertionLevel.BLOCKING)
def validate_python_syntax(output: str, context: dict) -> tuple[bool, str | None]:
    """Blocks output if any Python code block contains syntax errors."""
    import ast, re
    blocks = re.findall(r"```python(.*?)```", output, re.DOTALL)
    if not blocks:
        return True, None  # no code block: pass through
    for block in blocks:
        try:
            ast.parse(block)
        except SyntaxError as e:
            return False, f"Syntax error at line {e.lineno}: {e.msg}"
    return True, None


# Guardrail: soft-flag refusals without blocking
@assertion(name="NoAIisms", level=AssertionLevel.SOFT)
def no_ai_isms(output: str, context: dict) -> tuple[bool, str | None]:
    """Soft-flags common 'AI-isms' like 'as an AI language model'."""
    phrases = ["as an ai", "as a language model", "i cannot help with"]
    found = next((p for p in phrases if p in output.lower()), None)
    if found:
        return False, f"AI-ism detected: '{found}'"
    return True, None
10.3 Positive assertions β rewarding gold-standard behaviour
Negative assertions block bad output. Positive assertions reward exceptional output, and this is what feeds the fine-tuning pipeline. Sessions where positive assertions fire get the highest U scores and are automatically selected as "chosen" in your DPO export.
@assertion(name="AnalogyBonus", level=AssertionLevel.INFO, bonus=0.10)
def reward_analogy(output: str, context: dict) -> tuple[bool, str | None]:
    """Rewards responses that use analogies to explain concepts."""
    phrases = ["like a", "similar to", "imagine a", "think of it as", "just like"]
    if any(p in output.lower() for p in phrases):
        return True, "Positive: analogy used for clarity"
    return True, None  # neutral: no bonus if condition not met


@assertion(name="SocraticEnding", level=AssertionLevel.INFO, bonus=0.08)
def reward_question_ending(output: str, context: dict) -> tuple[bool, str | None]:
    """Rewards responses that end with an engaging question."""
    if output.strip().endswith("?"):
        return True, "Positive: Socratic engagement"
    return True, None


@assertion(name="PythonSyntaxBonus", level=AssertionLevel.INFO, bonus=0.12)
def reward_clean_code(output: str, context: dict) -> tuple[bool, str | None]:
    """Rewards syntactically clean Python with a bonus (stack with syntax check)."""
    import ast, re
    blocks = re.findall(r"```python(.*?)```", output, re.DOTALL)
    if blocks:
        try:
            for b in blocks:
                ast.parse(b)
            return True, "Positive: clean executable Python"
        except SyntaxError:
            pass
    return True, None
Option B bonus cap. Each INFO assertion contributes its declared bonus independently. The sum is capped by max_total_bonus on the Policy (default 0.30), then hard-capped at 0.50. A session where all three INFO assertions above fire (0.10 + 0.08 + 0.12) adds 0.30 to E, a meaningful signal that this session is gold-standard.
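The capping arithmetic can be written out as a short sketch. It combines the two caps described here with the E_final = min(1.0, E_base + bonus) rule from the levels table; `apply_info_bonuses` and `HARD_CAP` are illustrative names, not framework identifiers.

```python
HARD_CAP = 0.50  # framework-wide ceiling on total INFO bonus, as stated above


def apply_info_bonuses(e_base: float, fired_bonuses: list[float],
                       max_total_bonus: float = 0.30) -> float:
    """Return E_final after summing and capping the INFO bonuses that fired."""
    total = min(sum(fired_bonuses), max_total_bonus, HARD_CAP)
    return min(1.0, e_base + total)


if __name__ == "__main__":
    # All three assertions from 10.3 fire on a response with E_base = 0.60.
    print(round(apply_info_bonuses(0.60, [0.10, 0.08, 0.12]), 2))
    # A generous policy cap is still limited by the hard cap.
    print(round(apply_info_bonuses(0.20, [0.3, 0.3, 0.3], max_total_bonus=0.9), 2))
```

Raising max_total_bonus above 0.50 on a Policy therefore has no further effect under these rules.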
10.4 Bundling into a Policy
A Policy is a versioned bundle that groups assertions, sets retry limits, and optionally shifts utility weights when active. Think of it as a Django settings.py for your AI's behaviour.
from aua.policy import Policy

# Bundle guardrails + incentives into one named Policy
coding_policy = Policy(
    name="SafeCoding",
    version="1.0",
    max_retries=3,          # BLOCKING retries before giving up
    max_total_bonus=0.30,   # cap on total E bonus (Option B)
    utility_overrides={
        "w_k": 0.30,        # slightly raise curiosity weight for this policy
    },
)

# Add assertions (chaining supported)
coding_policy.add(validate_python_syntax)   # BLOCKING
coding_policy.add(no_ai_isms)               # SOFT
coding_policy.add(reward_analogy)           # INFO +0.10
coding_policy.add(reward_clean_code)        # INFO +0.12

# Inspect before applying
print(coding_policy.summary())
10.5 YAML policy file (recommended for production)
name: SafeCoding
version: "1.0"
max_retries: 3
max_total_bonus: 0.30
assertions:
  - import_path: mypackage.policies:validate_python_syntax
    # level defaults to what's declared on the @assertion decorator
  - import_path: mypackage.policies:no_ai_isms
  - import_path: mypackage.policies:reward_analogy
    bonus: 0.10   # override decorator default
  - import_path: mypackage.policies:reward_clean_code
    bonus: 0.12
utility_overrides:
  w_k: 0.30
10.6 Applying a policy via CLI
# Validate schema before applying
aua policy validate policies/safe_coding.yaml
# ✓ policies/safe_coding.yaml is valid
# Preview: see what would be activated
aua policy apply policies/safe_coding.yaml --dry-run
# Activate: writes pointer to .aua/active_policy
aua policy apply policies/safe_coding.yaml
# ✓ Policy activated. Restart or hot-reload to apply.
# List all policies in policies/
aua policy list
# Test a single assertion against sample output
aua guard list
aua guard test --import-path mypackage.policies:validate_python_syntax
aua guard test --import-path mypackage.policies:reward_analogy \
--output "Think of it as a balanced binary tree."
10.7 The three-layer learning loop
Once a policy is active, the framework creates a feedback loop that progressively shapes model behaviour, with no manual intervention required:
Layer 1: Immediate (milliseconds). BLOCKING assertions fire on every response. If PythonSyntaxCheck fails, the error is injected back into the prompt and the specialist retries. The user only ever sees syntactically valid code.
Layer 2: Session-by-session. Every assertion result is stored in assertion_events with a timestamp. Specialists that consistently fail assertions accumulate lower mean U scores. Lower U scores mean they don't meet the blue-green promotion delta threshold, so a model that can't follow your policy doesn't advance to BLUE.
Layer 3: Calibration (on demand). Run aua calibrate --layer 3 to export sessions where all INFO assertions fired and no BLOCKING assertion exhausted retries. These are your gold-standard sessions, ready as DPO "chosen" examples for fine-tuning. After fine-tuning, the defined behaviours are baked into the model weights, and the assertions become less necessary over time.
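The Layer 3 selection rule (all INFO assertions fired, no BLOCKING assertion exhausted its retries) can be sketched as a filter over assertion events. The event fields used here (`session_id`, `level`, `fired`, `retries_exhausted`) are hypothetical; the real assertion_events schema is not shown in this tutorial and may differ.

```python
from collections import defaultdict


def gold_standard_sessions(events: list[dict]) -> list[str]:
    """Return IDs of sessions where every INFO assertion fired and no
    BLOCKING assertion exhausted its retries.

    ASSUMPTION: hypothetical event fields, not AUA's actual schema.
    """
    by_session: dict[str, list[dict]] = defaultdict(list)
    for event in events:
        by_session[event["session_id"]].append(event)

    gold = []
    for session_id, rows in by_session.items():
        info = [r for r in rows if r["level"] == "INFO"]
        blocked = any(r["level"] == "BLOCKING" and r.get("retries_exhausted")
                      for r in rows)
        # Require at least one INFO event so empty sessions do not qualify.
        if info and all(r.get("fired") for r in info) and not blocked:
            gold.append(session_id)
    return sorted(gold)


if __name__ == "__main__":
    events = [
        {"session_id": "s1", "level": "INFO", "fired": True},
        {"session_id": "s1", "level": "BLOCKING", "retries_exhausted": False},
        {"session_id": "s2", "level": "INFO", "fired": False},
        {"session_id": "s3", "level": "INFO", "fired": True},
        {"session_id": "s3", "level": "BLOCKING", "retries_exhausted": True},
    ]
    print(gold_standard_sessions(events))
```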
What you can build with this
- Bad output blocked before users ever see it: your guardrails run on every response, automatically.
- A system that rewards the behaviours you want: every session that meets your gold standard is automatically flagged as training data.
- Domain-specific personalities: strict and cautious for legal queries, curious and expressive for creative ones, all from one YAML file.
- The start of a feedback loop: every failure you define an assertion for is a failure that gets corrected, tracked, and eventually eliminated.
Part 11 shows how to close the loop: take those gold-standard sessions and turn them into the next version of your model.
Part 11 Calibration cycles ~15 min
The aua calibrate command surfaces the three feedback loops as explicit, triggerable operations. You choose when to run each one; the framework handles the analysis.
11.1 Layer 1: Measure current performance
# Run the eval harness (same as `aua eval run`, but surfaced as a calibration step)
aua calibrate --layer 1 --dataset evals/coding_smoke.yaml

# Use the default dataset if it exists
aua calibrate --layer 1
11.2 Layer 2: Routing weight analysis
Layer 2 reads assertion event history and shows which domains are healthy vs. degrading, the signal that tells you which specialists need attention.
aua calibrate --layer 2
# Example output:
# ┌──────────────────────┬─────────┬───────────┬───────────┬────────┐
# │ Domain               │ Queries │ Pass Rate │ Avg Bonus │ Signal │
# ├──────────────────────┼─────────┼───────────┼───────────┼────────┤
# │ software_engineering │ 312     │ 91.3%     │ +0.087    │ Strong │
# │ mathematics          │ 148     │ 83.1%     │ +0.041    │ Stable │
# │ general              │ 44      │ 56.2%     │ -         │ Weak   │
# └──────────────────────┴─────────┴───────────┴───────────┴────────┘
# Stagnation signal: same assertions failing week over week
# → Check: is the assertion too strict? Is the model too small?

aua calibrate --layer 2 --dry-run   # preview only
11.3 Layer 3: Export gold-standard DPO pairs
This is the calibration cycle that closes the loop. The framework identifies your best sessions, where the model followed your policy perfectly, and exports them as DPO training pairs.
# See what would be exported without writing files
aua calibrate --layer 3 --dry-run
# Example dry-run output:
# Gold-standard sessions: 47
# Failed sessions: 12
# Exportable pairs: 12
# --dry-run: would export 12 DPO pairs → dpo_pairs/calibration.jsonl

# Export when ready
aua calibrate --layer 3 --output dpo_pairs/may_calibration.jsonl

# Force export even if below min-pairs threshold
aua calibrate --layer 3 --force --output dpo_pairs/early_export.jsonl

# Fine-tune your specialist with the exported pairs:
# Axolotl: axolotl train configs/dpo.yaml --data dpo_pairs/may_calibration.jsonl
# TRL: trl dpo --dataset dpo_pairs/may_calibration.jsonl

# Then deploy as GREEN:
curl -X POST http://localhost:8000/deploy/green
What Layer 3 does (and doesn't do). aua calibrate --layer 3 identifies gold-standard sessions and exports DPO pairs in the format your fine-tuning framework expects. It does not fine-tune models automatically; that step runs via Axolotl, TRL, or LLaMA-Factory using the exported JSONL. After fine-tuning, deploy the new model as a GREEN candidate and let blue-green handle promotion.
What you can build with this
- A model that gets measurably better over time, not by accident, but because you've defined what better means and built a pipeline that teaches it.
- Training data you didn't have to label: the framework identified which sessions were gold-standard based on your policy.
- A clear picture of which domains are healthy and which specialists need attention, before users notice.
- The complete loop: define what good looks like → run queries → identify the best sessions → export training pairs → fine-tune → repeat.
Part 12 gives you the visibility layer, so you can actually see the improvement happening over time.
Part 12 Logs & metrics over time ~15 min
The assertion events store gives you a time-series view of how your policy is performing. These commands let you answer "is my AI actually getting better at following my policy?"
12.1 Viewing assertion events
# All recent assertion events
aua logs assertions

# Filter to failures only: the assertions that need attention
aua logs assertions --filter passed=false

# Filter by assertion name
aua logs assertions --assertion PythonSyntaxCheck --tail 20

# Filter by domain
aua logs assertions --filter domain=software_engineering

# Export for offline analysis
aua logs assertions --json > my_assertions.json
12.2 Viewing session history
# Recent sessions with U scores
aua logs sessions

# Export sessions to JSON
aua logs export --table audit_log --output sessions.json
12.3 Comparing metrics over time
This is the "is it working?" command. It compares the current window against the prior window of the same length and shows whether the key signals are moving in the right direction.
# Compare last 30 days vs prior 30 days
aua metrics --compare 30d
# Example output (after a few weeks with an active policy):
# ┌───────────────────────┬────────┬─────────┬───────────┐
# │ Metric                │ Prior  │ Current │ Trend     │
# ├───────────────────────┼────────┼─────────┼───────────┤
# │ Mean U score          │ 0.6213 │ 0.6891  │ ↑ +0.0678 │
# │ Assertion fail rate   │ 0.2341 │ 0.1102  │ ↓ -0.1239 │ ← good
# │ Retry rate (BLOCKING) │ 0.1820 │ 0.0890  │ ↓ -0.0930 │ ← good
# │ Avg E bonus (INFO)    │ 0.0120 │ 0.0654  │ ↑ +0.0534 │
# │ Total queries         │ 312    │ 481     │ → +0.0000 │
# └───────────────────────┴────────┴─────────┴───────────┘
# Success signal: mean_u_score ↑, assertion_fail_rate ↓, retry_rate ↓
# Stagnation signal: same assertions failing week over week

# Focus on a single metric
aua metrics --compare 7d --metric assertion_fail_rate

# Date range
aua metrics --compare 2025-04-01:2025-05-01

# JSON output for charting
aua metrics --compare 30d --json
What success looks like. Mean U score trending up. Assertion fail rate trending down. Retry rate (BLOCKING) falling, meaning the model is learning to get it right on the first try. After a fine-tuning cycle, you may see a step-change drop in fail rate as the trained behaviour is baked into the weights.
What stagnation looks like. The same assertions failing week over week. This means either the assertion is too strict for the model's capability, or the model isn't receiving enough signal to learn. Check: is max_retries too low? Is the policy active on enough queries to accumulate data?
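The window-over-window comparison can be reproduced offline from exported assertion events. The sketch below computes the assertion fail rate for the current window and the prior window of the same length. It assumes each event is a dict with a `timestamp` (datetime) and a `passed` flag; those field names are illustrative and may not match the JSON that aua logs assertions --json emits.

```python
from datetime import datetime, timedelta


def fail_rate(events: list[dict], start: datetime, end: datetime) -> float:
    """Fraction of events in [start, end) whose 'passed' flag is False."""
    window = [e for e in events if start <= e["timestamp"] < end]
    if not window:
        return 0.0
    return sum(1 for e in window if not e["passed"]) / len(window)


def compare_windows(events: list[dict], now: datetime, days: int) -> dict:
    """Compare the last `days` days against the preceding `days` days."""
    span = timedelta(days=days)
    prior = fail_rate(events, now - 2 * span, now - span)
    current = fail_rate(events, now - span, now)
    return {"prior": prior, "current": current, "delta": current - prior}


if __name__ == "__main__":
    now = datetime(2026, 5, 11)
    events = (
        [{"timestamp": now - timedelta(days=45), "passed": p}
         for p in (True, False, True, False)]
        + [{"timestamp": now - timedelta(days=5), "passed": p}
           for p in (True, True, True, False)]
    )
    print(compare_windows(events, now, days=30))
```

A negative delta is the success signal described above; a delta near zero across several consecutive windows is the stagnation signal.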
12.4 The full policy workflow in practice
Putting it all together: this is the cycle for designing and refining your AI over time.
# 1. Week 1-4: run queries with policy active, accumulate assertion events
# 2. Check Layer 2 health at any point
aua calibrate --layer 2
# 3. Review failures; add or refine assertions if the same things keep failing
aua logs assertions --filter passed=false --tail 50
# 4. End of month: export gold-standard sessions
aua calibrate --layer 3 --dry-run # preview
aua calibrate --layer 3 # export to dpo_pairs/calibration.jsonl
# 5. Fine-tune your specialist on the exported pairs (external step)
# trl dpo --dataset dpo_pairs/calibration.jsonl
# 6. Deploy the fine-tuned model as GREEN
curl -X POST http://localhost:8000/deploy/green \
-d '{"specialist": "swe", "new_model": "qwen2.5-coder:14b-finetuned"}'
# 7. Blue-green evaluates and promotes if U score delta passes threshold
aua status --once # watch the promotion
# 8. Compare metrics to confirm improvement
aua metrics --compare 30d
# Repeat. Each cycle, the model gets better at following your policy.
# After a few cycles, the assertions become less necessary because the
# defined behaviours are baked into the model weights.
What you can build with this
- Full visibility into whether your AI is improving: assertion fail rate trending down, U score trending up, in whatever monitoring stack you use.
- One trace_id that links a specific response to its log line, its Prometheus metrics, and its distributed trace: the full story of what happened.
- Alerts before users notice: U score drops, assertion failure spikes, latency regressions, all triggerable from the same metrics.
- A system you can hand to an ops team: ELK, Splunk, Grafana, Loki, or whatever they already use, with working configs and the right fields already in every log line.
The how-to guides cover everything you need for production: plugins, security, Docker, and full observability setup.
How-to guides
Task-oriented guides for specific things you need to accomplish.
How-to 13 Plugin system: all interfaces ~90 min
AUA has 8 plugin interfaces and 1 middleware interface. Each one replaces a specific internal layer. No inheritance is required: implement the methods, register in YAML, and the framework uses your version instead of the built-in.
The pattern is always the same: minimum viable (implement only the required method with the minimum signature), then full version (using all available parameters). It's like Python's sorted(): one argument works, but key= and reverse= give you more control.
No base class, no inheritance. AUA uses Python Protocol for structural subtyping. Your class just needs the right method names and signatures. If it has them, it satisfies the interface, and the framework verifies this at load time with an isinstance() check.
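If structural subtyping is new to you, here is what it looks like in plain Python. This is a sketch using only the standard library; `ClassifierLike` is a hypothetical stand-in, not AUA's actual Protocol class.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ClassifierLike(Protocol):  # hypothetical stand-in for AUA's own Protocol
    def classify(self, query: str) -> dict: ...


class KeywordClassifier:  # no base class, no framework import
    def classify(self, query: str) -> dict:
        return {"general": 1.0}


class NotAClassifier:  # wrong method name, so it does not satisfy the Protocol
    def predict(self, query: str) -> str:
        return "general"


ok = isinstance(KeywordClassifier(), ClassifierLike)
bad = isinstance(NotAClassifier(), ClassifierLike)
```

One caveat from the standard library itself: isinstance() against a runtime-checkable Protocol confirms that the methods exist but does not compare their signatures, so a pre-flight check that inspects signatures is still worth running.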
The constructor contract. The config: mapping under your plugin entry is splatted as keyword arguments: the loader calls YourClass(**config). So config: {confidence_boost: 1.1} means KeywordClassifier(confidence_boost=1.1). Write def __init__(self, confidence_boost: float = 1.0), give every optional key a default, and omit defaults for keys you want to be required (a missing required key then fails fast at startup). Never write def __init__(self, config: dict).
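The contract is ordinary Python keyword-argument behaviour, so you can rehearse it without the framework. In this sketch `build` mimics the documented `YourClass(**config)` call; `ApiClassifier`, `endpoint`, and `timeout_s` are hypothetical names used only for illustration.

```python
class KeywordClassifier:
    def __init__(self, confidence_boost: float = 1.0):  # optional key, has a default
        self.boost = float(confidence_boost)


class ApiClassifier:  # hypothetical plugin with one required key
    def __init__(self, endpoint: str, timeout_s: float = 5.0):
        self.endpoint = endpoint
        self.timeout_s = timeout_s


def build(cls, config=None):
    """Mimic the documented loader call: cls(**config)."""
    return cls(**(config or {}))


with_config = build(KeywordClassifier, {"confidence_boost": 1.1})
without_config = build(KeywordClassifier)

try:
    build(ApiClassifier, {})  # required key 'endpoint' is missing
    missing_key_error = None
except TypeError as exc:
    missing_key_error = str(exc)
```

The same TypeError fires for a misspelled key, since Python rejects unexpected keyword arguments.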
Where the code lives, and how to know it loaded
Plugins are a normal Python package sitting next to your config. The router inserts the config file's directory onto sys.path at startup (the same trick manage.py does for a Django project), so plugins.mine:KeywordClassifier resolves with zero packaging work:
my-aua-project/
├── aua_config.yaml      # import paths resolve relative to this file's directory
└── plugins/
    ├── __init__.py      # required: plugins/ is a package
    └── mine.py          # your classes; any module name works
The verification loop, in the order you'll actually use it:
# 1. Pre-flight before touching config: imports + contract-validates in isolation
aua extensions test --kind field_classifier --import-path plugins.mine:KeywordClassifier
# 2. Register in aua_config.yaml (each interface below shows its block), then
aua config validate   # typo'd kind, bad hook point, malformed import path: caught here
aua serve             # startup logs: "Plugin loaded from config: field_classifier → plugins.mine:KeywordClassifier"
# 3. Ask the RUNNING server what it loaded (null = built-in):
curl -s localhost:8000/extensions | python3 -m json.tool
Which kinds wire from YAML: field_classifier, utility_scorer, and correction_store take effect the moment the server starts, as do all hooks: and middleware:. The remaining kinds (arbiter_policy, promotion_policy, model_backend, state_store) still load and contract-validate from config (a typo'd import path fails at startup, not in production) and attach programmatically at the points shown in their sections below.
Jump to: FieldClassifier · UtilityScorer · ArbiterPolicy · PromotionPolicy · CorrectionStore · ModelBackend · StateStore · Middleware
13.1 FieldClassifierPlugin: custom query routing logic
What it replaces: The built-in TF-IDF/embedding field classifier that decides which specialist handles each query.
When to write one: You have a proprietary taxonomy, an existing intent classifier, or domain-specific routing rules that the built-in classifier doesn't know about.
Required method: classify(query: str) → dict[str, float]
# Minimum viable version
class KeywordClassifier:
    def classify(self, query: str) -> dict[str, float]:
        """Route based on keywords. Probabilities must sum to ≤ 1.0."""
        q = query.lower()
        if any(w in q for w in ["sort", "binary", "algorithm", "complexity"]):
            return {"software_engineering": 0.9, "mathematics": 0.1}
        if any(w in q for w in ["integral", "derivative", "proof"]):
            return {"mathematics": 0.95}
        return {"general": 1.0}  # fallback
# Full version
class KeywordClassifier:
    def __init__(self, confidence_boost: float = 1.0):
        # YAML config keys arrive as KEYWORD ARGUMENTS: cls(**config).
        # plugins.field_classifier.config: {confidence_boost: 1.1} calls
        # KeywordClassifier(confidence_boost=1.1). Give every key a default.
        self.boost = float(confidence_boost)
    def classify(self, query: str) -> dict[str, float]:
        """
        query: str, the raw user query text (pre-processed, no conversation history)
        Returns: dict[field_name → probability]
        - field names must match specialist 'field' values in aua_config.yaml
        - probabilities should sum to ≤ 1.0 (remainder is implicitly 'unknown')
        - returning {} triggers arbiter fallback
        """
        q = query.lower()
        scores: dict[str, float] = {}
        if any(w in q for w in ["sort", "binary", "algorithm", "def ", "class "]):
            scores["software_engineering"] = 0.85 * self.boost
        if any(w in q for w in ["integral", "derivative", "theorem", "proof"]):
            scores["mathematics"] = 0.90 * self.boost
        # Normalise only when the boosted scores sum to more than 1.0
        total = sum(scores.values())
        if total > 1.0:
            scores = {k: v / total for k, v in scores.items()}
        return scores or {"general": 1.0}
# aua_config.yaml
plugins:
  field_classifier:
    import_path: plugins.my_classifier:KeywordClassifier
    config:
      confidence_boost: 1.1
13.2 UtilityScorerPlugin: custom U score
What it replaces: The built-in U = w_e·E + w_c·C + w_k·K scorer.
When to write one: You want domain-specific scoring logic, e.g. penalise hallucinations more in a medical domain, or reward shorter answers in a support context.
Required method: score(response, field, prior_u, confidence, metadata) → float
# Minimum viable version
class RiskWeightedScorer:
    def score(
        self,
        response: str,      # the specialist's text output
        field: str,         # field name e.g. "software_engineering"
        prior_u: float,     # running mean U for this specialist (0.0-1.0)
        confidence: float,  # Kalman-filtered confidence (0.0-1.0)
        metadata: dict,     # {"session_id", "query", "latency_ms", ...}
    ) -> float:             # return U score in [0.0, 1.0]
        return 0.5 * confidence + 0.5 * prior_u
# Full version
class RiskWeightedScorer:
    def __init__(self, risk_threshold: float = 0.80, length_penalty: float = 0.0):
        self.risk_threshold = float(risk_threshold)
        self.length_penalty = float(length_penalty)
    def score(self, response: str, field: str, prior_u: float,
              confidence: float, metadata: dict) -> float:
        # metadata keys available: session_id, query, latency_ms, specialist
        # Lower efficacy when confidence is below threshold
        efficacy = prior_u
        if confidence < self.risk_threshold:
            efficacy *= 0.6
        # Penalise very long responses (optional)
        word_count = len(response.split())
        length_factor = max(0.7, 1.0 - self.length_penalty * (word_count / 1000))
        u = 0.5 * efficacy + 0.4 * confidence + 0.1 * (prior_u * length_factor)
        return round(min(1.0, max(0.0, u)), 4)
# aua_config.yaml
plugins:
  utility_scorer:
    import_path: plugins.risk_scorer:RiskWeightedScorer
    config:
      risk_threshold: 0.85
      length_penalty: 0.1
13.3 ArbiterPolicyPlugin: custom contradiction arbitration
What it replaces: The built-in 4-check arbitration logic (logical, mathematical, cross-session, empirical).
When to write one: You want domain-specific arbitration, e.g. always prefer the response from the specialist with higher past U score, or use an external fact-checker API.
Required method: arbitrate(subject, domain, output_a, output_b, metadata) → dict
# Minimum viable version
class LengthArbiter:
    def arbitrate(
        self,
        subject: str,    # short subject identifier e.g. "heapsort_complexity"
        domain: str,     # field name
        output_a: str,   # first specialist's full response text
        output_b: str,   # second specialist's full response text
        metadata: dict,  # {"session_id", "field_penalty_multiplier", ...}
    ) -> dict:
        # Required return keys:
        return {
            "case": "case_1",               # "case_1"|"case_2"|"case_3"|"case_4"
            "correct_a": False,             # True → store correction for specialist A
            "correct_b": False,             # True → store correction for specialist B
            "verified_claim": None,         # str if a fact was verified, else None
            "external_response": output_a,  # what the user sees
        }
# Full version
class ConfidenceArbiter:
    def __init__(self, prefer_longer: bool = False):
        self.prefer_longer = prefer_longer
    def arbitrate(self, subject: str, domain: str, output_a: str,
                  output_b: str, metadata: dict) -> dict:
        # metadata["specialist_a_confidence"] and ["specialist_b_confidence"]
        # are available when fanout routing produced both responses
        conf_a = metadata.get("specialist_a_confidence", 0.5)
        conf_b = metadata.get("specialist_b_confidence", 0.5)
        if abs(conf_a - conf_b) < 0.05:
            # Too close to call: both acceptable (case_1: no contradiction)
            winner = output_a if (not self.prefer_longer or len(output_a) >= len(output_b)) else output_b
            return {"case": "case_1", "correct_a": False, "correct_b": False,
                    "verified_claim": None, "external_response": winner}
        if conf_a > conf_b:
            # A wins, B needs correction (case_2)
            return {"case": "case_2", "correct_a": False, "correct_b": True,
                    "verified_claim": output_a[:200], "external_response": output_a}
        else:
            # B wins, A needs correction (case_3)
            return {"case": "case_3", "correct_a": True, "correct_b": False,
                    "verified_claim": output_b[:200], "external_response": output_b}
        # case_4 = both wrong: set "external_response" to your fallback text
# aua_config.yaml
plugins:
  arbiter_policy:
    import_path: plugins.simple_arbiter:ConfidenceArbiter
    config:
      prefer_longer: true
13.4 PromotionPolicyPlugin: custom blue-green promotion logic
What it replaces: The built-in delta + T_min promotion threshold check.
When to write one: You want additional promotion criteria, e.g. minimum number of queries, assertion pass rate threshold, or a human approval step.
Required method: should_promote(specialist, blue_mean_u, green_mean_u, n_queries, metadata) → bool
# Minimum viable version
class StrictPromoter:
    def should_promote(
        self,
        specialist: str,      # specialist name e.g. "swe"
        blue_mean_u: float,   # BLUE model's mean U over evaluation period
        green_mean_u: float,  # GREEN candidate's mean U
        n_queries: int,       # number of evaluation queries run
        metadata: dict,       # {"delta", "T_min", "tau", "config", ...}
    ) -> bool:                # True = promote, False = keep BLUE
        return green_mean_u > blue_mean_u + 0.05 and n_queries >= 50
# Full version
class StrictPromoter:
    def __init__(self, min_delta: float = 0.05, min_queries: int = 50,
                 max_assertion_fail_rate: float = 0.10):
        self.min_delta = float(min_delta)
        self.min_queries = int(min_queries)
        self.max_fail_rate = float(max_assertion_fail_rate)
    def should_promote(self, specialist: str, blue_mean_u: float,
                       green_mean_u: float, n_queries: int, metadata: dict) -> bool:
        # Gate 1: minimum evaluation period
        if n_queries < self.min_queries:
            return False
        # Gate 2: meaningful U improvement
        if green_mean_u - blue_mean_u < self.min_delta:
            return False
        # Gate 3: assertion fail rate (if available in metadata)
        fail_rate = metadata.get("assertion_fail_rate", 0.0)
        if fail_rate > self.max_fail_rate:
            return False
        return True
# aua_config.yaml
plugins:
  promotion_policy:
    import_path: plugins.strict_promoter:StrictPromoter
    config:
      min_delta: 0.08
      min_queries: 100
      max_assertion_fail_rate: 0.05
13.5 CorrectionStorePlugin: custom correction backend
What it replaces: The built-in in-memory + SQLite AssertionsStore.
When to write one: You want corrections stored in Postgres, Redis, or a vector database, or you need multi-tenant isolation (pending v1.1 per-user scoping).
Required methods: store(), query(), export_dpo_pairs()
class InMemoryCorrectionStore:
    def __init__(self):
        self._data: list[dict] = []
    def store(
        self,
        subject: str,       # what the assertion is about e.g. "heapsort_complexity"
        domain: str,        # field name
        claim: str,         # the verified fact string
        confidence: float,  # 0.0-1.0 confidence at write time
    ) -> None:
        self._data.append({"subject": subject, "domain": domain,
                           "claim": claim, "confidence": confidence})
    def query(
        self,
        subject: str,        # filter by subject (partial match OK)
        domain: str | None,  # optional domain filter
    ) -> list[dict]:
        return [d for d in self._data
                if subject.lower() in d["subject"].lower()
                and (domain is None or d["domain"] == domain)]
    def export_dpo_pairs(
        self,
        domain: str | None,  # None = all domains
        limit: int,          # max pairs to return
    ) -> list[dict]:
        # Return list of {"chosen": str, "rejected": str} dicts
        pairs = [{"chosen": d["claim"], "rejected": ""} for d in self._data
                 if domain is None or d["domain"] == domain]
        return pairs[:limit]
# aua_config.yaml
plugins:
  correction_store:
    import_path: plugins.pg_store:InMemoryCorrectionStore
    # The config block is for a real Postgres implementation. InMemoryCorrectionStore
    # above takes no constructor arguments, so omit config: when registering it as written.
    config:
      connection_string_secret: POSTGRES_URL
13.6 ModelBackendPlugin: connect any LLM serving infrastructure
Not yet wired for per-specialist dispatch (roadmap #74). The framework loads and validates your plugin class at startup (a bad import path fails fast), but the router's _call() does not yet dispatch to the plugin at query time. All traffic still goes through the built-in vLLM/Ollama HTTP path. The prebuilt plugins (OpenAI, Anthropic, etc.) are also affected. Planned for #74.
Workaround: use a routing_strategy plugin (section 19.4) to redirect traffic to an external HTTP endpoint of your choice, or use a field_classifier plugin that maps queries to a specialist already backed by a frontier API endpoint.
What it will replace: The built-in vLLM and Ollama HTTP clients.
When to write one: You want to route to a commercial API (OpenAI, Anthropic, Cohere), an internal gateway, or any serving stack that isn't vLLM or Ollama.
Prebuilt plugins available. Seven production-ready frontier model backends are included in aua/plugins/prebuilt/, contributed from AUA-Veritas. Drop them into your config without writing any code:
| Plugin | Models | import_path |
|---|---|---|
| OpenAIBackend | GPT-4o, GPT-4o mini | aua.plugins.prebuilt.openai_backend:OpenAIBackend |
| AnthropicBackend | Claude Sonnet 4.5, Haiku 4.5 | aua.plugins.prebuilt.anthropic_backend:AnthropicBackend |
| GoogleBackend | Gemini 1.5 Pro, 2.0 Flash | aua.plugins.prebuilt.google_backend:GoogleBackend |
| XAIBackend | Grok-2 | aua.plugins.prebuilt.xai_backend:XAIBackend |
| MistralBackend | Mistral Large | aua.plugins.prebuilt.mistral_backend:MistralBackend |
| GroqBackend | Llama 3.3 70B | aua.plugins.prebuilt.groq_backend:GroqBackend |
| DeepSeekBackend | DeepSeek-V3, DeepSeek-R1 | aua.plugins.prebuilt.deepseek_backend:DeepSeekBackend |
plugins:
  model_backend:
    import_path: aua.plugins.prebuilt.openai_backend:OpenAIBackend
    config:
      api_key_secret: OPENAI_API_KEY   # resolved from env or secrets provider
      model: gpt-4o
Required methods: complete(), stream(), health(), all async.
import json
import time
from collections.abc import AsyncIterator
import httpx
class OpenAIBackend:
    def __init__(self, api_key: str, model: str = "gpt-4o-mini",
                 base_url: str = "https://api.openai.com"):
        self.api_key = api_key    # required key: no default, fails fast if missing
        self.model = model
        self.base_url = base_url
    async def complete(self, request: dict) -> dict:
        # request is OpenAI-compatible: {"model", "messages", "temperature", ...}
        # Return OpenAI-compatible response with choices[0].message.content
        async with httpx.AsyncClient() as client:
            r = await client.post(
                f"{self.base_url}/v1/chat/completions",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={**request, "model": self.model},
                timeout=30.0,
            )
            r.raise_for_status()  # surface HTTP errors instead of returning an error body
            return r.json()
    async def stream(self, request: dict) -> AsyncIterator[str]:
        # Yield token strings as they arrive
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST", f"{self.base_url}/v1/chat/completions",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={**request, "model": self.model, "stream": True},
            ) as r:
                async for line in r.aiter_lines():
                    if line.startswith("data: ") and "[DONE]" not in line:
                        chunk = json.loads(line[6:])
                        token = chunk["choices"][0]["delta"].get("content", "")
                        if token:
                            yield token
    async def health(self) -> dict:
        # Return {"status": "ok"|"error", "latency_ms": float}
        try:
            async with httpx.AsyncClient() as client:
                t0 = time.perf_counter()
                r = await client.get(f"{self.base_url}/v1/models",
                                     headers={"Authorization": f"Bearer {self.api_key}"},
                                     timeout=5.0)
                r.raise_for_status()  # treat 4xx/5xx (e.g. a bad API key) as unhealthy
                return {"status": "ok", "latency_ms": (time.perf_counter() - t0) * 1000}
        except Exception as e:
            return {"status": "error", "error": str(e), "latency_ms": 0.0}
# aua_config.yaml
plugins:
  model_backend:
    import_path: plugins.openai_backend:OpenAIBackend
    config:
      api_key_secret: OPENAI_API_KEY   # resolved from env at startup
      model: gpt-4o-mini
      base_url: https://api.openai.com
13.7 StateStorePlugin: custom session/state backend
Not yet wired (roadmap #75). The framework loads and validates your plugin class at startup, but all state writes (corrections, audit log, model_runs, sessions) still go to the built-in SQLite store. The init ordering constraint (BatchQueue, ShadowStore, DomainTree, and others all receive self._state_store at construction time) must be resolved before this plugin can be activated. Planned for #75.
Workaround: for persistence across instances, configure Postgres or Redis at the OS level and point SQLite to a shared path via NFS, or use the correction_store plugin (which is wired) to redirect DPO pair storage specifically.
What it will replace: The built-in SQLite state store.
When to write one: You want sessions, corrections, and audit events stored in Postgres, Redis, or a shared datastore across multiple router instances.
Required methods: get(), set(), append(), query()
import uuid
class DictStateStore:
    def __init__(self):
        self._store: dict[str, dict] = {}   # table:key → record
        self._lists: dict[str, list] = {}   # append-only tables
    def get(
        self,
        table: str,  # table name e.g. "sessions", "corrections", "assertion_events"
        key: str,    # record key (UUID string)
    ) -> dict | None:
        return self._store.get(f"{table}:{key}")
    def set(
        self,
        table: str,
        key: str,
        value: dict,  # arbitrary JSON-serialisable dict
    ) -> None:
        self._store[f"{table}:{key}"] = value
    def append(
        self,
        table: str,
        record: dict,  # record to append (no key: framework generates one)
    ) -> str:          # return the generated record ID
        record_id = str(uuid.uuid4())
        self._lists.setdefault(table, []).append({**record, "id": record_id})
        return record_id
    def query(
        self,
        table: str,
        filters: dict | None = None,  # {field_name: value} equality filters
        limit: int = 100,
    ) -> list[dict]:
        # None instead of {} avoids sharing one mutable default across calls
        rows = self._lists.get(table, [])
        for k, v in (filters or {}).items():
            rows = [r for r in rows if r.get(k) == v]
        return rows[:limit]
# aua_config.yaml
plugins:
  state_store:
    import_path: plugins.redis_store:DictStateStore
    # The config block is for a real Redis implementation. DictStateStore above
    # takes no constructor arguments, so omit config: when registering it as written.
    config:
      url_secret: REDIS_URL
13.8 AUAMiddleware: before/after every request
What it replaces: Nothing. Middleware is additive. It wraps every request/response pair.
When to write one: PII redaction, request logging, tenant routing, rate limiting, or any transformation that needs to run on every query.
Required methods: before_query(request: dict) → dict and after_response(response: dict) → dict, both async.
# Minimum viable version
import re
class PIIRedactionMiddleware:
    async def before_query(self, request: dict) -> dict:
        # request keys: "query", "session_id", "conversation_history", "force_domain"
        # Modify and return request dict. Raise to abort the request.
        request["query"] = re.sub(r"\b\d{3}-\d{2}-\d{4}\b", "[SSN]", request["query"])
        return request
    async def after_response(self, response: dict) -> dict:
        # response keys: "response", "u_score", "domain", "routing_mode",
        # "confidence", "latency_ms", "contradictions_detected"
        # Return response unchanged if you don't need to modify it.
        return response
# Full version
import logging
import re
log = logging.getLogger(__name__)
class PIIRedactionMiddleware:
    def __init__(self, patterns: list[str] | None = None):
        self.patterns = patterns or [r"\b\d{3}-\d{2}-\d{4}\b"]
        self._compiled = [re.compile(p) for p in self.patterns]
    async def before_query(self, request: dict) -> dict:
        original = request["query"]
        redacted = original
        for pattern in self._compiled:
            redacted = pattern.sub("[REDACTED]", redacted)
        if redacted != original:
            log.info("PII redacted in session %s", request.get("session_id"))
        request["query"] = redacted
        request["_redacted"] = redacted != original  # pass context forward
        return request
    async def after_response(self, response: dict) -> dict:
        # Optionally add middleware metadata to response
        if response.get("_redacted"):
            response["metadata"] = response.get("metadata", {})
            response["metadata"]["pii_redacted"] = True
        return response
# aua_config.yaml
middleware:   # TOP-LEVEL list, runs in order
  - import_path: plugins.pii_middleware:PIIRedactionMiddleware
    config:
      patterns:
        - '\b\d{3}-\d{2}-\d{4}\b'                           # SSN
        - '(?i)\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b'   # email; (?i) also matches lowercase addresses
  - import_path: plugins.tenant_policy:TenantMiddleware
  - plugins.simple_logger:LogMiddleware   # bare string works when there's no config
13.9 Testing plugins before registering
# Pre-flight: import + contract-validate before touching config (run from the project dir)
aua extensions test --kind utility_scorer --import-path plugins.risk_scorer:RiskWeightedScorer
aua extensions inspect plugins.risk_scorer:RiskWeightedScorer
# ✓ Interface satisfied: UtilityScorerPlugin
# ✓ score() signature valid
# To pick up changes: restart aua serve
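The CLI checks the contract; it doesn't check that your logic does what you meant. Because plugins are plain classes, you can unit-test them with nothing but the standard library. A minimal sketch, using a trimmed copy of the minimum-viable middleware from 13.8 (the `run_before` helper is hypothetical):

```python
import asyncio
import re


class PIIRedactionMiddleware:
    """Trimmed copy of the minimum-viable middleware from 13.8."""

    async def before_query(self, request: dict) -> dict:
        request["query"] = re.sub(r"\b\d{3}-\d{2}-\d{4}\b", "[SSN]", request["query"])
        return request

    async def after_response(self, response: dict) -> dict:
        return response


def run_before(query: str) -> str:
    """Drive the async hook from synchronous test code."""
    middleware = PIIRedactionMiddleware()
    request = {"query": query, "session_id": "test-session"}
    return asyncio.run(middleware.before_query(request))["query"]


redacted = run_before("My SSN is 123-45-6789, please update it")
untouched = run_before("Order 12345-678 has shipped")
```

Include at least one input that should not be redacted, so an over-eager pattern shows up in tests before it reaches users.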
The sections below complete the plugin reference with the new interfaces added in v1.2.
Every major decision point in AUA (routing, scoring, arbitration, promotion, contradiction detection, assertion storage) has a formal Python Protocol interface you can replace by registering a class in aua_config.yaml. You never fork the source. You write a class, register it, and the router wires it at startup.
This How-to walks through all plugin types from scratch, with working examples. By the end you will have replaced the utility scorer, added a custom routing strategy, implemented a non-linear utility function, and configured multi-tenancy. Each section can be read independently if you only need one plugin type.
13.10 How plugins are loaded: the five-step lifecycle
When the router starts, it reads plugins: from aua_config.yaml, then for each entry:
- Parses import_path as module.path:ClassName
- Adds your project root to sys.path (so plugins.scoring:MyScorer resolves to {project}/plugins/scoring.py)
- Imports the module and instantiates the class, passing the config: dict as kwargs
- Validates the instance against its Protocol using isinstance(), which fails fast at startup, not silently at query time
- Wires the plugin into the relevant decision point (e.g. replaces self._custom_scorer)
plugins:
  utility_scorer:                          # which slot
    import_path: plugins.scoring:MyScorer  # module:ClassName
    config:                                # passed as **kwargs to __init__
      risk_weight: 0.7
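The first four steps map onto a few standard-library calls. The sketch below is not AUA's loader: `load_plugin` and its parameters are hypothetical, the contract check is simplified to "required methods exist", and step five (wiring) is left out. It loads `collections:Counter` purely so the example runs without a project on disk.

```python
import importlib
import sys


def load_plugin(import_path, config=None, required_methods=(), project_dir=None):
    """Hypothetical loader sketch: parse, extend sys.path, import, instantiate, validate."""
    # Step 1: parse "module.path:ClassName"
    module_name, sep, class_name = import_path.partition(":")
    if not sep or not module_name or not class_name:
        raise ValueError(f"expected 'module.path:ClassName', got {import_path!r}")
    # Step 2: make the project directory importable
    if project_dir and project_dir not in sys.path:
        sys.path.insert(0, project_dir)
    # Step 3: import the module and instantiate with config as keyword arguments
    cls = getattr(importlib.import_module(module_name), class_name)
    instance = cls(**(config or {}))
    # Step 4: simplified contract check (AUA validates against its Protocol)
    missing = [m for m in required_methods if not callable(getattr(instance, m, None))]
    if missing:
        raise TypeError(f"{class_name} is missing required methods: {missing}")
    return instance


counter = load_plugin("collections:Counter", {"a": 2}, required_methods=("most_common",))

try:
    load_plugin("collections.Counter")  # colon missing
    malformed_error = None
except ValueError as exc:
    malformed_error = str(exc)

try:
    load_plugin("collections:Counter", required_methods=("classify",))
    missing_method_error = None
except TypeError as exc:
    missing_method_error = str(exc)
```

Every failure here happens before any query is served, which is the point of validating at startup.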
Structural typing: no inheritance needed. AUA uses Python Protocols. Your class does not import or extend anything from AUA. It just needs the right method name and signature. This means you can wrap any existing class (an sklearn model, a FastAPI endpoint, a numpy function) as a plugin with a thin adapter.
13.11 Plugin types at a glance
| YAML key | Protocol class | What it replaces | Required method | Fallback on error |
|---|---|---|---|---|
| field_classifier | FieldClassifierPlugin | Domain classifier | classify(query) → dict[str,float] | Built-in classifier |
| utility_scorer | UtilityScorerPlugin | Final U score (adjustment) | score(response, field, prior_u, confidence, metadata) → float | Built-in U |
| full_utility_scorer | FullUtilityScorerPlugin | Entire U computation | score_full(field, efficacy, confidence, curiosity, weights, metadata) → float | Falls back to score() then built-in |
| contradiction_detector | ContradictionDetectorPlugin | Built-in code checker | check(problem, solution, claimed_complexity=None) → dict | Built-in detector |
| assertion_store | AssertionStorePlugin | In-memory AssertionsStore | add(), query(), query_contradictions() | - |
| routing_strategy | RoutingStrategyPlugin | Post-classifier distribution | route(query, distribution, metadata) → dict[str,float] | Classifier output |
| scoring_component | ScoringComponentPlugin | One sub-score (E, C, or K) | compute(component, value, field, metadata) → float | Built-in sub-score |
| arbiter_policy | ArbiterPolicyPlugin | LLM arbitration call | arbitrate(subject, domain, output_a, output_b, metadata) → dict | Built-in LLM arbiter |
| promotion_policy | PromotionPolicyPlugin | Promotion gate (simple) | should_promote(specialist, blue_mean_u, green_mean_u, n_queries, metadata) → bool | Built-in threshold |
| full_promotion_policy | FullPromotionPolicyPlugin | Promotion gate (full context) | should_promote_full(context: dict) → bool | Falls back to should_promote() |
| correction_store | CorrectionStorePlugin | DPO pair / correction storage | store(), query(), export_dpo_pairs() | - |
Every plugin with a fallback listed above fails safe. If it raises an unhandled exception at query time, AUA logs it at DEBUG level and falls back to the built-in implementation, so a plugin bug doesn't block production traffic. Fix and redeploy at your own pace.
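The fallback behaviour amounts to a try/except around the plugin call. This sketch shows the shape for a utility_scorer in adjustment mode; `score_with_fallback`, the logger name, and both scorer classes are hypothetical and are not AUA internals.

```python
import logging

log = logging.getLogger("plugin_fallback_sketch")  # hypothetical logger name


class HalvingScorer:
    def score(self, response, field, prior_u, confidence, metadata):
        return prior_u * 0.5


class BrokenScorer:
    def score(self, response, field, prior_u, confidence, metadata):
        return 1 / 0  # simulated plugin bug


def score_with_fallback(plugin, builtin_u, response, field, confidence, metadata):
    """Use the plugin's score; on any exception, log at DEBUG and keep the built-in U."""
    try:
        return float(plugin.score(response, field, builtin_u, confidence, metadata))
    except Exception:
        log.debug("utility_scorer plugin failed; using built-in U", exc_info=True)
        return builtin_u


halved = score_with_fallback(HalvingScorer(), 0.8, "answer", "mathematics", 0.9, {})
recovered = score_with_fallback(BrokenScorer(), 0.8, "answer", "mathematics", 0.9, {})
```

One consequence worth planning for: because the failure is logged at DEBUG, a permanently broken plugin can go unnoticed at default log levels, so it may be worth enabling DEBUG for plugin loggers while you roll out a new one.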
13.12 Field classifier plugin: route queries differently
The built-in classifier uses a keyword/embedding model trained on the 11 built-in fields. Replace it when you have tenant-specific routing rules, a proprietary domain taxonomy, or an external classification service.
class TenantAwareClassifier:
    """
    Route based on tenant context first, then keyword fallback.
    Reads the tenant ID set by TenantPolicyMiddleware.
    """
    def __init__(self, tenant_overrides: dict):
        self.overrides = tenant_overrides  # e.g. {"tenant-finance": "mathematics"}
    def classify(self, query: str) -> dict[str, float]:
        from aua.tenancy import get_tenant_id
        tenant = get_tenant_id()
        if tenant and tenant in self.overrides:
            return {self.overrides[tenant]: 1.0}
        if any(w in query.lower() for w in ["integral", "derivative", "proof"]):
            return {"mathematics": 0.92, "software_engineering": 0.05}
        return {"software_engineering": 0.85, "mathematics": 0.10}
plugins:
  field_classifier:
    import_path: plugins.routing:TenantAwareClassifier
    config:
      tenant_overrides:
        tenant-finance: mathematics
        tenant-devtools: software_engineering
Your classify() receives the raw query string and must return a dict mapping field names to probabilities (0.0-1.0). Probabilities should sum to ≤ 1.0 (the remainder is treated as "unknown" and routes to the arbiter). Returning {"software_engineering": 1.0} forces single-specialist routing to the swe specialist.
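A small check like the one below makes the "sum to at most 1.0" rule concrete and is handy in plugin unit tests. `unknown_mass` is a hypothetical helper, not an AUA function; it returns the remainder the paragraph above describes as "unknown".

```python
def unknown_mass(distribution: dict) -> float:
    """Validate a classify() result and return the implicit 'unknown' remainder."""
    for field, p in distribution.items():
        if not 0.0 <= p <= 1.0:
            raise ValueError(f"{field}: probability {p} is outside [0.0, 1.0]")
    total = sum(distribution.values())
    if total > 1.0 + 1e-9:  # small tolerance for float rounding
        raise ValueError(f"probabilities sum to {total}, which exceeds 1.0")
    return max(0.0, 1.0 - total)


# The maths branch of TenantAwareClassifier leaves about 0.03 unassigned.
remainder = unknown_mass({"mathematics": 0.92, "software_engineering": 0.05})

try:
    unknown_mass({"mathematics": 0.9, "software_engineering": 0.4})
    rejected = False
except ValueError:
    rejected = True
```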
13.13 Routing strategy plugin: intercept after classification
A routing_strategy plugin sits between the classifier and the routing threshold decision. It receives the probability distribution and can reorder, cap, or override it. This runs after the classifier and before the single/fanout/arbiter threshold check, so the router's mode decision uses your adjusted distribution.
class PrefixRouter:
    """
    Queries starting with [math] always go to the mathematics specialist.
    Useful for power users who know which specialist they want.
    """
    def route(self, query: str, distribution: dict, metadata: dict) -> dict:
        q = query.strip()
        if q.startswith("[math]"):
            return {"mathematics": 1.0}
        if q.startswith("[code]"):
            return {"software_engineering": 1.0}
        return distribution  # pass through unchanged
plugins:
  routing_strategy:
    import_path: plugins.routing:PrefixRouter
The metadata dict contains session_id. When force_domain is set on a request, the routing strategy is skipped: explicit overrides always win.
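The ordering described above can be pictured as a short function. This is a sketch of the documented order only: `resolve_distribution` and `StubClassifier` are hypothetical, and treating force_domain as bypassing the classifier as well as the strategy is an assumption, since the text only states that the strategy is skipped.

```python
class StubClassifier:  # hypothetical classifier with a fixed output
    def classify(self, query):
        return {"software_engineering": 0.85, "mathematics": 0.10}


class PrefixRouter:  # trimmed copy of the example above
    def route(self, query, distribution, metadata):
        if query.strip().startswith("[math]"):
            return {"mathematics": 1.0}
        return distribution


def resolve_distribution(query, classifier, strategy=None, force_domain=None):
    """Classifier first, then routing strategy; force_domain short-circuits both (assumed)."""
    if force_domain is not None:
        return {force_domain: 1.0}
    distribution = classifier.classify(query)
    if strategy is not None:
        distribution = strategy.route(query, distribution, {"session_id": "s-1"})
    return distribution  # the threshold check would run on this result


prefixed = resolve_distribution("[math] integrate x^2", StubClassifier(), PrefixRouter())
plain = resolve_distribution("write a parser", StubClassifier(), PrefixRouter())
forced = resolve_distribution("[math] integrate x^2", StubClassifier(), PrefixRouter(),
                              force_domain="software_engineering")
```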
13.14 Contradiction detector plugin: custom validation
The built-in detector catches syntax errors, logical contradictions, and cross-session conflicts in code generation. Replace it when you need domain-specific validation, such as checking medical claims against a drug database, verifying proofs with SymPy, or using a fine-tuned classifier.
class MyDomainDetector:
    def check(self, problem: str, solution: str,
              claimed_complexity: str | None = None) -> dict:
        return {
            "contradictions": [
                {
                    "type": "domain_specific",  # any string label
                    "description": "Claim contradicts known literature",
                    "severity": 0.8,            # 0.0 to 1.0
                }
            ],
            "confidence_penalty": 0.3,  # total deducted from confidence score
            "is_clean": False,          # True when contradictions list is empty
        }
Tip: use the built-in empirical module inside your plugin. from aua.empirical import empirical_check gives you the same SymPy / arXiv / PubMed cross-check the Arbiter Stage 4 uses. Call it from your detector to get external ground-truth verification at the per-response level.
plugins:
  contradiction_detector:
    import_path: plugins.validation:MyDomainDetector
13.15 Assertion store plugin: persistent claim memory
The built-in AssertionsStore is in-memory and resets on restart. Replace it when you want verified claims to survive restarts, be shared across instances, or be queryable from outside AUA, for example via a Postgres database that a reporting tool can also read.
import psycopg2
class PostgresAssertionStore:
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
    def add(self, subject, domain, claim, confidence,
            source="arbiter", evidence_summary=""):
        with self.conn.cursor() as cur:
            cur.execute(
                "INSERT INTO assertions (subject, domain, claim, confidence, source)"
                " VALUES (%s,%s,%s,%s,%s) ON CONFLICT (subject,domain) DO UPDATE"
                " SET claim=EXCLUDED.claim, confidence=EXCLUDED.confidence",
                (subject, domain, claim, confidence, source)
            )
        self.conn.commit()
    def query(self, subject, domain=None, min_confidence=None):
        ...  # SELECT WHERE subject ILIKE %s AND confidence >= %s
    def query_contradictions(self, subject, new_claim, domain=None):
        ...  # compare new_claim against stored claims
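To try the same three-method interface without a database server, here is a self-contained variant on the standard-library sqlite3 module. It is a sketch, not the Postgres implementation above: the class name, the table schema, and especially the contradiction rule (any stored claim whose text differs counts as a conflict) are assumptions made for illustration.

```python
import sqlite3


class SqliteAssertionStore:
    """Hypothetical stand-in for the Postgres example, backed by sqlite3."""

    COLUMNS = ("subject", "domain", "claim", "confidence", "source")

    def __init__(self, path: str = ":memory:"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS assertions ("
            " subject TEXT NOT NULL, domain TEXT NOT NULL, claim TEXT NOT NULL,"
            " confidence REAL NOT NULL, source TEXT NOT NULL,"
            " PRIMARY KEY (subject, domain))"
        )

    def add(self, subject, domain, claim, confidence,
            source="arbiter", evidence_summary=""):
        # evidence_summary is accepted for interface parity but not persisted here.
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute(
                "INSERT INTO assertions (subject, domain, claim, confidence, source)"
                " VALUES (?, ?, ?, ?, ?)"
                " ON CONFLICT (subject, domain) DO UPDATE"
                " SET claim = excluded.claim, confidence = excluded.confidence",
                (subject, domain, claim, confidence, source),
            )

    def query(self, subject, domain=None, min_confidence=None):
        sql = "SELECT subject, domain, claim, confidence, source FROM assertions WHERE subject LIKE ?"
        params = [f"%{subject}%"]
        if domain is not None:
            sql += " AND domain = ?"
            params.append(domain)
        if min_confidence is not None:
            sql += " AND confidence >= ?"
            params.append(min_confidence)
        return [dict(zip(self.COLUMNS, row)) for row in self.conn.execute(sql, params)]

    def query_contradictions(self, subject, new_claim, domain=None):
        # Naive assumption: a stored claim with different text is a conflict.
        wanted = new_claim.strip().lower()
        return [r for r in self.query(subject, domain) if r["claim"].strip().lower() != wanted]


store = SqliteAssertionStore()
store.add("heapsort_complexity", "software_engineering", "O(n log n)", 0.90)
store.add("heapsort_complexity", "software_engineering", "O(n log n) worst case", 0.95)
rows = store.query("heapsort")
conflicts = store.query_contradictions("heapsort_complexity", "O(n^2)")
```

The second add() call updates the existing row rather than inserting a duplicate, mirroring the ON CONFLICT clause in the Postgres version. A real detector would need a semantic comparison rather than string inequality.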
assertion_store vs correction_store. These are two different stores. assertion_store holds knowledge-level verified claims (what the arbiter learned is true about "bubble sort complexity"). correction_store holds DPO training pairs (which response was better and why). They are independent plugin slots with different interfaces.
13.16 Scoring component plugin: adjust one sub-score
The utility function is U = w_e·E + w_c·C + w_k·K. A scoring_component plugin intercepts one component (E, C, or K) after the built-in pipeline computes it, applies your adjustment, then the weighted sum is recomputed with your adjusted value. This is right when you want to bias one dimension without touching the overall architecture.
class LengthAwareEfficacy:
    """
    Responses shorter than min_chars get an efficacy penalty.
    Encourages the specialist to give complete answers.
    """
    def __init__(self, min_chars: int = 200):
        self.min_chars = min_chars
    def compute(self, component: str, value: float,
                field: str, metadata: dict) -> float:
        if component != "efficacy":
            return value  # pass C and K through unchanged
        response = metadata.get("response", "")
        if len(response) < self.min_chars:
            shortfall = 1.0 - len(response) / self.min_chars
            return max(0.0, value * (1.0 - 0.4 * shortfall))
        return value
plugins:
  scoring_component:
    import_path: plugins.scoring:LengthAwareEfficacy
    config:
      min_chars: 300
The metadata dict available inside compute() contains: query, response, pass_rate, and contradiction_penalty.
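To get a feel for the size of this adjustment, the arithmetic can be worked through by hand. The sketch below reuses the penalty formula from LengthAwareEfficacy; the weights and the E, C, K values are illustrative numbers chosen for the example, not AUA defaults.

```python
def length_penalty(value: float, response: str, min_chars: int) -> float:
    """Same formula as LengthAwareEfficacy.compute() for the efficacy component."""
    if len(response) < min_chars:
        shortfall = 1.0 - len(response) / min_chars
        return max(0.0, value * (1.0 - 0.4 * shortfall))
    return value


# Illustrative values only (not AUA defaults).
w_e, w_c, w_k = 0.5, 0.3, 0.2
E, C, K = 0.8, 0.7, 0.5
response = "x" * 150  # half of min_chars=300, so shortfall = 0.5

E_adj = length_penalty(E, response, min_chars=300)   # 0.8 * (1 - 0.4 * 0.5)
u_before = w_e * E + w_c * C + w_k * K
u_after = w_e * E_adj + w_c * C + w_k * K
print(round(E_adj, 3), round(u_before, 3), round(u_after, 3))  # 0.64 0.71 0.63
```

A response at half the minimum length loses 20% of its efficacy, which with these weights moves U from 0.71 to 0.63. The penalty can never exceed 40%, reached only for an empty response.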
13.17 Utility scorer: adjustment mode vs full replacement
There are two ways to customise the utility score. Understanding the difference is important before choosing one.
Adjustment mode (utility_scorer): the built-in pipeline runs first, produces a U score, and passes it to your plugin as prior_u. Your plugin returns a scalar. Use this when you want to post-process the built-in score: multiplying by a risk factor, capping on confidence, or blending with an external signal.
Full replacement mode (full_utility_scorer): the built-in w_e·E + w_c·C + w_k·K step is skipped entirely. Your plugin receives the raw E, C, K components and can compute any function of them. Use this when the linear form is wrong for your domain.
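As a minimal sketch of adjustment mode, the plugin below caps the built-in score at the confidence value, one of the post-processing ideas listed above. The score() signature mirrors the one used by the SurgeryAwareScorer example later in this section; the class name and the `floor` option are hypothetical.

```python
class ConfidenceCapScorer:
    """Adjustment-mode sketch: never report a U higher than the confidence."""

    def __init__(self, floor: float = 0.0):
        # Hypothetical option: the lowest score this plugin will ever return.
        self.floor = floor

    def score(self, response, field, prior_u, confidence, metadata) -> float:
        capped = min(prior_u, confidence)  # built-in U, limited by confidence
        return max(self.floor, capped)


scorer = ConfidenceCapScorer()
print(scorer.score("...", "general", prior_u=0.9, confidence=0.6, metadata={}))  # 0.6
print(scorer.score("...", "general", prior_u=0.4, confidence=0.6, metadata={}))  # 0.4
```

Because the plugin only ever sees the finished scalar, it cannot reweight E, C, and K individually; that is the job of full replacement mode.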
Why the built-in is linear: Axiom A5
The linear form is not arbitrary. It follows from five axioms, as shown in Appendix B.1 of the whitepaper. The load-bearing axiom is A5 (linear scaling): if you scale all three inputs by the same factor λ, utility scales by the same factor. A5 forces the component functions to be linear. It is what makes the additive representation theorem (Theorem B.1) work.
score_full() deliberately bypasses A5. You lose the theoretical guarantee of Theorem B.1 but gain the freedom to express any utility model. Non-linear models can outperform the linear form empirically even without the formal guarantee; the Appendix B.1 Remark on A5 as the "load-bearing axiom" addresses exactly this tradeoff.
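The scaling property in A5 is easy to check numerically. The sketch below compares the linear form with a confidence-squared form of the kind used for "surgery" in this section; the weights and inputs are illustrative values, not AUA defaults.

```python
import math


def linear_u(e, c, k, w=(0.5, 0.3, 0.2)):
    """Built-in shape: weighted sum of the three components."""
    return w[0] * e + w[1] * c + w[2] * k


def confidence_squared_u(e, c, k):
    """Non-linear shape: efficacy gated by confidence squared."""
    return e * c ** 2


lam = 0.5
e, c, k = 0.8, 0.6, 0.4

# Linear: scaling every input by lam scales U by lam (A5 holds).
linear_ok = math.isclose(linear_u(lam * e, lam * c, lam * k),
                         lam * linear_u(e, c, k))

# Confidence-squared: U scales by lam**3 instead, so A5 is violated.
nonlinear_ok = math.isclose(confidence_squared_u(lam * e, lam * c, lam * k),
                            lam * confidence_squared_u(e, c, k))

print(linear_ok, nonlinear_ok)  # True False
```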
What score_full() receives
| Argument | Type | What it is |
|---|---|---|
| field | str | Domain name, e.g. "surgery", "mathematics", "software_engineering", … |
| efficacy | float 0-1 | E_ema: EMA-accumulated efficacy (α=0.2); tracks how well the specialist performs over time |
| confidence | float 0-1 | Kalman-filtered confidence after contradiction penalty has been applied |
| curiosity | float 0-1 | K_effective = K_base + gap_bonus from Arbiter Case 3 |
| weights | dict | {"w_e": float, "w_c": float, "w_k": float} from this field's config |
| metadata | dict | query, response, pass_rate, task_score (the full built-in TaskScore object) |
Non-linear utility functions you can implement
# Variant 1: confidence-squared penalty for surgery, linear elsewhere
def score_full(self, field, efficacy, confidence, curiosity, weights, metadata):
    if field == "surgery":
        # C=0.9 → multiplier 0.81; C=0.5 → multiplier 0.25
        return min(1.0, efficacy * (confidence ** 2) * (1 + 0.1 * curiosity))
    # all other fields: standard linear
    return (weights["w_e"]*efficacy + weights["w_c"]*confidence + weights["w_k"]*curiosity)
# Variant 2: multiplicative, with curiosity as a bonus factor
def score_full(self, field, efficacy, confidence, curiosity, weights, metadata):
    return min(1.0, efficacy * confidence * (1 + weights["w_k"] * curiosity))
# Variant 3: weighted geometric mean (inputs clamped away from zero)
def score_full(self, field, efficacy, confidence, curiosity, weights, metadata):
    e = max(0.001, efficacy)
    c = max(0.001, confidence)
    k = max(0.001, curiosity)
    # w_e + w_c + w_k = 1.0 so this is a proper weighted geometric mean
    return min(1.0, (e ** weights["w_e"]) * (c ** weights["w_c"]) * (k ** weights["w_k"]))
# Variant 4: bottleneck, the weaker of efficacy and confidence
def score_full(self, field, efficacy, confidence, curiosity, weights, metadata):
    return min(efficacy, confidence)  # curiosity is excluded from the minimum
# Variant 5: hard confidence floor taken from the field config
from aua.config import FIELD_CONFIGS
def score_full(self, field, efficacy, confidence, curiosity, weights, metadata):
    c_min = FIELD_CONFIGS.get(field, FIELD_CONFIGS["general"]).c_min
    if confidence < c_min:
        return confidence * 0.1  # near-zero: this answer should not reach the user
    return (weights["w_e"]*efficacy + weights["w_c"]*confidence + weights["w_k"]*curiosity)
class SurgeryAwareScorer:
    def score(self, response, field, prior_u, confidence, metadata) -> float:
        return prior_u  # fallback: return built-in unchanged
    def score_full(self, field, efficacy, confidence, curiosity,
                   weights, metadata) -> float:
        if field == "surgery":
            return min(1.0, efficacy * (confidence ** 2))
        return (weights["w_e"]*efficacy
                + weights["w_c"]*confidence
                + weights["w_k"]*curiosity)
plugins:
  full_utility_scorer:
    import_path: plugins.scoring:SurgeryAwareScorer
Fallback chain. If score_full() raises, AUA falls back to score(). If that raises, it uses the built-in linear U. Always implement score() returning prior_u unchanged as a safety net; it costs you nothing and guarantees traffic is never blocked by a scoring bug.
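The order of that chain can be pictured as three nested attempts. The helper below is a hypothetical sketch written for this guide, not AUA's internal code; only the score_full() and score() method signatures are taken from the examples above.

```python
import logging

log = logging.getLogger("fallback_sketch")


def score_with_fallback(plugin, builtin_u, *, field, efficacy, confidence,
                        curiosity, weights, response="", metadata=None):
    """Try score_full(), then score(), then the built-in linear U."""
    metadata = metadata or {}
    try:
        return plugin.score_full(field, efficacy, confidence, curiosity,
                                 weights, metadata)
    except Exception:
        log.exception("score_full() failed; falling back to score()")
    try:
        return plugin.score(response, field, builtin_u, confidence, metadata)
    except Exception:
        log.exception("score() failed; using built-in linear U")
    return builtin_u


class HalfBroken:
    """score_full() has a bug; score() is the safety net."""
    def score_full(self, *args):
        raise ValueError("bug in custom utility model")
    def score(self, response, field, prior_u, confidence, metadata):
        return prior_u


args = dict(field="surgery", efficacy=0.8, confidence=0.7, curiosity=0.5,
            weights={"w_e": 0.5, "w_c": 0.3, "w_k": 0.2})
print(score_with_fallback(HalfBroken(), 0.71, **args))  # 0.71
```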
13.18 Arbiter policy plugin: replace the LLM arbitration call
When two specialists disagree (fanout routing), AUA calls the arbiter LLM to decide the winner. Replace this with a deterministic rule, a faster heuristic, or a domain-specific logic layer to cut latency or use stronger domain knowledge.
def arbitrate(self, subject: str, domain: str,
              output_a: str, output_b: str, metadata: dict) -> dict:
    return {
        "winner": "A",            # "A" | "B" | "BOTH_WRONG"
        "reason": "A includes the time complexity proof",
        "external_response": "",  # shown to user only when BOTH_WRONG
        "case": "case_1",         # optional metadata for DPO labelling
    }
metadata contains: domain_a, domain_b, specialist_a, specialist_b.
class LengthHeuristicArbiter:
    """
    Pick the more detailed response. Fast, deterministic, zero-cost.
    Good baseline before you have enough DPO data to train a judge.
    """
    def arbitrate(self, subject, domain, output_a, output_b, metadata):
        winner = "A" if len(output_a) >= len(output_b) else "B"
        return {
            "winner": winner,
            "reason": f"{winner} is more complete ({len(output_a)} vs {len(output_b)} chars)",
            "external_response": "",
        }
plugins:
  arbiter_policy:
    import_path: plugins.arbitration:LengthHeuristicArbiter
13.19 Promotion policy plugin: any function of any signal
The default promotion gate is green_u - blue_u >= threshold, a simple scalar comparison. Replace it when you need multi-factor gating, confidence intervals, minimum sample sizes, or non-linear combinations of the available signals.
Two modes, following the same pattern as the utility scorer:
Simple mode (promotion_policy): receives pre-computed scalars. Good for straightforward rules.
Full context mode (full_promotion_policy): receives the complete promotion context dict including raw shadow score rows, std dev of the delta distribution, regression results, and the full config objects. Use this for anything statistical.
Full context dict: every key explained
| Key | Type | What it is |
|---|---|---|
| specialist | str | Specialist name |
| blue_u / green_u | float | Mean U for BLUE (current production) and GREEN (candidate) |
| u_delta | float | green_u - blue_u |
| mean_delta | float | Mean U_delta across all shadow queries (real traffic) |
| n_queries | int | Number of shadow/eval queries used |
| min_queries | int | shadow_min_queries from config |
| threshold | float | delta from blue_green config |
| shadow_scores | list[dict] | Raw rows from ShadowStore; each has blue_u, green_u, u_delta, domain |
| shadow_std_delta | float | Std dev of U_delta across shadow queries; 0.0 when n < 2 |
| regression_result | dict or None | Regression gate output: regressed, delta_pass_rate, delta_u_score |
| dry | bool | True when no green_endpoint was available (dry-run scores only) |
| source | str | "shadow (N queries)", "synthetic eval", or "dry-run" |
| bg_config | BlueGreenFieldConfig | Full blue_green config for this specialist |
class CIGatePromoter:
    """Promote only when mean_delta exceeds 2 standard deviations of the shadow deltas (a strict gate)."""
    def should_promote(self, specialist, blue_mean_u, green_mean_u, n_queries, metadata):
        return False  # fail safe: never promote if should_promote_full raises
    def should_promote_full(self, context: dict) -> bool:
        std = context["shadow_std_delta"]
        mean = context["mean_delta"]
        if std == 0:
            return mean > 0
        return mean > 2 * std
class AdaptiveThresholdPromoter:
    """Require a larger delta when we have fewer queries: conservative early, liberal later."""
    def should_promote(self, *a, **kw): return False
    def should_promote_full(self, context: dict) -> bool:
        n = context["n_queries"]
        adaptive = context["threshold"] + 0.5 / max(n, 1)
        return context["mean_delta"] >= adaptive
class MultiFactorGate:
    """All three conditions must pass: no regression, enough queries, positive delta."""
    def should_promote(self, *a, **kw): return False
    def should_promote_full(self, context: dict) -> bool:
        if context.get("regression_result") and context["regression_result"].get("regressed"):
            return False
        if context["n_queries"] < context["min_queries"]:
            return False
        return context["mean_delta"] >= context["threshold"]
plugins:
  full_promotion_policy:
    import_path: plugins.promotion:MultiFactorGate
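CIGatePromoter compares the mean delta against the standard deviation of the individual deltas, which becomes very hard to pass as traffic gets noisier. A common alternative is to compare against the standard error of the mean (std / sqrt(n)), computed from the raw shadow rows. The sketch below assumes only the `shadow_scores` and `min_queries` keys from the context table; the class name and the `z` parameter are hypothetical.

```python
import math
import statistics


class StandardErrorGate:
    """Promote when the mean delta exceeds z standard errors of the mean."""

    def __init__(self, z: float = 2.0):
        self.z = z

    def should_promote(self, specialist, blue_mean_u, green_mean_u,
                       n_queries, metadata):
        return False  # fail safe if should_promote_full raises

    def should_promote_full(self, context: dict) -> bool:
        deltas = [row["u_delta"] for row in context["shadow_scores"]]
        if len(deltas) < max(2, context["min_queries"]):
            return False  # not enough evidence yet
        mean = statistics.fmean(deltas)
        std_err = statistics.stdev(deltas) / math.sqrt(len(deltas))
        if std_err == 0:
            return mean > 0
        return mean > self.z * std_err


steady = [{"u_delta": d} for d in (0.05, 0.06, 0.04, 0.05, 0.07, 0.05)]
noisy = [{"u_delta": d} for d in (0.3, -0.3, 0.2, -0.2, 0.1, -0.05)]
gate = StandardErrorGate()
print(gate.should_promote_full({"shadow_scores": steady, "min_queries": 5}))  # True
print(gate.should_promote_full({"shadow_scores": noisy, "min_queries": 5}))   # False
```

With z = 2 this is roughly a one-sided test at about the 2.5% level under a normal approximation, so it is only a rule of thumb for small samples.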
13.20 Multi-tenancy: isolated namespaces per tenant
AUA supports full per-tenant isolation: separate rate limits, field allowlists, model bindings, and namespaced writes across all four persistent tables (corrections, promotions, audit_log, model_runs each carry a tenant_id column). Isolation is enforced at the database level, so one tenant's queries cannot interfere with another's corrections or promotion history.
middleware:
  - import_path: aua.middleware:TenantPolicyMiddleware
    config:
      reject_unknown: true         # 403 for any X-Tenant-ID not listed below
      tenants:
        tenant-a:
          allowed_fields: [software_engineering, mathematics]
          rate_limit_rpm: 60       # requests per minute for this tenant
          model_binding: swe       # force ALL queries to the swe specialist
        tenant-b:
          allowed_fields: [law, software_engineering]
          rate_limit_rpm: 120
          model_binding: null      # normal routing for tenant-b
Clients pass their tenant ID in the X-Tenant-ID HTTP header. The middleware runs before any routing, so the tenant's allowlist and rate limit are enforced before the query reaches the field classifier.
from aua.tenancy import get_tenant_id
class TenantScoringComponent:
    def compute(self, component: str, value: float, field: str, metadata: dict) -> float:
        tenant = get_tenant_id()  # None for anonymous / no middleware
        if tenant == "tenant-premium" and component == "curiosity":
            return min(1.0, value * 1.3)  # premium tenants get +30% curiosity
        return value
# Only tenant-a's corrections
rows = store.query("corrections", filters={"tenant_id": "tenant-a"})
# All anonymous queries (no tenant header)
rows = store.query("model_runs", filters={"tenant_id": None})
How-to 14 Hooks & middleware ~25 min
Hooks fire at 11 named points in the request pipeline. Each hook receives an event dict and returns a (possibly modified) event dict. Register one hook class per point, or one class for multiple points.
All 11 hook points are live in v1.1. Hooks are fail-open by default: if a hook errors or times out, the pipeline continues. Set fail_closed: true in YAML to abort the request on hook failure.
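The difference between fail-open and fail-closed can be sketched as a small dispatcher. This is an illustration written for this guide, not AUA's actual hook runner: `run_hook`, `HookAbort`, and the `timeout_s` parameter are hypothetical names.

```python
import asyncio
import logging

log = logging.getLogger("hook_sketch")


class HookAbort(Exception):
    """Raised when a fail_closed hook errors or times out (hypothetical)."""


async def run_hook(hook, event: dict, *, fail_closed: bool = False,
                   timeout_s: float = 1.0) -> dict:
    try:
        # Pass a shallow copy so a failing hook cannot leave the
        # original event half-modified.
        result = await asyncio.wait_for(hook(dict(event)), timeout=timeout_s)
        return result if isinstance(result, dict) else event
    except Exception as exc:  # also covers asyncio.TimeoutError
        if fail_closed:
            raise HookAbort(f"{event.get('type')} hook failed") from exc
        log.warning("hook failed, continuing (fail-open): %r", exc)
        return event


class BrokenHook:
    async def __call__(self, event: dict) -> dict:
        raise RuntimeError("webhook unreachable")


event = {"type": "on_correction", "session_id": "", "trace_id": ""}
# Fail-open: the error is logged and the original event flows on.
print(asyncio.run(run_hook(BrokenHook(), event)) == event)  # True
```

With fail_closed=True the same failure raises HookAbort, which the caller would translate into an aborted request.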
14.1 The minimal hook
class MyHook:
    async def __call__(self, event: dict) -> dict:
        # event always has: "type" (hook point name), "session_id", "trace_id"
        # plus hook-specific fields documented below
        print(f"Hook fired: {event['type']}")
        return event  # always return the event dict (modified or unchanged)
hooks:                                  # a LIST: one entry per registration
  - hook_point: on_correction
    import_path: plugins.my_hook:MyHook
    fail_closed: false                  # fail-open: log error, continue (default)
    config: {}                          # optional; splatted as constructor kwargs
14.2 All 11 hook points: event fields
Each hook receives a specific event dict. Here are the fields available at each point, shown as the dict your __call__ method receives:
pre_query: fires before field classification
Use for: PII scrubbing, request logging, query transformation, rate limiting per session.
event = {
"type": "pre_query", # always present
"session_id": "s_abc123", # current session
"trace_id": "01HX...", # W3C trace ID
"query": "Write binary search.",
"conversation_history": [...], # list of prior messages
"force_domain": None, # str if routing forced, else None
}
# Modify event["query"] to transform the query before routing
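As an example of query transformation at this point, the hook below masks email addresses before the query reaches the classifier. It is a minimal sketch: the regex is a deliberately simple pattern rather than a complete PII detector, and the class name and `placeholder` option are hypothetical.

```python
import asyncio
import re

# Simple email pattern for illustration; real PII scrubbing needs more than this.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


class EmailScrubHook:
    """pre_query hook sketch: mask email addresses before routing."""

    def __init__(self, placeholder: str = "[email]"):
        self.placeholder = placeholder

    async def __call__(self, event: dict) -> dict:
        if event.get("type") == "pre_query":
            event["query"] = EMAIL_RE.sub(self.placeholder, event["query"])
        return event


event = {
    "type": "pre_query",
    "session_id": "s_abc123",
    "trace_id": "t1",
    "query": "Email bob@example.com the binary search code.",
    "conversation_history": [],
    "force_domain": None,
}
scrubbed = asyncio.run(EmailScrubHook()(event))
print(scrubbed["query"])  # Email [email] the binary search code.
```

For a hook like this, registering it with fail_closed: true is worth considering, so that a scrubbing failure aborts the request instead of letting the unmasked query through.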
post_route: fires after routing decision, before specialist calls
Use for: logging routing decisions, overriding the routing mode, alerting on unexpected domains.
event = {
"type": "post_route",
"session_id": "s_abc123",
"trace_id": "01HX...",
"query": "Write binary search.",
"domain_distribution": {"software_engineering": 0.9, "mathematics": 0.1},
"top_domain": "software_engineering",
"routing_mode": "single", # "single" | "fanout" | "arbiter"
"active_specialists": ["swe"], # specialists involved
}
pre_specialist_call: fires before each specialist API call
Use for: per-specialist logging, injecting prompt context, circuit breakers. Fires once per specialist in fanout mode.
event = {
"type": "pre_specialist_call",
"session_id": "s_abc123",
"trace_id": "01HX...",
"query": "Write binary search.",
"domain": "software_engineering",
"specialist": "swe", # specialist name from config
"model": "qwen2.5-coder:7b", # model being called
"endpoint": "http://localhost:11434",
}
post_specialist_call: fires after each specialist returns
Use for: response logging, latency tracking per specialist, content filtering before scoring. Fires once per specialist in fanout mode.
event = {
"type": "post_specialist_call",
"session_id": "s_abc123",
"trace_id": "01HX...",
"domain": "software_engineering",
"specialist": "swe",
"response_preview": "Here is a binary search implementation...", # first 200 chars
"confidence": 0.823, # base confidence before Kalman
}
pre_arbiter: fires before arbiter receives fanout responses
Use for: logging both specialist responses before arbitration, injecting human review, recording disagreements.
event = {
"type": "pre_arbiter",
"session_id": "s_abc123",
"trace_id": "01HX...",
"query": "What is heapsort complexity?",
"specialist_a": "swe",
"response_a": "O(n log n) worst-case...", # first 200 chars
"specialist_b": "math",
"response_b": "O(n^2) in the worst case...", # first 200 chars
}
post_arbiter: fires after arbiter verdict issued
Use for: alerting on case_4 (both wrong), logging verdicts, triggering human review on contradictions.
event = {
"type": "post_arbiter",
"session_id": "s_abc123",
"trace_id": "01HX...",
"verdict": "case_2: specialist A is correct...", # first 200 chars
"winner_field": "software_engineering", # or "both_wrong"
"specialist_a": "swe",
"specialist_b": "math",
}
# Alert on contradictions:
# if event["winner_field"] == "both_wrong": → trigger review
on_correction: fires when a correction is stored
Use for: Slack/webhook notifications when the system learns something new, audit logging, syncing to external knowledge bases. Fires in the background (non-blocking).
event = {
"type": "on_correction",
"session_id": "", # may be empty for manual corrections
"trace_id": "",
"subject": "heapsort_complexity",
"domain": "software_engineering",
"claim": "Heapsort is O(n log n) worst-case, O(1) extra space.",
"confidence": 0.99,
"decay_class": "A", # A=permanent, B=slow, C=moderate, D=fast
"source": "manual", # "manual" | "arbiter" | "cross_session"
}
import httpx
class SlackNotificationHook:
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url
    async def __call__(self, event: dict) -> dict:
        if event["type"] == "on_correction":
            msg = (f"*New correction stored*\n"
                   f"Domain: {event['domain']}\n"
                   f"Claim: {event['claim']}\n"
                   f"Confidence: {event['confidence']}")
            async with httpx.AsyncClient() as client:
                await client.post(self.webhook_url, json={"text": msg}, timeout=3.0)
        return event
pre_response: fires before response is sent to client
Use for: response transformation, content filtering, adding metadata, modifying the response text before it reaches the user. Modify event["response"] to change what the user sees.
event = {
"type": "pre_response",
"session_id": "s_abc123",
"trace_id": "01HX...",
"domain": "software_engineering",
"routing_mode": "single",
"u_score": 0.731,
"confidence": 0.823,
"latency_ms": 312.4,
"response": "Here is the implementation...", # modify this to change response
}
# Return modified event to change the response the user receives:
# event["response"] = event["response"] + "\n\n[Disclaimer: ...]"
post_response: fires after response is sent (background, non-blocking)
Use for: async analytics, usage tracking, long-running post-processing. Never blocks the response. Return value is ignored.
event = {
"type": "post_response",
"session_id": "s_abc123",
"trace_id": "01HX...",
"domain": "software_engineering",
"routing_mode": "single",
"u_score": 0.731,
"latency_ms": 312.4,
"gold_standard": True, # True if all INFO assertions fired
}
on_promotion: fires when GREEN promotes to BLUE (background)
Use for: deployment notifications, updating dashboards, triggering downstream systems.
event = {
"type": "on_promotion",
"session_id": "",
"trace_id": "",
"specialist": "swe",
"promoted_from": "qwen2.5-coder:7b",
"promoted_to": "qwen2.5-coder:14b-finetuned",
"project_dir": "/home/user/my-aua-project",
}
on_rollback: fires when a rollback completes (background)
Use for: incident alerts, updating dashboards, triggering post-rollback diagnostics.
event = {
"type": "on_rollback",
"session_id": "",
"trace_id": "",
"specialist": "swe",
"rolled_back_from": "qwen2.5-coder:14b-finetuned",
"rolled_back_to": "qwen2.5-coder:7b",
"project_dir": "/home/user/my-aua-project",
}
14.3 One hook for multiple points
class AnalyticsHook:
    """Single hook that handles multiple pipeline points."""
    async def __call__(self, event: dict) -> dict:
        hook_type = event["type"]
        if hook_type == "pre_query":
            self._log_query(event["session_id"], event["query"])
        elif hook_type == "post_response":
            self._record_latency(event["domain"], event["latency_ms"], event["u_score"])
        elif hook_type == "on_correction":
            self._notify_team(event["domain"], event["claim"])
        elif hook_type == "on_promotion":
            self._update_dashboard(event["specialist"], event["promoted_to"])
        return event  # always return event
    def _log_query(self, session_id, query): ...
    def _record_latency(self, domain, ms, u): ...
    def _notify_team(self, domain, claim): ...
    def _update_dashboard(self, specialist, model): ...
hooks:
  - hook_point: pre_query
    import_path: plugins.analytics_hook:AnalyticsHook
  - hook_point: post_response
    import_path: plugins.analytics_hook:AnalyticsHook
  - hook_point: on_correction
    import_path: plugins.analytics_hook:AnalyticsHook
    fail_closed: false
  - hook_point: on_promotion
    import_path: plugins.analytics_hook:AnalyticsHook
How-to 15 Security ~25 min
15.1 Bearer tokens and scopes
AUA uses HMAC-SHA256 bearer tokens with 15 fine-grained scopes. Auth is off by default (for local development); enable it in production with one config line:
security:
  auth_enabled: true                    # false by default (open in dev)
  token_secret_env: AUA_TOKEN_SECRET    # env var holding your signing secret
  token_expiry_days: 30
# Generate a signing secret:
#   python3 -c "import secrets; print(secrets.token_hex(32))"
#   export AUA_TOKEN_SECRET=<output>
When auth_enabled: false (the default), AUA logs a WARNING on startup and all endpoints are open. Never run a public deployment with auth_enabled: false.
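For readers unfamiliar with HMAC-signed bearer tokens, the general technique looks like the sketch below: a payload carrying scopes and an expiry, plus an HMAC-SHA256 signature over that payload. The token layout here (base64 payload, dot, base64 signature) is an assumption made for illustration and is not AUA's wire format; use `aua token create` for real tokens.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _unb64(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_token(secret: bytes, scopes: list, expires_in_s: int) -> str:
    payload = json.dumps({"scopes": scopes,
                          "exp": int(time.time()) + expires_in_s}).encode()
    sig = hmac.new(secret, payload, hashlib.sha256).digest()
    return f"{_b64(payload)}.{_b64(sig)}"


def verify_token(secret: bytes, token: str, required_scope: str) -> bool:
    try:
        payload_b64, sig_b64 = token.split(".")
        payload = _unb64(payload_b64)
        expected = hmac.new(secret, payload, hashlib.sha256).digest()
        # Constant-time comparison avoids leaking signature bytes via timing.
        if not hmac.compare_digest(expected, _unb64(sig_b64)):
            return False
        claims = json.loads(payload)
    except (ValueError, TypeError):
        return False  # malformed token
    if claims["exp"] < time.time():
        return False  # expired
    return required_scope in claims["scopes"] or "aua:admin" in claims["scopes"]


secret = b"k" * 32
token = sign_token(secret, ["aua:query"], expires_in_s=60)
print(verify_token(secret, token, "aua:query"))   # True
print(verify_token(secret, token, "aua:deploy"))  # False
```

The signature is checked before the payload is parsed, so untrusted JSON is never decoded unless it was produced by someone holding the secret.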
Create and manage tokens with the CLI:
# Create a query-only token expiring in 30 days
aua token create --scope aua:query --expires 30d
# Create an admin token
aua token create --scope aua:admin --expires 7d
# List all tokens
aua token list
# Revoke a token
aua token revoke <token-id>
| Scope | Grants access to |
|---|---|
| aua:query | POST /query, POST /sessions/{id}/messages |
| aua:stream | POST /sessions/{id}/stream |
| aua:status | GET /status, GET /version, GET /health |
| aua:config:read | GET /config (secrets redacted) |
| aua:config:write | POST /config/reload |
| aua:corrections:write | POST /corrections |
| aua:deploy | POST /deploy/green |
| aua:rollback | POST /deploy/rollback |
| aua:extensions:write | POST /extensions, POST /extensions/reload |
| aua:admin | All scopes |
export AUA_TOKEN="aua_tk_..."
curl -X POST http://localhost:8000/query \
-H "Authorization: Bearer $AUA_TOKEN" \
-H "Content-Type: application/json" \
-d '{"query": "Explain quicksort", "session_id": "s1"}'
15.2 mTLS β encrypted communication
AUA supports TLS (server-side) and mutual TLS (both sides). Configure cert paths in security.mtls:
# Server-side TLS
security:
  mtls:
    key_file: certs/server.key      # or keyfile:
    cert_file: certs/server.crt     # or certfile:
# Mutual TLS
security:
  mtls:
    key_file: certs/server.key
    cert_file: certs/server.crt
    ca_file: certs/ca.crt           # presence triggers CERT_REQUIRED
# Generate self-signed dev certs
aua certs generate
# Inspect cert details
aua certs inspect
When key_file and cert_file are set, AUA passes them directly to uvicorn's SSL arguments. If ca_file is also set, client certificates are required on every connection (mutual TLS). AUA logs which certs are active at startup.
15.3 Secrets management
AUA never stores plaintext secrets in config. Instead, config references a secret name, and the secrets provider resolves it at startup:
secrets:
  provider: env                     # "env" (default) | "vault" | "aws" | "gcp"
specialists:
  - name: swe
    api_key_secret: SWE_API_KEY     # reads env var SWE_API_KEY
secrets:
  provider: vault
  url: https://vault.internal:8200
  token_env: VAULT_TOKEN            # env var holding the Vault token
secrets:
  provider: aws
  region: us-east-1
Resolution order is always environment variable first, then the configured provider; values are cached for the process lifetime and never appear in logs, repr(), or GET /config. Vault secrets are read from the KV v2 path matching the secret name, taking the value key (or a key named after the secret). Both providers are covered by live integration tests in CI: a wire-faithful Vault KV v2 server driven by the real hvac client, and moto intercepting the real boto3 client (tests/test_secrets_live.py).
15.4 Encryption at rest
Correction payloads, assertions, DPO pairs, token metadata, and sensitive audit fields are encrypted at rest with AES-256-GCM:
security:
  encryption:
    enabled: true
    key_secret: AUA_ENCRYPTION_KEY  # 64-char hex key; see §17.3 for generation
15.5 Audit log
The audit log is append-only with a tamper-evident SHA-256 hash chain. Every security-relevant event is recorded:
# View recent audit events (written to .aua/audit.log)
tail -f .aua/audit.log
# Export corrections (machine-readable audit trail)
aua corrections export --format jsonl
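A tamper-evident hash chain works by having each entry's hash cover both the event and the previous entry's hash, so editing any past entry invalidates everything after it. The sketch below shows the general technique with SHA-256; the entry layout, the genesis value, and the function names are assumptions for illustration, not the format of .aua/audit.log.

```python
import hashlib
import json

GENESIS = "0" * 64  # assumed starting value for the first entry


def _digest(prev_hash: str, event: dict) -> str:
    body = json.dumps(event, sort_keys=True)  # stable serialisation
    return hashlib.sha256((prev_hash + body).encode()).hexdigest()


def append_event(chain: list, event: dict) -> dict:
    prev_hash = chain[-1]["hash"] if chain else GENESIS
    entry = {"event": event, "prev_hash": prev_hash,
             "hash": _digest(prev_hash, event)}
    chain.append(entry)
    return entry


def verify_chain(chain: list) -> bool:
    prev_hash = GENESIS
    for entry in chain:
        if entry["prev_hash"] != prev_hash:
            return False
        if entry["hash"] != _digest(prev_hash, entry["event"]):
            return False
        prev_hash = entry["hash"]
    return True


chain = []
append_event(chain, {"action": "token.create", "scope": "aua:query"})
append_event(chain, {"action": "config.reload"})
append_event(chain, {"action": "token.revoke"})
print(verify_chain(chain))                  # True

chain[0]["event"]["scope"] = "aua:admin"    # tamper with history
print(verify_chain(chain))                  # False
```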
15.6 Session, trace & request IDs (#15)
Every HTTP request gets three IDs: session_id (logical conversation, persistent across queries), trace_id (one per request, W3C-compatible 48-hex format, used by OTEL/audit/logs), and request_id (one per HTTP request, never reused). Client-supplied IDs are honored via the X-Session-ID, X-Trace-ID (or traceparent), and X-Request-ID headers; UUIDs are generated otherwise. All three come back as headers on every response, and /query additionally echoes them in the body.
# No session_id supplied: a UUID is generated and echoed back.
# Adopt it for the rest of the conversation:
curl -si -X POST localhost:8000/query -H "Content-Type: application/json" \
  -d '{"query": "Write binary search in Python."}' | grep -iE "x-(session|trace|request)-id"
# Pin your own session across requests (body session_id wins over the header)
curl -s -X POST localhost:8000/query -H "Content-Type: application/json" \
  -d '{"query": "Now make it iterative.", "session_id": "user_42"}' \
  | jq '{session_id, trace_id, request_id}'
The context propagates everywhere: downstream specialist and arbiter calls carry the three headers, every hook payload includes them, the audit log records them per event, and structured logs attach them automatically. As a result, one trace_id stitches a query across the router, the winning specialist, the arbiter verdict, and the audit chain.
Production checklist: aua doctor --strict emits a loud warning when cors_origins is *, the host is 0.0.0.0, and auth is disabled. Use the Team Server or Enterprise deployment profile to enforce auth and mTLS requirements.
How-to 16 Observability ~25 min
AUA emits three observability streams out of the box: structured JSON logs (every query, assertion, and error), Prometheus metrics (18 gauges/counters/histograms), and optional OpenTelemetry distributed traces. All three are designed to ship directly to ELK, Splunk, Grafana, or any OTEL-compatible backend, with no code changes required.
16.1 Structured JSON logging
Every log line the framework emits is a single-line JSON object. Every line automatically includes the current request's session_id, trace_id, and request_id, so a Kibana or Splunk search on a session ID returns the complete picture of everything that happened in that request.
logging:
  level: INFO       # DEBUG | INFO | WARNING | ERROR
  format: json      # "json" (default) | "text" (human-readable dev mode)
  output: stdout    # "stdout" | "stderr" | "/var/log/aua/router.log"
{"ts":1747000000.12,"level":"INFO","logger":"aua.router","msg":"single→software_engineering U=0.731","session_id":"s_abc123","trace_id":"01HX...","field":"software_engineering","routing_mode":"single","latency_ms":312.4,"utility_score":0.731,"confidence":0.823}
{"ts":1747000000.43,"level":"INFO","logger":"aua.router","msg":"Query routed","session_id":"s_abc123","trace_id":"01HX...","domain":"software_engineering","u_score":0.731,"latency_ms":315.1}
Fields included in every structured log line:
| Field | Description |
|---|---|
| ts | Unix timestamp (float) |
| level | DEBUG / INFO / WARNING / ERROR |
| logger | Module name (aua.router, aua.arbiter, aua.auth, ...) |
| session_id | Chat session identifier, auto-injected from request context |
| trace_id | W3C-compatible trace ID; links to OTEL spans if enabled |
| request_id | Per-request unique ID |
| field | Routed domain (software_engineering, mathematics, ...) |
| specialist | Specialist name that handled the query |
| routing_mode | single / fanout / arbiter |
| utility_score | Final U score for this response |
| confidence | Kalman-filtered confidence estimate |
| latency_ms | End-to-end latency in milliseconds |
| error_code | HTTP status on errors |
| verdict | Arbiter verdict case (A/B/C/D) when arbiter fires |
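If your own plugins or hooks log through Python's logging module, the same "IDs on every line" behaviour can be approximated with a context variable and a JSON formatter. The sketch below shows the general pattern; the variable names, the `fields` convention for extras, and the logger name are assumptions, not AUA's internal implementation.

```python
import contextvars
import io
import json
import logging

# Per-request context; a middleware would set these at the start of a request.
session_id_var = contextvars.ContextVar("session_id", default="")
trace_id_var = contextvars.ContextVar("trace_id", default="")


class JsonFormatter(logging.Formatter):
    """Emit one JSON object per line, with request IDs auto-attached."""

    def format(self, record: logging.LogRecord) -> str:
        line = {
            "ts": record.created,
            "level": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
            "session_id": session_id_var.get(),
            "trace_id": trace_id_var.get(),
        }
        # Extra fields passed as logger.info(..., extra={"fields": {...}})
        line.update(getattr(record, "fields", {}))
        return json.dumps(line, separators=(",", ":"))


stream = io.StringIO()              # stand-in for stdout or a log file
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("demo.router")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

session_id_var.set("s_abc123")
logger.info("Query routed", extra={"fields": {"latency_ms": 312.4}})
parsed = json.loads(stream.getvalue())
print(parsed["session_id"], parsed["latency_ms"])  # s_abc123 312.4
```

Context variables are copied into each asyncio task, so concurrent requests do not overwrite each other's IDs.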
16.2 Shipping logs to ELK (Elasticsearch / Kibana)
AUA's JSON output is Filebeat-native. No parsing config is needed: all fields are already top-level JSON keys that become indexed Elasticsearch fields automatically.
logging:
  format: json
  output: /var/log/aua/router.log    # Filebeat monitors this path
filebeat.inputs:
  - type: log
    paths: ["/var/log/aua/router.log"]
    json.keys_under_root: true       # promote JSON fields to top-level
    json.add_error_key: true
processors:
  - timestamp:
      field: ts
      layouts: ["UNIX"]
      target_field: "@timestamp"
output.elasticsearch:
  hosts: ["https://your-elastic:9200"]
  index: "aua-logs-%{+yyyy.MM.dd}"
  api_key: "your-api-key"
# All failed assertions in the last 24h
logger: "aua.router" AND level: "WARNING" AND msg: "assertion"
# Low U-score sessions (worth reviewing)
utility_score < 0.4
# All events for a specific session
session_id: "s_abc123"
# High latency queries
latency_ms > 5000
# Authentication failures
logger: "aua.auth" AND level: "WARNING"
# Arbiter fired
routing_mode: "arbiter" AND verdict: *
Logstash pipeline alternative. If you're using Logstash instead of Filebeat, pipe aua serve stdout directly: aua serve 2>&1 | logstash -f aua.conf. In the pipeline filter: json { source => "message" } then date { match => ["ts", "UNIX"] }. The JSON structure needs no grok patterns.
16.3 Shipping logs to Splunk
Two options depending on your Splunk setup:
Option A: Universal Forwarder (file-based)
[monitor:///var/log/aua/router.log]
index = aua
sourcetype = aua_json
[aua_json]
KV_MODE = json
TIME_FORMAT = %s%3N
TIME_PREFIX = "ts":
MAX_TIMESTAMP_LOOKAHEAD = 20
Option B: HTTP Event Collector (HEC, no file needed)
pip install splunk-handler
from splunk_handler import SplunkHandler
import logging
logging.getLogger("aua").addHandler(
    SplunkHandler(
        host="splunk.yourcompany.com",
        port=8088,
        token="your-hec-token",
        index="aua",
        sourcetype="aua_json",
    )
)
# Failed assertions over time
index=aua sourcetype=aua_json logger="aua.router" "assertion" | timechart count by assertion_name
# U-score trend per domain
index=aua sourcetype=aua_json utility_score=* | timechart avg(utility_score) by field
# P95 latency by routing mode
index=aua sourcetype=aua_json latency_ms=* | stats perc95(latency_ms) by routing_mode
16.4 Prometheus metrics
curl http://localhost:8000/metrics | head -30
| Metric | Type | What it measures |
|---|---|---|
| aua_queries_total | Counter | Total queries by field, routing mode, status |
| aua_query_latency_seconds | Histogram | Latency (p50/p95/p99) |
| aua_utility_score | Gauge | Last U score per domain |
| aua_contradiction_rate | Gauge | Contradiction rate per domain |
| aua_routing_field_distribution | Counter | Query distribution across fields |
| aua_specialist_confidence | Gauge | Confidence per specialist |
| aua_correction_count | Counter | Corrections accumulated |
| aua_arbiter_verdict_distribution | Counter | Case 1/2/3/4 breakdown |
| aua_dpo_pairs_accumulated | Gauge | Total DPO pairs in store |
| aua_token_requests_total | Counter | Token-gated requests by scope |
| aua_hook_failures_total | Counter | Hook execution failures |
| aua_plugin_execution_seconds | Histogram | Plugin latency |
| aua_specialist_vram_utilization | Gauge | GPU VRAM % per specialist |
| aua_cost_gpu_hours_total | Counter | Cumulative GPU hours per specialist |
| aua_cost_usd_total | Counter | Cumulative USD cost per specialist |
| aua_assertion_results_total | Counter | Assertion pass/fail by name, level, domain |
| aua_assertion_retries_total | Counter | BLOCKING assertion retry count |
| aua_assertion_bonus_applied | Histogram | E-score bonus applied by INFO assertions |
16.5 Cost tracking
curl http://localhost:8000/metrics/cost | python3 -m json.tool
{
  "swe": {"queries": 42, "gpu_hours": 0.012, "cost_usd": 0.0083},
  "math": {"queries": 18, "gpu_hours": 0.005, "cost_usd": 0.0034},
  "total_cost_usd": 0.0117
}
16.6 Grafana dashboard
docker compose --profile obs up
# Grafana at http://localhost:3000 (admin / aua-admin)
# Dashboard pre-loaded: 20 panels covering query volume, latency p50/p95/p99,
# routing distribution, U score trends, contradiction rate, arbiter verdicts,
# specialist health, VRAM usage, blue-green split, assertion fail rate,
# DPO pairs accumulated, auth failures, cost per specialist
16.7 OpenTelemetry: distributed traces
Optional. Sends full request traces to Jaeger, Tempo, Elastic APM, Splunk Observability, or any OTLP-compatible backend. Each trace covers the complete request path: router → classifier → routing decision → specialist calls → utility scoring → arbiter → hooks → policy assertions → response.
pip install "adaptive-utility-agent[otel]"
observability:
  otel:
    enabled: true
    endpoint: http://localhost:4317    # OTLP gRPC collector
    service_name: aua-router
observability:
  otel:
    enabled: true
    endpoint: https://ingest.us1.signalfx.com:443
    service_name: aua-router
    headers:
      X-SF-Token: "your-splunk-o11y-token"
Log + trace correlation. The trace_id in every JSON log line is W3C-compatible. When OTEL is enabled, clicking a log line in Kibana or Splunk and following its trace_id jumps directly to the corresponding distributed trace in Jaeger or Elastic APM, showing the exact specialist calls, latencies, and assertion checks for that request.
16.8 Structured logging in Docker / Kubernetes
services:
  aua-router:
    logging:
      driver: json-file    # Docker captures stdout as JSON
    labels:
      logging: "aua"
  alloy:
    image: grafana/alloy
    volumes:
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
# Alloy → Loki → Grafana: full log + metric correlation
# fluent-bit ConfigMap snippet
[INPUT]
    Name    tail
    Path    /var/log/containers/aua-*.log
    Parser  json
    Tag     aua.*
[OUTPUT]
    Name    es
    Match   aua.*
    Host    elasticsearch.logging.svc
    Index   aua-logs
    Type    _doc
How-to 17 Docker deployment ~20 min
17.1 Docker Compose profiles
All examples use the modern docker compose (V2) command. If your system only has the legacy binary, replace with docker-compose.
# CPU / Ollama
docker compose up

# GPU / vLLM (requires NVIDIA runtime)
docker compose --profile gpu up

# + Prometheus and Grafana
docker compose --profile obs up

# Full local stack (Ollama + observability)
docker compose --profile ollama --profile obs up

# GPU (Linux + NVIDIA): uses a separate compose file
docker compose -f docker-compose.gpu.yml up
17.2 Deployment profiles
| Profile | Auth | mTLS | State | Use for |
|---|---|---|---|---|
| Local Developer | Optional | No | SQLite | localhost only |
| Single GPU Workstation | Recommended | No | SQLite | One-machine GPU server |
| Team Server | Required | Required | Postgres | Shared team deployment |
| Enterprise | IAM + scopes | Required | Postgres | Custom backends, strict audit |
17.3 Environment configuration
Generate your encryption key before deploying. It must be a 32-byte value encoded as 64 hex characters. Run either command once and store the output:
# Option 1: Python (no extra dependencies)
python3 -c "import os; print(os.urandom(32).hex())"

# Option 2: OpenSSL
openssl rand -hex 32

# Either prints a 64-character hex string, e.g.:
# a3f2c1e8b7d4509261af3e2c84b19d07f6a5c3e1b8294d6072f1e3a5c8b2d490
Keep this value secret and never commit it to version control. Rotate it by generating a new key, re-encrypting state, and restarting. Encryption uses AES-256-GCM; the key is loaded at startup from the named environment variable.
AUA_ENCRYPTION_KEY=<64-char hex string from above>
AUA_ADMIN_TOKEN=aua_tk_...
SWE_API_KEY=...
POSTGRES_URL=postgresql://aua:password@db:5432/aua_state
security:
  mtls: {enabled: true, cert_dir: /certs, auto_generate: false}
  encryption: {enabled: true, key_secret: AUA_ENCRYPTION_KEY}
  cors_origins: ["https://your-domain.com"]
state:
  backend: postgres
  url_secret: POSTGRES_URL
audit:
  enabled: true
  hash_chain: true
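A malformed key is easier to catch in a pre-deploy check than at router startup. The following is a minimal sketch of such a check, assuming only what section 17.3 states: the key lives in the environment variable named by key_secret and must decode from 64 hex characters to 32 bytes. The function name load_encryption_key is hypothetical and is not AUA's loader.

```python
import os


def load_encryption_key(env_var: str = "AUA_ENCRYPTION_KEY") -> bytes:
    """Read a hex-encoded AES-256 key from the environment and validate its size."""
    raw = os.environ.get(env_var, "").strip()
    if len(raw) != 64:
        raise ValueError(f"{env_var} must be exactly 64 hex characters, got {len(raw)}")
    try:
        key = bytes.fromhex(raw)
    except ValueError as exc:
        raise ValueError(f"{env_var} contains non-hex characters") from exc
    if len(key) != 32:
        # bytes.fromhex ignores embedded whitespace, so re-check the decoded size.
        raise ValueError(f"{env_var} must decode to 32 bytes, got {len(key)}")
    return key
```

Running this in CI or an entrypoint script fails fast with a readable message instead of a cryptic decryption error later.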
How-to 18 Persistence, search & operations toolkit ~70 min
Everything in this part was battle-tested in AUA-Veritas, a macOS desktop assistant built on this framework, and backported in v1.1. It turns the router from a stateless query engine into a product backend: durable conversations, full-text search, automatic context handoffs, a correction lifecycle, analytics, and self-maintenance jobs. All of it ships in the default aua serve, with nothing extra to enable.
18.1 Conversations, messages & projects
The state store (SQLite by default, .aua/state/aua.db) persists conversations and messages independently of the chat-session API. Projects group conversations; project_id=NULL means "All chats".
# Create a project, then a conversation inside it
PROJECT=$(curl -s -X POST localhost:8000/projects -H "Content-Type: application/json" \
  -d '{"name": "Q3 Research"}' | jq -r .project_id)
CONV=$(curl -s -X POST localhost:8000/conversations -H "Content-Type: application/json" \
  -d "{\"title\": \"Vector DB eval\", \"project_id\": \"$PROJECT\"}" | jq -r .conversation_id)

# Append messages (role: user | assistant)
curl -s -X POST localhost:8000/conversations/$CONV/messages \
  -H "Content-Type: application/json" \
  -d '{"role": "user", "content": "Compare pgvector and Qdrant for 10M embeddings"}'

# Paginated reads: newest page by default, timestamp cursors for history
curl -s "localhost:8000/conversations/$CONV/messages?limit=50"
curl -s "localhost:8000/conversations/$CONV/messages?before=1765432100.5&limit=50"

# Filter the sidebar to one project; omit project_id for all chats
curl -s "localhost:8000/conversations?project_id=$PROJECT"
Cache rule (learned in production): first-page reads at the default limit=50 are served from a true-LRU MessageCache. Any non-default limit bypasses the cache and hits the DB with the actual limit; otherwise ?limit=1 would silently return the whole cached page and break pagination.
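The cache rule can be sketched in a few lines. This is an illustrative reconstruction, not the framework's MessageCache: the class body, the read_messages helper, the db_fetch callback signature, and the default of 50 (taken from the curl examples) are all assumptions.

```python
from collections import OrderedDict

DEFAULT_LIMIT = 50  # assumed default page size, taken from the examples above


class MessageCache:
    """True LRU: every hit moves the entry to the most-recently-used end."""

    def __init__(self, max_conversations: int = 128):
        self._pages = OrderedDict()
        self._max = max_conversations

    def get(self, conv_id):
        if conv_id not in self._pages:
            return None
        self._pages.move_to_end(conv_id)
        return self._pages[conv_id]

    def put(self, conv_id, page):
        self._pages[conv_id] = page
        self._pages.move_to_end(conv_id)
        while len(self._pages) > self._max:
            self._pages.popitem(last=False)  # evict the least recently used entry


def read_messages(cache, db_fetch, conv_id, limit=DEFAULT_LIMIT, before=None):
    """Serve only first-page, default-limit reads from the cache."""
    if before is None and limit == DEFAULT_LIMIT:
        page = cache.get(conv_id)
        if page is None:
            page = db_fetch(conv_id, limit, None)
            cache.put(conv_id, page)
        return page
    # Any cursor or non-default limit goes to the DB with the actual limit.
    return db_fetch(conv_id, limit, before)
```

Keying the cache on the conversation alone only stays correct because every other request shape bypasses it, which is the point of the rule.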
18.2 Full-text keyword search
Every message write is keyword-indexed by a background worker (50 ms batches, never on the response path) into an in-memory inverted index with O(log n) prefix matching. Search is message-level, the Cmd+F model: one result per matching message, with AND semantics for multi-word queries. On restart, a startup backfill re-indexes anything the worker didn't flush.
curl -s "localhost:8000/search?q=pgvector+embeddings" | jq
[
  {
    "conversation_id": "7f3a…",
    "title": "Vector DB eval",
    "message_id": "c91d…",
    "match_message_id": "c91d…",
    "match_message_ts": 1765432101.7
  }
]
Extraction is pure Python (~8 µs/message, no spaCy): stopword filtering, CamelCase/snake_case identifiers, years and multi-digit numbers, so fire_and_forget, MessageCache, and 2026 are all searchable. Prefix search means kuber finds kubernetes.
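To make the search model concrete, here is a small sketch of an inverted index with prefix lookup and AND semantics. It is an illustration under stated assumptions, not the framework's indexer: the stopword list, the tokenising regexes, and the names extract_terms and KeywordIndex are all hypothetical. The prefix lookup uses bisect on a sorted term list, which gives the O(log n) seek described above (inserting a new term into a Python list is still O(n)).

```python
import bisect
import re
from collections import defaultdict

STOPWORDS = {"the", "a", "an", "and", "or", "of", "for", "to", "in", "is"}  # assumed list


def extract_terms(text: str) -> set:
    """Index whole identifiers plus their CamelCase / snake_case parts and numbers."""
    terms = set()
    for token in re.findall(r"[A-Za-z_][A-Za-z0-9_]*|\d{2,}", text):
        terms.add(token.lower())
        parts = re.findall(r"[A-Z]?[a-z]+|[A-Z]+(?![a-z])|\d+", token.replace("_", " "))
        terms.update(part.lower() for part in parts)
    return {t for t in terms if len(t) > 1 and t not in STOPWORDS}


class KeywordIndex:
    def __init__(self):
        self._postings = defaultdict(set)  # term -> message ids
        self._sorted_terms = []            # kept sorted for bisect prefix seeks

    def add(self, message_id: str, text: str) -> None:
        for term in extract_terms(text):
            if term not in self._postings:
                bisect.insort(self._sorted_terms, term)
            self._postings[term].add(message_id)

    def _match_prefix(self, prefix: str) -> set:
        ids = set()
        i = bisect.bisect_left(self._sorted_terms, prefix)
        while i < len(self._sorted_terms) and self._sorted_terms[i].startswith(prefix):
            ids |= self._postings[self._sorted_terms[i]]
            i += 1
        return ids

    def search(self, query: str) -> set:
        """AND semantics: every query word must prefix-match some term in the message."""
        words = [w.lower() for w in query.split()]
        if not words:
            return set()
        result = self._match_prefix(words[0])
        for word in words[1:]:
            result &= self._match_prefix(word)
        return result
```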
18.3 Context backups: automatic session handoffs
Long conversations exhaust context windows. AUA keeps a per-(specialist, conversation) token counter and, when a trigger fires, asks the specialist to write a structured handoff note that a fresh window can resume from. Triggers: token_threshold (70% of the context window; bumps the thread number), message_count (every 30 messages), and time_gap (returning after 24h+ with ≥5 messages).
The backup prompt forces six sections (GOAL · DECISIONS MADE · CURRENT STATUS · ACTIVE FILE / CODE CONTEXT · USER PREFERENCES LEARNED · RESUME INSTRUCTION), capped at 900 tokens. Two production rules are baked in: the generator always reads the last 60 messages from the database, never the slice the client happened to send; and a 6-hour coverage job sweeps for any conversation whose newest backup is older than its newest message (backup valid ⇔ MAX(backup.created_at) > MAX(messages.created_at)), regenerating stale ones at 1/s.
# Which conversations need a backup for this specialist?
curl -s "localhost:8000/context/backup/coverage?specialist=swe" | jq

# Don't wait 6 hours; sweep now (runs in the background)
curl -s -X POST "localhost:8000/context/backup/run-coverage-job?specialist=swe"
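The trigger and coverage rules in this section reduce to two small predicates. The sketch below is one possible reading, not the framework's code: the ConversationState fields, the check order, and the interpretation of "every 30 messages" as "30 messages since the last backup" are assumptions.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConversationState:  # hypothetical container for the per-conversation counters
    tokens_used: int
    context_window: int
    messages_since_backup: int
    total_messages: int
    hours_since_last_message: float


def backup_trigger(state: ConversationState) -> Optional[str]:
    """Return the name of the first trigger that fires, or None."""
    if state.tokens_used >= 0.70 * state.context_window:
        return "token_threshold"
    if state.messages_since_backup >= 30:
        return "message_count"
    if state.hours_since_last_message >= 24 and state.total_messages >= 5:
        return "time_gap"
    return None


def backup_is_valid(newest_backup_ts: Optional[float], newest_message_ts: float) -> bool:
    """Coverage rule: a backup is valid only if it is newer than the newest message."""
    return newest_backup_ts is not None and newest_backup_ts > newest_message_ts
```

The coverage job then amounts to regenerating a backup for every conversation where backup_is_valid returns False.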
18.4 The correction lifecycle
Part 4 covered arbiter-driven corrections. v1.1 adds the full user-facing lifecycle.
Explicit prefix. A message starting with correction: is a preference statement and is stored immediately β even as the first message of a conversation, with no prior AI turn. (In Veritas, an early-return guard silently discarded these for months.)
SESSION=$(curl -s -X POST localhost:8000/sessions -H "Content-Type: application/json" -d '{}' | jq -r .id)
curl -s -X POST localhost:8000/sessions/$SESSION/messages \
-H "Content-Type: application/json" \
-d '{"content": "correction: always use ISO-8601 dates"}' | jq .correction_stored
# → a correction_id, stored at confidence 0.85, source=explicit_prefix
Implicit detection. A two-layer trigger detector (regex layer ships with the framework; plug a classifier into TriggerDetector(layer2=…) for semantic catches) flags replies like "no, that's wrong, use UTC" after an AI turn. Instead of asking the user to re-type their intent, the response carries implicit_correction_pending: true and your UI shows Accept / Reject:
curl -s -X POST localhost:8000/corrections/confirm-implicit \
-H "Content-Type: application/json" \
-d "{\"conversation_id\": \"$SESSION\", \"action\": \"accept\"}"
# accept → stored (confidence 0.75, source=implicit_confirmed); reject → discarded
CRUD + evidence. Every stored correction has a persistent ID and an append-only event history (created → applied → edited → superseded):
CID=$(curl -s -X POST localhost:8000/corrections -H "Content-Type: application/json" \
-d '{"subject":"timezones","domain":"general","claim":"Store timestamps in UTC","confidence":0.9}' \
| jq -r .correction_id)
curl -s -X PATCH localhost:8000/corrections/$CID \
-H "Content-Type: application/json" -d '{"claim": "Store and display in UTC"}'
curl -s "localhost:8000/corrections/evidence?correction_id=$CID" | jq
# Soft delete: sets scope='superseded'; the row stays for audit,
# but is excluded from retrieval and prompt injection
curl -s -X DELETE localhost:8000/corrections/$CID
Reviewer findings surfaced. When fanout routing runs the arbiter, its REASON:/CORRECTION: sections are no longer discarded after the verdict. They come back on the response as review_notes so the client can show why an answer won.
18.5 Analytics, reliability, usage & pricing
Four read-only endpoints power a "look under the hood" dashboard, all computed from model_runs:
| Endpoint | Returns |
|---|---|
| GET /analytics | Per-specialist run/win stats, confidence distribution (high ≥ 0.75 / medium ≥ 0.50 / uncertain), active-correction stats by domain, domain distribution, VCG welfare summary. |
| GET /reliability | Per-specialist win rate plus the last-20-run welfare trajectory and trend (up/down/flat), sparkline-ready. |
| GET /usage | Query counts and estimated cost per specialist. |
| GET /pricing | Per-specialist token pricing from the live model registry (self-hosted models cost 0). |
18.6 Self-maintenance: updates, crashes, bug reports, remote config
Update management. GET /version/check compares the running version against the latest GitHub release (graceful when offline). POST /update/skip {"version": "1.2.0"} persists a skipped version so the banner stays hidden; show_banner in the check response already accounts for it.
Crash reporting. Startup writes a running sentinel; clean shutdown marks it clean. A sentinel still running at the next startup means the previous session crashed; it is reported automatically (and queued runtime errors from pending_error_reports are flushed). Detection runs before the new sentinel is written, so a session never reports itself.
Bug reports. POST /bug-report assembles a structured report (platform fingerprint, log tails, opt-in last messages) and pushes it to a GitHub repo via the Contents API. Configure with AUA_BUGS_REPO (owner/repo) and a write-only AUA_BUGS_PAT. Without a PAT it returns 200 {"ok": false}, so bug reporting is never itself a source of 500s.
Remote model config. Model aliases, pricing, and context windows refresh from a remote models.json (default: this site; override with AUA_REMOTE_MODELS_URL) at startup and every 24h, with a three-level fallback: remote → DB cache (kept 7 days) → built-in registry. Remote may update display names, costs, windows, and add aliases for known providers, but never backend or provider, which map to code. A model_id_renames map lets retired upstream IDs hot-swap without a release.
18.7 Local model management
Register Ollama-class local models, tag them as specialists for domain nodes, and persist UI settings:
curl -s -X POST localhost:8000/local/models -H "Content-Type: application/json" \
-d '{"local_model_id": "qwen3:8b", "nickname": "Qwen 8B", "base_url": "http://localhost:11434"}'
curl -s -X PATCH localhost:8000/local/specialist/qwen3:8b \
-H "Content-Type: application/json" \
-d '{"specialist_domain": "software_engineering", "specialist_depth": 1}'
curl -s localhost:8000/local/models | jq
curl -s -X POST localhost:8000/local/settings -H "Content-Type: application/json" \
-d '{"auto_discover": true}'
18.8 The dynamic domain ontology
Field classification starts from 10 fixed L0 roots (software_engineering, mathematics, research, law, medicine, finance, writing, analysis, history, general) and grows from specialist self-reports. A raw domain string resolves in two stages: an O(1) alias map, then Levenshtein similarity. Above 0.80 similarity it becomes an alias; below, it enters a candidate queue.
An hourly ontology job promotes a candidate to a real node only when all four gates pass: volume (≥5 distinct queries), diversity (≥2 distinct specialists reported it), coverage (still unresolved by the alias map), and divergence (mean per-specialist win-rate difference vs. the nearest node exceeds δ(d) = 0.10 + 0.05·depth). Low-evidence candidates are pruned after 30 days.
curl -s localhost:8000/domain-tree | jq '{nodes: .nodes | length, candidates: .candidates[:3]}'
Why this matters for routing: each promoted node gets its own effective-utility cell, so "compiler design" queries can route to a different winner than generic "software engineering" once the evidence says the models genuinely diverge there.
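The two-stage resolution and the four promotion gates can be written down directly. The sketch below is illustrative only: the function names are hypothetical, similarity is assumed to be 1 minus the edit distance divided by the longer string's length (the text does not specify the normalisation), and depth is assumed to be the depth of the candidate's nearest node.

```python
def levenshtein(a: str, b: str) -> int:
    """Classic dynamic-programming edit distance."""
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        cur = [i]
        for j, cb in enumerate(b, 1):
            cur.append(min(prev[j] + 1, cur[j - 1] + 1, prev[j - 1] + (ca != cb)))
        prev = cur
    return prev[-1]


def similarity(a: str, b: str) -> float:
    longest = max(len(a), len(b))
    return 1.0 if longest == 0 else 1.0 - levenshtein(a, b) / longest


def resolve_domain(raw: str, alias_map: dict, nodes: list, threshold: float = 0.80):
    """Stage 1: O(1) alias lookup. Stage 2: nearest node by Levenshtein similarity."""
    key = raw.strip().lower()
    if key in alias_map:
        return alias_map[key], "alias"
    best = max(nodes, key=lambda node: similarity(key, node), default=None)
    if best is not None and similarity(key, best) > threshold:
        alias_map[key] = best  # remember it so the next lookup is O(1)
        return best, "new_alias"
    return None, "candidate"


def divergence_threshold(depth: int) -> float:
    return 0.10 + 0.05 * depth  # delta(d) from the text


def should_promote(n_queries, n_specialists, resolved_by_alias, mean_winrate_diff, depth):
    """All four gates must pass: volume, diversity, coverage, divergence."""
    return (
        n_queries >= 5
        and n_specialists >= 2
        and not resolved_by_alias
        and mean_winrate_diff > divergence_threshold(depth)
    )
```

Note that the divergence bar rises with depth, so deeper, narrower nodes need stronger evidence before they split off.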
The sections below cover every operational feature added in v1.2: aua test, aua loadtest, batch inference, model registry and version pinning, shadow mode, regression gating, experiment tracking, the empirical arbiter check, and hardware deployment tiers. These features are independent; read the sections that apply to your current task.
18.9 aua test: built-in integration test suite
aua test runs fixture datasets against a live router and reports pass/fail per case. It is the fastest way to verify routing, utility scoring, and specialist responses are working after a deployment, config change, or model swap.
aua test                    # smoke: 6 cases, under 60 seconds (default)
aua test --suite full       # full: 15 cases, 3-10 minutes
aua test --suite routing    # routing: 9 cases, all with expected_domain checks
smoke: Run after every deployment. Checks basic routing and non-empty responses for software_engineering, mathematics, and general. Designed to finish in under 60 seconds on any hardware.
full: Run before promoting a GREEN candidate. Covers edge cases, routing boundary queries (queries that could belong to two domains), and refusal detection (confirms the router does not say "I cannot help").
routing: Run after changing field weights, swapping the classifier, or registering a routing_strategy plugin. Every case in this suite has an expected_domain property; a failure means the classifier changed behavior on that query type.
aua test --url http://prod:8000        # target a non-local router
aua test --case swe_binary_search      # run one specific case (repeatable)
aua test --dataset my_cases.yaml       # custom fixture file (same format as built-ins)
aua test --suite full --json --output report.json
aua test --no-liveness                 # skip the pre-flight GET /health/live check
name: my_coding_suite
description: "Regression suite for the swe specialist after model update"
cases:
  - id: sort_complexity
    prompt: "Write bubble sort in Python and state its O complexity."
    expected_properties:
      - contains: "def "
      - contains_any: ["O(n^2)", "n squared", "n*n", "quadratic"]
      - min_length: 100
      - expected_domain: software_engineering
      - not_contains: "I cannot"
      - not_contains: "I'm unable"
  - id: math_domain_check
    prompt: "Solve the quadratic equation x^2 - 5x + 6 = 0."
    expected_properties:
      - contains_any: ["x = 2", "x = 3", "roots are"]
      - min_length: 20
      - expected_domain: mathematics
Property types you can use: contains (substring, case-insensitive), contains_any (list of substrings; at least one must match), not_contains, min_length (character count), expected_domain (exact match), expected_domain_any (list; any match).
Exit code 0 on all pass, exit code 1 on any failure. Use --json to get machine-readable output for CI pipelines.
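If you want to dry-run a fixture file against saved responses before pointing it at a live router, the property semantics are simple enough to mirror locally. The sketch below is an approximation, not the aua test implementation: the function names are hypothetical, and it assumes contains_any and not_contains are case-insensitive like contains, which the text only states for contains.

```python
def check_property(prop: dict, response: str, domain: str) -> bool:
    """Evaluate one single-key property dict, e.g. {"contains": "def "}."""
    (kind, expected), = prop.items()
    text = response.lower()
    if kind == "contains":
        return expected.lower() in text
    if kind == "contains_any":
        return any(item.lower() in text for item in expected)
    if kind == "not_contains":
        return expected.lower() not in text
    if kind == "min_length":
        return len(response) >= expected
    if kind == "expected_domain":
        return domain == expected
    if kind == "expected_domain_any":
        return domain in expected
    raise ValueError(f"unknown property type: {kind}")


def evaluate_case(case: dict, response: str, domain: str) -> list:
    """Return the properties that failed; an empty list means the case passed."""
    return [p for p in case["expected_properties"] if not check_property(p, response, domain)]
```

Mapping an empty failure list to exit code 0 and anything else to 1 gives the same CI contract as described above.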
18.10 aua loadtest: latency and throughput benchmarking
aua loadtest fires concurrent POST /query requests and reports the full latency distribution. Run it before going to production, after scaling hardware, or whenever you suspect a latency regression after a model or config change.
aua loadtest                              # 10 workers, 30 s, smoke query mix
aua loadtest -c 20 -d 60                  # 20 concurrent workers, 60 seconds
aua loadtest --suite full -c 5            # full fixture suite as query mix
aua loadtest --dataset my_queries.yaml    # custom query mix from a fixture file
aua loadtest --ramp 10 -c 20 -d 120       # ramp from 0 → 20 workers over 10 s, run 120 s
aua loadtest --think-ms 200 -c 10         # 200 ms pause between requests per worker
aua loadtest --json --output bench.json   # machine-readable output for CI
aua loadtest --url http://prod:8000       # target production router

✓ Router live at http://localhost:8000
Starting load test: workers=10 duration=30s queries=smoke

aua loadtest
150 requests in 30.1s    148 ok / 2 errors (1.3% error rate)

p50     p95     p99    mean    min    max    RPS   mean U
312ms   890ms   1.2s   425ms   98ms   1.4s   4.9   0.741

Routing: single: 148
✓ Load test passed (error rate 1.3% < 5%)

The test exits 0 when the error rate is below 5% and exits 1 otherwise, which makes it suitable for CI gates. Watch the mean U score: a drop in mean U without a matching latency drop usually means routing quality degraded (e.g. more queries hitting the arbiter fallback instead of a confident single specialist).
-c / --concurrency INT   simultaneous in-flight requests (default: 10)
-d / --duration FLOAT    wall-clock test duration in seconds (default: 30)
--ramp FLOAT             seconds to linearly ramp up workers (default: 0)
--think-ms FLOAT         pause between requests per worker (default: 0, continuous)
--timeout FLOAT          per-request timeout in seconds (default: 60)
--suite STR              smoke | full | routing (default: smoke)
--dataset PATH           custom YAML fixture file (overrides --suite)
--json                   emit full JSON report to stdout
--output / -o PATH       save JSON report to file
--no-liveness            skip GET /health/live pre-flight check
--url URL                router URL (default: http://localhost:8000)
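For anyone post-processing the JSON report, or sanity-checking the pass/fail rule, the summary statistics are easy to reproduce. This is a sketch under assumptions: the nearest-rank percentile method and the names percentile and summarize are illustrative choices, since the text does not say how aua loadtest computes its percentiles. Only the 5% error-rate gate comes from the documentation.

```python
import math


def percentile(sorted_ms: list, p: float) -> float:
    """Nearest-rank percentile over an ascending list of latencies."""
    if not sorted_ms:
        raise ValueError("no successful requests to summarise")
    rank = max(1, math.ceil(p / 100 * len(sorted_ms)))
    return sorted_ms[rank - 1]


def summarize(latencies_ms: list, n_errors: int, max_error_rate: float = 0.05) -> dict:
    """latencies_ms holds successful requests only; errors are counted separately."""
    ordered = sorted(latencies_ms)
    total = len(ordered) + n_errors
    error_rate = n_errors / total
    return {
        "p50": percentile(ordered, 50),
        "p95": percentile(ordered, 95),
        "p99": percentile(ordered, 99),
        "error_rate": error_rate,
        "exit_code": 0 if error_rate < max_error_rate else 1,  # strict "<", as documented
    }
```

With the sample run above (148 ok, 2 errors), the error rate is 2/150, about 1.3%, so the gate passes.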
18.11 Persistent batch queue: /batch/jobs
The batch queue lets you submit hundreds of queries asynchronously and poll for results as they complete. Jobs survive server restarts (results live in SQLite), support three priority lanes, and expose partial results before a batch finishes.
# POST /batch/jobs: returns immediately
curl -X POST http://localhost:8000/batch/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "queries": ["Write binary search.", "Implement merge sort.", "Explain quicksort."],
    "priority": "high",
    "max_parallel": 4
  }'
# → {"job_id": "b3c1a2...", "status": "pending", "n_queries": 3}

# GET /batch/jobs/{id}: partial results available immediately as queries complete
curl http://localhost:8000/batch/jobs/b3c1a2...
# → {"status": "running", "n_done": 2, "n_pending": 1,
#    "results": [{"response": "def binary_search...", "u_score": 0.78}, ...]}

# GET /batch/jobs: list recent jobs
curl "http://localhost:8000/batch/jobs?status=done&limit=10"
Priority lanes: "high" dispatches before "normal" before "low". Within a lane, FIFO by submission time.
Restart recovery: When the server restarts, interrupted "running" jobs are automatically reset to "pending" and will be reprocessed. Completed results are never lost.
Partial results: You can poll at any time. Completed items are returned even while the batch is still processing. A status of "running" with n_pending: 0 means all items are done but the job record has not yet been marked "done"; poll once more.
18.12 Model registry and version pinning
AUA downloads models automatically when you run aua serve. For production deployments, you almost certainly want to pin to a specific revision so a model update on HuggingFace Hub doesn't silently change your system's behavior.
specialists:
  - name: swe
    # Use exactly one of the following model: forms
    model: Qwen/Qwen2.5-7B-Instruct                 # latest (dev/prototyping only)
    # model: Qwen/Qwen2.5-7B-Instruct@v0.3          # pin to a branch or tag
    # model: Qwen/Qwen2.5-7B-Instruct@sha256:abc    # pin to exact commit (production)
    # model: models:/my-specialist/Production       # MLflow model registry stage
    # model: models:/my-specialist/3                # MLflow specific version number
    # For MLflow URIs, also add:
    # mlflow_tracking_uri: http://mlflow:5000
For gated models (Llama, Gemma), set HF_TOKEN in your environment and accept the license on HuggingFace. AUA checks disk space before downloading and warns if less than 10 GB is free.
# List all branches and tags for a HuggingFace repo
aua models pin Qwen/Qwen2.5-7B-Instruct

# Get the exact config snippet to paste into aua_config.yaml
aua models pin Qwen/Qwen2.5-7B-Instruct --revision v0.3
# Output: model: Qwen/Qwen2.5-7B-Instruct@v0.3

# List MLflow registered versions
aua models pin swe --mlflow-uri models:/my-specialist --mlflow-tracking-uri http://mlflow:5000

# Skip downloads entirely (air-gapped or pre-cached setup)
aua serve --no-download
Production pinning recommendation. Use @sha256:<commit> in production; it is immutable. Branch names like @main can be silently updated by the model author. aua models pin Qwen/Qwen2.5-7B --revision main will show you the current commit hash under "main" so you can hardcode it.
18.13 Experiment tracking: MLflow and W&B
AUA logs per-query metrics automatically after every response; no code changes are needed. Add the experiment_tracking: block to your config and the router starts writing to MLflow or W&B.
experiment_tracking:
  enabled: true
  mlflow:
    enabled: true
    tracking_uri: http://localhost:5000   # or file:///absolute/path/to/mlruns
    experiment_name: aua-production       # created automatically if it doesn't exist
    run_name: router-v1.2                 # optional; auto-named if omitted
    log_artifacts: false                  # set true to also log response text
  wandb:
    enabled: true
    project: aua-framework
    entity: my-team                       # optional; uses your W&B default entity
    run_name: production-run              # optional
    tags: [production, v1.2]
Metrics logged per query: u_score, confidence, latency_ms, contradictions_detected, corrections_injected, dpo_pairs_generated.
Tags logged per query (for filtering in the UI): routing_mode, primary_domain, specialist, session_id, trace_id.
Both backends are lazy-loaded: if mlflow or wandb is not installed, a warning is logged and that backend is disabled. pip install mlflow or pip install wandb is all you need. Neither is a required dependency of AUA.
18.14 Shadow mode: real-traffic evaluation before promotion
Shadow mode lets a GREEN (candidate) model receive real production traffic silently. The user always gets BLUE's (production) response. GREEN's response is evaluated and the (blue_u, green_u) pair is written to the shadow_scores table. Once enough queries accumulate, you have real-traffic evidence for the promotion decision, not just a synthetic eval.
blue_green:
  swe:
    delta: 0.025
    shadow_endpoint: http://localhost:9011/v1/chat/completions
    shadow_min_queries: 50   # minimum before promotion is considered ready
# Activate shadow mode
curl -X POST http://localhost:8000/deploy/shadow/swe \
  -H "Content-Type: application/json" \
  -d '{"green_endpoint": "http://localhost:9011/v1/chat/completions", "min_queries": 50, "threshold": 0.025}'

# Check progress: poll as frequently as you like, no side effects
curl http://localhost:8000/deploy/shadow/swe
# → {"active": true, "n_queries": 23, "min_queries": 50,
#    "progress": "23/50 shadow queries", "mean_delta": 0.031,
#    "blue_mean_u": 0.741, "green_mean_u": 0.772, "ready_to_promote": false}

# Deactivate (optionally clear accumulated scores)
curl -X DELETE "http://localhost:8000/deploy/shadow/swe?clear_scores=true"
Once ready_to_promote: true, call POST /deploy/green. The router automatically uses accumulated shadow scores (real traffic data) instead of running a synthetic eval from scratch.
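The status fields can be reproduced offline from rows of the shadow_scores table, which is handy for auditing a promotion decision. The sketch below is an assumption-laden illustration: the exact readiness rule is not stated in the text, so it assumes ready_to_promote means "at least min_queries pairs and a mean delta at or above the threshold". The function name shadow_status is hypothetical; the returned keys mirror the response shown above.

```python
from statistics import fmean


def shadow_status(pairs: list, min_queries: int = 50, threshold: float = 0.025) -> dict:
    """pairs is a list of (blue_u, green_u) tuples, one per shadowed query."""
    n = len(pairs)
    blue = fmean([b for b, _ in pairs]) if pairs else 0.0
    green = fmean([g for _, g in pairs]) if pairs else 0.0
    delta = green - blue
    return {
        "n_queries": n,
        "min_queries": min_queries,
        "progress": f"{n}/{min_queries} shadow queries",
        "blue_mean_u": blue,
        "green_mean_u": green,
        "mean_delta": delta,
        # Assumed rule: enough evidence AND a large enough mean improvement.
        "ready_to_promote": n >= min_queries and delta >= threshold,
    }
```

Under this reading, the sample response above (23 of 50 queries, mean_delta 0.031) is not ready only because the query count is still short.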
18.15 Regression gate: block promotion on quality drops
The regression gate runs an eval dataset against both BLUE and GREEN before allowing promotion. If GREEN's pass rate or U score drops relative to BLUE, the promotion is blocked.
blue_green:
  swe:
    delta: 0.025
    regression_dataset: evals/coding_smoke.yaml   # eval YAML in aua test fixture format
    regression_block: true                        # true = block promotion; false = log warning only
    shadow_endpoint: http://localhost:9011/v1/chat/completions
    shadow_min_queries: 50
curl -X POST http://localhost:8000/deploy/green \
  -H "Content-Type: application/json" \
  -d '{"specialist": "swe", "green_model": "./models/swe_v2",
       "green_endpoint": "http://localhost:9011/v1/chat/completions"}'
# Blocked by regression:
# → {"promoted": false,
#    "message": "PROMOTION BLOCKED - regression detected on evals/coding_smoke.yaml.",
#    "regression": {"regressed": true, "delta_pass_rate": -0.15, "blocked": true}}

# Override the dataset per request (useful for testing):
curl -X POST http://localhost:8000/deploy/green \
  -H "Content-Type: application/json" \
  -d '{"specialist": "swe", "green_model": "...",
       "regression_dataset": "evals/quick_check.yaml"}'
18.16 Arbiter empirical check: external ground truth
Arbiter Stage 4 cross-checks claims against external sources before issuing a verdict. No configuration is required; it activates automatically for the relevant domains.
| Domain | Source | What it checks |
|---|---|---|
| mathematics, structural_engineering | SymPy | Extracts complexity claims (O(n²), O(log n), …), tests algebraic equivalence and asymptotic ordering |
| software_engineering, stem_research | arXiv Atom API | Keyword search returns top-3 abstracts; scores by keyword overlap with the response |
| surgery, aviation, medicine | PubMed E-utilities | esearch → efetch pipeline; same keyword-overlap scoring |
| law, art, creative_writing, general | None | Returns "not converged - no external source for this domain" |
No API keys are required for basic arXiv and PubMed use. Set NCBI_API_KEY in your environment to increase PubMed rate limits above 3 requests/second. The empirical check never blocks: a flaky external API returns "not converged", not an error that breaks the query.
from aua.empirical import empirical_check


class EmpiricalContradictionDetector:
    """Cross-check every response against external literature."""

    def check(self, problem: str, solution: str, claimed_complexity=None) -> dict:
        result = empirical_check(problem, "software_engineering", solution, "")
        # Only a converged check with a single clear winner counts as a contradiction.
        if result.converged and result.winner not in ("neither", None, "both"):
            return {
                "contradictions": [{
                    "type": "empirical",
                    "description": result.explanation,
                    "severity": 0.6,
                }],
                "confidence_penalty": 0.15,
                "is_clean": False,
            }
        return {"contradictions": [], "confidence_penalty": 0.0, "is_clean": True}
18.17 Hardware deployment tiers
AUA ships six hardware tier templates. Pass --tier to aua serve to use a preconfigured specialist layout for your hardware β no need to write a full aua_config.yaml from scratch.
| Tier flag | Hardware | Backend | Specialists | Best for |
|---|---|---|---|---|
| gaming-pc | RTX 3080/4080 (10-16 GB VRAM) | Ollama | qwen2.5-coder:7b + qwen2.5:7b | Local dev, offline, Windows/Linux |
| macbook | Apple Silicon M-series (16-64 GB) | Ollama | qwen2.5-coder:7b + qwen2.5:7b | Local dev, macOS |
| single-4090 | RTX 4090 (24 GB VRAM) | vLLM + AWQ | Qwen2.5-7B-AWQ + Qwen2.5-Math-7B-AWQ | High-quality single-GPU serving |
| quad-4090 | 4× RTX 4090 (96 GB total) | vLLM + AWQ | 14B specialists, TP=2 | Multi-specialist parallel serving |
| a100-cluster | 8× A100 80 GB | vLLM bf16 | Llama-3-70B + Qwen-72B, TP=4 | Production, 70B class models |
| h100-cluster | 8× H100 SXM5 NVLink | vLLM bf16 | Llama-3.1-70B + Qwen2.5-72B, TP=4 | Frontier models, highest throughput |
aua serve --tier gaming-pc       # Ollama, laptop-friendly 7B models
aua serve --tier h100-cluster    # 8×H100 NVLink, 70B+ bf16 specialists

# Tier aliases (shorter to type):
aua serve --tier gaming          # → gaming-pc
aua serve --tier h100            # → h100-cluster
aua serve --tier a100            # → a100-cluster
Tensor parallelism for models that don't fit on one GPU
When a model requires more VRAM than a single GPU has, vLLM can split it across multiple GPUs using tensor parallelism: each GPU holds a subset of the weight matrices, and they synchronise via NCCL all-reduce over NVLink (fast) or PCIe (slower). Configure it per specialist in your aua_config.yaml:
specialists:
  - name: math
    model: Qwen/Qwen2.5-72B-Instruct   # 72B bf16 ≈ 144 GB; needs 4× A100 or 2× H100
    port: 9002
    field: mathematics
    gpu_ids: [0, 1, 2, 3]              # expose these 4 GPUs to vLLM
    tensor_parallel_size: 4            # must be a power of 2: 1, 2, 4, or 8
    pipeline_parallel_size: 1          # for single-node, always 1
    gpu_memory_utilization: 0.90
    max_model_len: 8192
Constraints: tensor_parallel_size must be a power of 2. len(gpu_ids) must equal tensor_parallel_size. On startup, the banner prints GPU 0,1,2,3 TP×4 to confirm the layout.
NVLink provides 600 GB/s per GPU on A100 SXM and 900 GB/s on H100 SXM5, so tensor parallelism is fast there. PCIe 4.0 x16 (consumer GPUs) tops out around 64 GB/s bidirectional; on consumer hardware, use tensor parallelism only when necessary and prefer pipeline parallelism or smaller models.
18.18 Retry with exponential backoff (#39)
When a specialist endpoint returns a transient error (network hiccup, 503, 429 rate-limit), AUA automatically retries the HTTP call before raising an error to the user. Retry is configured per router in aua_config.yaml:
router:
  retry:
    max_retries: 3                                 # 0 = disabled; default 3
    base_delay_ms: 200                             # first retry delay in milliseconds
    max_delay_ms: 5000                             # cap on computed delay
    jitter: true                                   # ±25% random jitter (prevents thundering-herd)
    retryable_status_codes: [429, 502, 503, 504]   # default
Delay schedule: attempt 1 is immediate. Attempt 2 waits base_delay_ms. Each subsequent attempt doubles the delay, capped at max_delay_ms. Jitter adds ±25% randomness so that multiple concurrent requests don't all retry at the same instant after a hiccup.
What gets retried: ConnectError, ConnectTimeout, ReadTimeout, and HTTP 429/502/503/504, all transient availability failures.
What is NOT retried: HTTP 400, 401, 403, 404, 422, 500. These indicate bugs in the request or the specialist; retrying won't help and would hide the problem.
Retry is transport-level: it wraps the HTTP call to the specialist endpoint. It is separate from the assertion-level retry in aua/policy.py, which re-calls a specialist when a policy assertion fails. Both can be active simultaneously.
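The delay schedule described in this section can be expressed as a single function. This is a minimal sketch, not the router's implementation: the function name is hypothetical, attempts are numbered from 1 as in the text, and applying jitter before the max_delay_ms cap (so the cap is never exceeded) is an assumption, since the text does not fix the order.

```python
import random


def retry_delay_ms(attempt: int, base_delay_ms: float = 200, max_delay_ms: float = 5000,
                   jitter: bool = True, rng=random) -> float:
    """Delay to wait before the given attempt (1-based); attempt 1 is immediate."""
    if attempt <= 1:
        return 0.0
    # Attempt 2 waits base_delay_ms; each later attempt doubles it.
    delay = float(base_delay_ms) * 2 ** (attempt - 2)
    if jitter:
        delay *= rng.uniform(0.75, 1.25)  # +/-25% spread to avoid synchronized retries
    return min(delay, float(max_delay_ms))
```

With the defaults and jitter off, max_retries: 3 gives four attempts with waits of 0, 200, 400, and 800 ms, so a fully failing call adds about 1.4 s of backoff on top of the per-attempt timeouts.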
18.19 Circuit breaker and degraded-mode failover (#37, #38)
Even with retry, a specialist that is completely down will still cause every query routed to it to time out. The circuit breaker solves this: after failure_threshold consecutive availability failures within failure_window_s seconds, the circuit opens, and subsequent queries skip that specialist immediately and route to the arbiter or a remaining healthy specialist. The user gets a response; it's just produced with reduced specialist availability.
Configuration
router:
  circuit_breaker:
    enabled: true              # false = disable entirely (default: true)
    failure_threshold: 5       # failures within window before opening
    failure_window_s: 60.0     # sliding window for counting failures
    recovery_timeout_s: 30.0   # seconds in OPEN before probing
    success_threshold: 2       # consecutive successes in probe → CLOSED
State machine
| State | Behaviour | Transition |
|---|---|---|
| CLOSED | Normal: all calls pass through | → OPEN when failure_threshold failures occur within failure_window_s |
| OPEN | Circuit tripped: calls rejected immediately with 503 | → HALF_OPEN after recovery_timeout_s seconds |
| HALF_OPEN | Probe mode: one call allowed through | Success → CLOSED (after success_threshold successes). Failure → OPEN |
What trips the circuit: ConnectError, ConnectTimeout, ReadTimeout, HTTP 429/502/503/504.
What does NOT trip it: HTTP 400/401/403/404/422/500. These are caller or specialist bugs, not endpoint availability issues.
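The state machine above fits in a small class. The sketch below is illustrative, not AUA's breaker: the class and method names are hypothetical, the clock is injectable for testing, and for brevity it does not limit HALF_OPEN to a single in-flight probe as the table describes. The caller is expected to call record_failure only for the availability errors listed above.

```python
import time
from collections import deque


class CircuitBreaker:
    def __init__(self, failure_threshold=5, failure_window_s=60.0,
                 recovery_timeout_s=30.0, success_threshold=2, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.failure_window_s = failure_window_s
        self.recovery_timeout_s = recovery_timeout_s
        self.success_threshold = success_threshold
        self._clock = clock
        self.state = "closed"
        self._failures = deque()   # timestamps of recent failures (sliding window)
        self._opened_at = None
        self._probe_successes = 0

    def allow_request(self) -> bool:
        if self.state == "open":
            if self._clock() - self._opened_at >= self.recovery_timeout_s:
                self.state = "half_open"   # recovery timeout elapsed: start probing
                self._probe_successes = 0
                return True
            return False                   # still open: reject immediately
        return True

    def record_success(self) -> None:
        if self.state == "half_open":
            self._probe_successes += 1
            if self._probe_successes >= self.success_threshold:
                self.state = "closed"
                self._failures.clear()

    def record_failure(self) -> None:
        now = self._clock()
        if self.state == "half_open":
            self._trip(now)                # a failed probe reopens the circuit
            return
        self._failures.append(now)
        while self._failures and now - self._failures[0] > self.failure_window_s:
            self._failures.popleft()       # drop failures outside the window
        if len(self._failures) >= self.failure_threshold:
            self._trip(now)

    def _trip(self, now: float) -> None:
        self.state = "open"
        self._opened_at = now
        self._failures.clear()
```

A router would hold one breaker per specialist, call allow_request before dispatching, and fall back to the arbiter or another healthy specialist when it returns False.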
Degraded-mode response (#38)
When one or more specialists have open circuits during a query, the router routes to the arbiter or remaining healthy specialists and stamps the response with two fields:
{
  "response": "...",
  "u_score": 0.71,
  "routing_mode": "arbiter",
  "degraded_mode": true,                    # one or more circuits were open
  "degraded_specialists": ["mathematics"]   # which domains were bypassed
}
Your client can check degraded_mode to decide whether to display a "partial availability" notice, log an alert, or fall back to a cached response. When degraded_mode: false (the default), degraded_specialists is null.
Inspecting circuit breaker state
# GET /health/ready: includes circuit_breakers array
curl -s localhost:8000/health/ready | python3 -m json.tool
{
  "specialists": {"swe": "ok", "math": "unreachable"},
  "circuit_breakers": [
    {"specialist": "swe", "state": "closed", "failure_count": 0},
    {"specialist": "math", "state": "open", "failure_count": 5,
     "open_since": 1718900000.0, "recovery_timeout_s": 30.0}
  ]
}
Typical failure arc. Specialist goes down → first 3 requests trigger retry (3× specialist_timeout delay each) → 5th failure trips the circuit → all subsequent requests skip instantly to arbiter with degraded_mode: true → after recovery_timeout_s, one probe fires → specialist back up → circuit closes → normal routing resumes.
Reference
Reference The complete aua_config.yaml
Every key the loader accepts, with defaults. Validation is strict at every level: an unknown key anywhere fails at startup, naming the bad key and listing the valid ones, so this reference and aua config validate can never drift apart. (aua config expand prints your resolved config with all defaults filled in.)
aua:
  version: "0.5"                    # written by aua init; don't hand-edit
  mode: local                       # local | kubernetes | cluster
  backend: ollama                   # default backend: ollama | vllm (per-specialist override below)
  project_name: my-aua-project      # optional
specialists:                        # one entry per model you route to
  - name: swe                       # unique; used in logs, metrics, blue_green keys
    model: qwen2.5-coder:7b         # registry alias, raw Ollama tag, or HF ID (vLLM)
    field: software_engineering     # one of the 11 built-in fields (§2.5)
    port: 11434                     # Ollama: shared server port; vLLM: per-model port
    host: 127.0.0.1                 # where the model server lives
    scheme: http                    # http | https
    backend: ollama                 # override the global backend per specialist
    gpu: 0                          # GPU index (vLLM placement)
    gpu_memory_utilization: 0.34    # vLLM only
    max_model_len: 2048             # vLLM only
    quantization: awq               # vLLM only
    enforce_eager: true             # vLLM only
    endpoint_override: null         # full URL; bypasses host/port/scheme assembly
    models_url_override: null       # override the /v1/models discovery URL
arbiter:                            # same keys as a specialist, minus name/field/gpu...
  model: qwen2.5:3b                 # model, port, host, scheme, backend, endpoint_override,
  port: 11434                       # models_url_override, gpu, gpu_memory_utilization,
                                    # max_model_len, quantization, enforce_eager
router:
  port: 8000
  host: 127.0.0.1
  single_domain_threshold: 0.75     # top prob ≥ this → single routing
  fanout_threshold: 0.30            # 2+ domains ≥ this → fanout (race + arbiter)
  specialist_timeout: 60.0          # seconds per specialist call
  tau: 1.0                          # routing softmax temperature (1.0 = off)
  arbitration_mode: pairwise        # pairwise | vcg | llm
  # arbitration_mode: verdict       # verdict | vcg (welfare-maximizing winner selection)
  retry:
    max_retries: 3                  # 0 to disable; default 3
    base_delay_ms: 200
    max_delay_ms: 5000
    jitter: true
    retryable_status_codes: [429, 502, 503, 504]
  circuit_breaker:
    enabled: true
    failure_threshold: 5
    failure_window_s: 60.0
    recovery_timeout_s: 30.0
    success_threshold: 2
  cors_origins: ["http://localhost:3001"]
blue_green:                         # promotion criteria, keyed by specialist name
  swe:
    delta: 0.025                    # GREEN must beat BLUE's mean U by this much
    T_min: 10                       # over at least this many queries
    tau: 0.20                       # exploration fraction routed to GREEN during eval
logging:
  level: INFO                       # DEBUG | INFO | WARNING | ERROR
  format: "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
secrets:                            # §12.3: config holds NAMES, providers resolve values
  provider: env                     # env | vault | aws | gcp
  url: http://127.0.0.1:8200        # vault only
  token_env: VAULT_TOKEN            # vault only: env var holding the token
  region: us-east-1                 # aws only
state:
  backend: sqlite                   # sqlite | files
  path: .aua/state/aua.db
security:
  cors_origins: null                # overrides router.cors_origins when set
  mtls: {}                          # §12.2: {enabled, cert_dir, auto_generate}
  encryption: {}                    # §12.4: {enabled, key_secret}
plugins:                            # How-to 13: kind → spec; config splats as kwargs
  field_classifier:                 # kinds: field_classifier, utility_scorer,
    import_path: plugins.mine:KeywordClassifier   # arbiter_policy, promotion_policy,
    config: {confidence_boost: 1.1}               # correction_store, model_backend, state_store
hooks:                              # How-to 14: a LIST of registrations
  - hook_point: on_correction       # 11 valid points, see How-to 14
    import_path: plugins.mine:SlackNotificationHook
    config: {webhook_url: "https://hooks.slack.com/..."}
    fail_closed: false              # true → hook failure aborts the request
middleware:                         # How-to 14: ordered pipeline
  - import_path: plugins.mine:PIIRedactionMiddleware
    config: {}
  - plugins.mine:LogMiddleware      # bare string OK when there's no config
Wired from YAML at startup: field_classifier, utility_scorer, correction_store, all hooks, all middleware. The other plugin kinds (arbiter_policy, promotion_policy, model_backend, state_store) load and contract-validate from config, so typos still fail fast, and attach at the programmatic points shown in How-to 13.
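To make the two router thresholds concrete, here is a rough sketch of how a domain distribution could map to a routing mode. The function name, the order of the checks, and the arbiter fallback are assumptions made for illustration; only the threshold semantics come from the config comments above.

```python
def choose_routing_mode(distribution, single_domain_threshold=0.75,
                        fanout_threshold=0.30):
    """Pick a routing mode from classifier probabilities.

    Returns (mode, domains). Hypothetical helper: the real router may order
    its checks differently or apply extra rules.
    """
    if not distribution:
        return "arbiter", []
    ranked = sorted(distribution.items(), key=lambda kv: kv[1], reverse=True)
    top_domain, top_prob = ranked[0]
    # One clearly dominant domain: route to that specialist alone.
    if top_prob >= single_domain_threshold:
        return "single", [top_domain]
    # Two or more plausible domains: race them and arbitrate.
    candidates = [domain for domain, prob in ranked if prob >= fanout_threshold]
    if len(candidates) >= 2:
        return "fanout", candidates
    # Classifier is not confident about anything: fall back to the arbiter.
    return "arbiter", []


print(choose_routing_mode({"software_engineering": 0.91, "mathematics": 0.09}))
print(choose_routing_mode({"software_engineering": 0.45, "mathematics": 0.40, "general": 0.15}))
```

Lowering single_domain_threshold moves more traffic into single routing, which is the lever the troubleshooting table suggests when every query lands on the arbiter.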
Reference Troubleshooting
The errors every newcomer hits, in the order they usually hit them.
| Symptom | Cause & fix |
|---|---|
| aua: command not found right after pip install | The install went to a Python whose bin/ isn't on PATH, a classic with pyenv. Run pyenv local 3.11.x && pip install adaptive-utility-agent, or use python3 -m aua.cli --help to bypass PATH entirely. |
| Unknown key(s) in 'top-level': ['specialist'] (or any key) | Strict validation caught a typo. The error names the bad key and lists every valid one at that level; compare against the config reference above. This fires at aua config validate / startup, never mid-request. |
| Specialist 'x' references unknown field 'devops' | field: must be one of the 11 built-ins (§2.5). Map your domain to the closest risk profile and, if you want custom routing language, add a FieldClassifierPlugin (How-to 13). |
| aua doctor: Ollama not reachable | Run ollama serve & first, then re-run. If it's running on a non-default port, set port: on each specialist (or endpoint_override). |
| Query returns 503 from the router | The specialist's model server is reachable but the model isn't available, usually because the tag isn't pulled. ollama list must show the exact string in model:. ollama pull <tag> fixes it. |
| Queries return 503 instantly with "Circuit open for X" | The circuit breaker has opened for specialist X after repeated failures. Check GET /health/ready → circuit_breakers for state and open_since. The circuit will probe automatically after recovery_timeout_s (default 30s). To tune sensitivity: raise circuit_breaker.failure_threshold or lower failure_window_s. To disable: circuit_breaker.enabled: false. |
| Response has degraded_mode: true | One or more specialists had open circuits when the query arrived; degraded_specialists lists which ones. The router served the request via the arbiter or remaining healthy specialists. Check GET /health/ready → circuit_breakers and specialist logs. Normal routing resumes once the circuit closes. |
| Query returns 504 / times out | The model is loading (first call after a pull can take a minute on CPU) or genuinely too slow. Raise router.specialist_timeout; warm the model with one direct ollama run first. |
| Address already in use on aua serve | Something owns :8000 (often a previous aua serve). lsof -i :8000 then kill it, or change router.port. |
| PluginLoadError: Cannot import module 'plugins.mine' | The router adds the config file's directory to sys.path (Django-style). Your plugins/ package must sit next to aua_config.yaml and contain an __init__.py. Pre-flight with aua extensions test --kind ... --import-path ... from the project dir. |
| Failed to instantiate MyPlugin: unexpected keyword argument | The YAML config: mapping is splatted as constructor kwargs: cls(**config). Every key in your config block must be a parameter of __init__ (How-to 13's constructor contract). |
| Plugin registered but nothing changed | Ask the server, not the CLI: curl -s localhost:8000/extensions shows what the running process loaded (the CLI's extensions list runs in a fresh process and can't see it). Also check startup logs for "Plugin loaded from config". |
| Every query routes to general / the arbiter | The classifier isn't confident. Check domain_distribution in the response; lower router.single_domain_threshold, use force_domain to verify the specialist itself works, or write a KeywordClassifier for your traffic (How-to 13). |
| Hooks registered but never fire | Hook config is a list of {hook_point, import_path} entries (How-to 14); the older nested-mapping shape is rejected by validation. Verify with GET /extensions → hooks. |
| sqlite3.OperationalError: database is locked | Two router processes share one state.path. Run one router per state DB, or point the second at its own path. |
The 30-second triage, in order: aua config validate (config truth) → aua doctor (environment truth) → curl localhost:8000/health/ready (specialist reachability) → curl localhost:8000/extensions (extension truth) → logs with logging.level: DEBUG.
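The strict-validation behaviour behind the "Unknown key(s)" row can be illustrated with a short sketch. The key set is copied from the router section of the config reference; `check_keys` and `ConfigError` are hypothetical names, and the message text after the quoted prefix is a guess rather than AUA's exact wording.

```python
ROUTER_KEYS = {
    "port", "host", "single_domain_threshold", "fanout_threshold",
    "specialist_timeout", "tau", "arbitration_mode", "retry",
    "circuit_breaker", "cors_origins",
}


class ConfigError(ValueError):
    """Raised when a config mapping contains keys the loader does not know."""


def check_keys(mapping, valid_keys, level):
    """Fail fast on unknown keys, naming them and listing the valid ones."""
    unknown = sorted(set(mapping) - set(valid_keys))
    if unknown:
        raise ConfigError(
            f"Unknown key(s) in '{level}': {unknown}. "
            f"Valid keys: {sorted(valid_keys)}"
        )


try:
    # "fanout_treshold" is a deliberate typo to trigger the error.
    check_keys({"port": 8000, "fanout_treshold": 0.3}, ROUTER_KEYS, "router")
except ConfigError as exc:
    print(exc)
```

Listing the valid keys in the error is what lets a typo be fixed without opening the reference.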
Reference CLI command groups
aua init
aua init <name> [--preset <name>] [--tier <name>] [--force]
aua serve
aua serve [--dry-run] [--tier <name>] [--reuse-running] [--with-ui] [--config <path>]
aua doctor
aua doctor [--strict] [--json] [--check-certs]
aua config
aua config validate [--config <path>]
aua config expand [--json]
aua config reload
aua models / fields / presets / defaults
aua models list | inspect <name>
aua fields list | inspect <name>
aua presets list | inspect <name>
aua defaults show [<category>]
Pin a model revision:
aua models pin Qwen/Qwen2.5-7B-Instruct                     # list available revisions
aua models pin Qwen/Qwen2.5-7B-Instruct --revision v0.3     # show config snippet to paste
aua models pin swe --mlflow-uri models:/swe                 # list MLflow registered versions
aua models pin swe --mlflow-uri models:/swe --revision 3    # get MLflow URI snippet
aua token
aua token create --scope <scope> [--expires <duration>] [--name <label>]
aua token list [--json]
aua token revoke <token-id>
aua token inspect <token-id>
aua certs
aua certs generate [--ca-cert <path>] [--ca-key <path>]
aua certs inspect
aua test
Run the built-in integration test suite against a live router.
aua test                                  # smoke suite (default) against localhost:8000
aua test --suite full                     # full regression suite
aua test --suite routing                  # domain classification correctness
aua test --url http://prod:8000           # target a different router
aua test --dataset my_cases.yaml          # custom fixture file
aua test --case swe_binary_search         # run one specific case (repeatable)
aua test --json --output report.json      # machine-readable JSON report
aua test --no-liveness                    # skip GET /health/live pre-flight
aua loadtest
Fire concurrent requests and report p50/p95/p99 latency, throughput, and U score.
aua loadtest                              # 10 workers, 30 s, smoke query mix
aua loadtest -c 20 -d 60                  # 20 workers, 60 s
aua loadtest --suite full -c 5            # full fixture suite as query mix
aua loadtest --dataset my_queries.yaml    # custom query mix
aua loadtest --ramp 10 -c 20 -d 120       # ramp from 0 to 20 workers over 10 s
aua loadtest --think-ms 200 -c 10         # 200 ms pause between each request per worker
aua loadtest --json --output bench.json
aua loadtest --url http://prod:8000
aua eval
aua eval run --dataset <path> [--config <path>] [--json]
aua eval report <results.json>
aua eval compare --baseline <blue.json> --candidate <green.json>
aua corrections / dpo
aua corrections export --format jsonl [--redact]
aua dpo export --format preference-pairs [--redact]
aua extensions
aua extensions list
aua extensions inspect <name>
aua extensions test --kind <type> --import-path <path>
aua status / rollback
aua status [--once] [--json] [--url <url>] [--refresh <seconds>]
aua rollback [--specialist <name>] [--all] [--yes] [--no-restart]
aua guard
aua guard list [--json]
aua guard test --import-path <module:function> [--output <text>] [--domain <name>]
aua policy
aua policy list
aua policy validate <path.yaml>
aua policy apply <path.yaml> [--dry-run]
aua calibrate
aua calibrate --layer <1|2|3> [--force] [--dry-run] [--config <path>]
              [--dataset <path>]          # layer 1 only
              [--output <path.jsonl>]     # layer 3 only
              [--min-pairs <N>]           # layer 3 only (default: 10)
aua logs
aua logs sessions [--limit <N>] [--domain <name>] [--json]
aua logs assertions [--filter <key=value>] [--assertion <name>] [--tail <N>] [--json]
aua logs export [--output <path>] [--table <table>] [--limit <N>]
aua metrics
aua metrics --compare <window>            # 7d, 30d, or YYYY-MM-DD:YYYY-MM-DD
            [--metric <name>]             # u_score | assertion_fail_rate | retry_rate
            [--json]
Reference REST API endpoints
All endpoints are on http://localhost:8000. Auth is disabled by default; include Authorization: Bearer $AUA_TOKEN only when auth is enabled in config. Interactive docs: http://localhost:8000/docs (Swagger UI).
| Method | Endpoint | Scope | Description |
|---|---|---|---|
| POST | /query | aua:query | Route a query. Returns response + full metadata. |
| POST | /query/stream | aua:stream | Streaming SSE query. Emits start → chunks → done. |
| POST | /query/batch | aua:batch | Route multiple queries concurrently. |
| GET | /health/live | none | Liveness probe. Returns 200 if process is alive. |
| GET | /health/ready | none | Readiness probe. Returns 200 when all specialists reachable. |
| GET | /health/startup | none | Startup probe. Returns 200 after first successful readiness check. |
| GET | /status | aua:status | Full telemetry: U scores, latency percentiles, routing stats, memory. |
| GET | /version | none | Framework version and build info. |
| GET | /config | aua:config:read | Running config (secrets redacted). |
| POST | /config/reload | aua:config:write | Hot-reload config without restart. |
| GET | /corrections | aua:corrections:read | List all stored corrections. |
| POST | /corrections | aua:corrections:write | Inject a verified fact into the corrections store. |
| POST | /deploy/green | aua:deploy | Register a GREEN candidate and evaluate against BLUE. |
| POST | /deploy/rollback | aua:rollback | Roll back specialist to previous BLUE model. |
| GET | /metrics | none | Prometheus scrape endpoint (18 metrics). |
| GET | /metrics/cost | aua:status | GPU hours and USD cost per specialist. |
| POST | /sessions | aua:query | Create a new chat session. |
| GET | /sessions | aua:query | List all sessions (most recent first). |
| GET | /sessions/{id} | aua:query | Get session metadata. |
| DELETE | /sessions/{id} | aua:query | Delete a session and its messages. |
| POST | /sessions/{id}/messages | aua:query | Send a message in a session context. |
| GET | /extensions | none | List registered plugins and hooks. |
| v1.1: persistence, search & production ops (How-to 18) | | | |
| POST | /conversations | aua:query | Create a conversation (optional project_id). |
| GET | /conversations | aua:query | List conversations; ?project_id= filters. |
| PATCH | /conversations/{id}/title | aua:query | Rename a conversation. |
| GET | /conversations/{id}/messages | aua:query | Paginated messages (before/after cursors). |
| POST | /conversations/{id}/messages | aua:query | Append a message; keyword-indexed in the background. |
| POST | /projects | aua:query | Create a project for grouping conversations. |
| GET | /projects | aua:query | List projects. |
| GET | /search | aua:query | Message-level keyword search (AND semantics, prefix match). |
| GET | /context/backup/coverage | aua:status | Stale/missing context backups per specialist. |
| POST | /context/backup/run-coverage-job | aua:query | Trigger an immediate backup coverage sweep. |
| POST | /corrections/confirm-implicit | aua:corrections:write | Accept/Reject a detected implicit correction. |
| PATCH | /corrections/{id} | aua:corrections:write | Edit a stored correction (logs an edited event). |
| DELETE | /corrections/{id} | aua:corrections:write | Soft-delete (scope='superseded'; row kept for audit). |
| GET | /corrections/evidence | aua:corrections:read | Per-correction event history and application counts. |
| GET | /analytics | aua:status | Specialist stats, confidence distribution, domain + welfare summary. |
| GET | /reliability | aua:status | Per-specialist win rate + last-20 welfare trajectory. |
| GET | /usage | aua:status | Query counts and estimated cost per specialist. |
| GET | /pricing | aua:status | Per-specialist token pricing from the live model registry. |
| GET | /version/check | none | Compare running version against the latest GitHub release. |
| POST | /update/skip | aua:config:write | Persist a skipped update version (banner stays hidden). |
| GET | /update/skipped | none | Return the skipped version, if any. |
| POST | /bug-report | none | Submit a structured bug report (200 even without a PAT). |
| GET | /local/models | aua:status | List registered local (Ollama-class) models. |
| POST | /local/models | aua:config:write | Register/upsert a local model. |
| PATCH | /local/specialist/{id} | aua:config:write | Tag a local model as a domain specialist. |
| GET | /local/settings | aua:status | Read local model settings. |
| POST | /local/settings | aua:config:write | Write local model settings. |
| GET | /domain-tree | aua:status | Full domain ontology with node stats and candidate queue. |
#15: every response also carries X-Session-ID, X-Trace-ID, and X-Request-ID headers; client-supplied values are honored, and UUIDs are generated otherwise.
All POST bodies are JSON. All responses are JSON unless noted. Auth header: Authorization: Bearer $AUA_TOKEN.
1. Core query
POST /query
The primary endpoint. Routes the query through the specialist graph, scores the response, runs assertions (if a policy is active), and returns everything.
{
  "query": "string",            // REQUIRED: the query text
  "session_id": "string",       // optional: scopes cross-session corrections; a UUID is generated and echoed back if omitted (#15)
  "conversation_history": [     // optional: prior turns for context
    {"role": "user", "content": "previous question"},
    {"role": "assistant", "content": "previous answer"}
  ],
  "force_domain": "string"      // optional: bypass classifier, e.g. "software_engineering"
}
curl -s -X POST http://localhost:8000/query \
-H "Content-Type: application/json" \
-d '{"query": "Write binary search in Python. State time complexity."}'
curl -s -X POST http://localhost:8000/query \
-H "Content-Type: application/json" \
-d '{
"query": "Now implement it iteratively instead.",
"session_id": "user_42",
"conversation_history": [
{"role": "user", "content": "Write binary search in Python."},
{"role": "assistant", "content": "def binary_search(arr, target):..."}
]
}'
{
"query": "Write binary search in Python.",
"response": "def binary_search(arr, target):\n ...", // the answer
"routing_mode": "single", // "single" | "fanout" | "arbiter"
"primary_domain": "software_engineering",
"domain_distribution": {"software_engineering": 0.91, "mathematics": 0.09},
"u_score": 0.731, // U = w_e·E + w_c·C + w_k·K (higher = better)
"confidence": 0.823, // Kalman-filtered consistency score
"contradictions_detected": 0, // > 0 means a correction was stored
"dpo_pairs_generated": 0, // DPO training pairs added this query
"latency_ms": 312.4,
"specialist_responses": null // populated only in fanout mode
}
import requests

r = requests.post("http://localhost:8000/query", json={
    "query": "Write binary search in Python.",
    "session_id": "my-session",
})
data = r.json()
print(data["response"])        # the answer; use this in your app
print(data["primary_domain"])  # which specialist answered
print(data["u_score"])         # quality score; log this over time
print(data["routing_mode"])    # "single" | "fanout" | "arbiter"
# In fanout mode, inspect per-specialist responses:
if data["routing_mode"] == "fanout":
    for sr in data["specialist_responses"]:
        print(sr["domain"], sr["response"][:100])
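The u_score formula shown in the response comment, U = w_e·E + w_c·C + w_k·K, is a plain weighted sum. The sketch below works one example; the weights and component values are placeholders chosen for the arithmetic, not AUA's defaults, and `utility_score` is a hypothetical helper.

```python
import math


def utility_score(e, c, k, w_e=0.5, w_c=0.3, w_k=0.2):
    """Weighted sum U = w_e*E + w_c*C + w_k*K.

    The default weights are illustrative placeholders; in AUA the utility
    weights come from configuration.
    """
    return w_e * e + w_c * c + w_k * k


# 0.5*0.8 + 0.3*0.7 + 0.2*0.6 = 0.40 + 0.21 + 0.12 = 0.73
u = utility_score(e=0.8, c=0.7, k=0.6)
print(round(u, 3))
assert math.isclose(u, 0.73)
```

Because the score is linear in each component, raising one weight shifts how strongly that component moves U without changing the others.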
POST /query/stream
Server-Sent Events (SSE). Same request body as /query. The connection stays open and emits three event types: start (routing decision), chunk (one token), done (final metadata). Useful for showing a live typing effect in a UI.
curl -s -X POST http://localhost:8000/query/stream \
-H "Content-Type: application/json" \
-H "Accept: text/event-stream" \
--no-buffer \
-d '{"query": "Write quicksort in Python.", "session_id": "s1"}'
# Output: one event per line pair (data: {json}\n\n):
data: {"type":"start","routing_mode":"single","primary_domain":"software_engineering","domain_distribution":{"software_engineering":0.93}}
data: {"type":"chunk","text":"def","index":0}
data: {"type":"chunk","text":" quicksort","index":1}
data: {"type":"chunk","text":"(arr):","index":2}
# ... more chunks ...
data: {"type":"done","full_response":"def quicksort(arr):\n if len(arr) <= 1:\n return arr\n pivot = arr[len(arr) // 2]\n left = [x for x in arr if x < pivot]\n middle = [x for x in arr if x == pivot]\n right = [x for x in arr if x > pivot]\n return quicksort(left) + middle + quicksort(right)","routing_mode":"single","primary_domain":"software_engineering","domain_distribution":{"software_engineering":0.93},"u_score":0.748,"confidence":0.831,"contradictions_detected":0,"dpo_pairs_generated":0,"latency_ms":1821.3}
import json

import requests
# pip install sseclient-py
from sseclient import SSEClient

response = requests.post(
    "http://localhost:8000/query/stream",
    json={"query": "Write quicksort in Python.", "session_id": "s1"},
    stream=True,
    headers={"Accept": "text/event-stream"},
)
full_text = ""
for event in SSEClient(response).events():
    data = json.loads(event.data)
    if data["type"] == "start":
        print(f"Routing to: {data['primary_domain']}")
    elif data["type"] == "chunk":
        print(data["text"], end="", flush=True)  # live typing effect
        full_text += data["text"]
    elif data["type"] == "done":
        print()  # newline after last token
        print(f"\nU score: {data['u_score']}")
        print(f"Latency: {data['latency_ms']:.0f}ms")
        # data["full_response"] == full_text (same content)
        break
import json

import requests

r = requests.post(
    "http://localhost:8000/query/stream",
    json={"query": "Write quicksort in Python."},
    stream=True,
)
for line in r.iter_lines():
    if line and line.startswith(b"data: "):
        data = json.loads(line[6:])
        if data["type"] == "chunk":
            print(data["text"], end="", flush=True)
        elif data["type"] == "done":
            print()
            break
POST /query/batch
Route multiple queries concurrently. Each query is routed independently. Results are returned in the same order as the input.
{
  "queries": ["string", ...],   // REQUIRED: 1 to 100 queries
  "session_id": "string",       // optional: shared session for all queries
  "max_parallel": 4             // optional: concurrent specialist calls (default 4, max 32)
}
curl -s -X POST http://localhost:8000/query/batch \
-H "Content-Type: application/json" \
-d '{
"queries": [
"Write binary search in Python.",
"What is the derivative of x squared?"
],
"max_parallel": 4
}'
{
"results": [...], // list of RouterResponse objects, same shape as /query
"total_latency_ms": 891.2, // wall-clock for the whole batch
"n_queries": 2,
"n_errors": 0 // failed queries are excluded from results
}
import requests

r = requests.post("http://localhost:8000/query/batch", json={
    "queries": ["Write binary search.", "What is O(n log n)?"],
})
data = r.json()
for i, result in enumerate(data["results"]):
    print(f"Q{i}: domain={result['primary_domain']} U={result['u_score']:.3f}")
    print(result["response"][:100])
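Since /query/batch accepts 1 to 100 queries per request, a larger workload has to be split client-side. The helper below is a minimal sketch of that splitting; `build_batch_payloads` is a hypothetical name, and only the documented body fields and limits are used.

```python
def build_batch_payloads(queries, session_id=None, max_parallel=4, batch_size=100):
    """Split `queries` into /query/batch request bodies of at most 100 each."""
    if not 1 <= batch_size <= 100:
        raise ValueError("batch_size must be between 1 and 100")
    if not 1 <= max_parallel <= 32:
        raise ValueError("max_parallel must be between 1 and 32")
    payloads = []
    for start in range(0, len(queries), batch_size):
        body = {
            "queries": queries[start:start + batch_size],
            "max_parallel": max_parallel,
        }
        if session_id is not None:
            body["session_id"] = session_id   # optional shared session
        payloads.append(body)
    return payloads


queries = [f"question {i}" for i in range(250)]
payloads = build_batch_payloads(queries, session_id="bulk-run")
print([len(p["queries"]) for p in payloads])
```

Each payload can then be posted to /query/batch in turn. Because failed queries are excluded from results, compare n_errors against the batch size before lining results up with inputs.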
2. Health & status
GET /health/live GET /health/ready GET /health/startup
Kubernetes-style probes. Use /health/live for liveness (process alive), /health/ready for readiness (all specialists reachable), /health/startup for startup (ready at least once).
curl http://localhost:8000/health/live     # → 200 {"status":"alive","uptime_s":142.3}
curl http://localhost:8000/health/ready    # → 200 or 503
curl http://localhost:8000/health/startup  # → 200 or 503
{
"status": "ready", // "ready" or "not_ready"
"specialists": {
"swe": "ok", // "ok" | "unreachable" | "http_503"
"math": "ok",
"arbiter": "ok"
}
}
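import time

import requests


def wait_for_ready(timeout=60):
    """Poll /health/ready until the router is ready or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            r = requests.get("http://localhost:8000/health/ready", timeout=2)
            if r.status_code == 200 and r.json()["status"] == "ready":
                return True
        except requests.RequestException:
            pass  # router not up yet, or the probe itself timed out
        time.sleep(1)
    raise TimeoutError(f"Router not ready after {timeout}s")


wait_for_ready()
# now safe to send queries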
GET /status
Full telemetry snapshot. This is the richest endpoint: use it to monitor U score trends, latency percentiles, specialist health, and routing distribution over time.
curl http://localhost:8000/status | python3 -m json.tool
{
"version": "1.0.1",
"backend": "ollama",
"uptime_s": 3621.4,
"health": {
"swe": "ok",
"math": "ok",
"arbiter": "ok"
},
"utility": { // per-domain U score history
"software_engineering": {
"mean_u": 0.7312, // track this over time; should trend up
"last_u": 0.7480,
"queries": 142,
"confidence": 0.8231
},
"mathematics": {
"mean_u": 0.6891,
"last_u": 0.7012,
"queries": 47,
"confidence": 0.7910
}
},
"latency": { // latency percentiles per component
"router": {"p50_ms": 312.4, "p95_ms": 891.2, "last_ms": 298.1, "samples": 189},
"swe": {"p50_ms": 280.1, "p95_ms": 820.4, "last_ms": 271.3, "samples": 142}
},
"routing": {
"total_queries": 189,
"by_mode": {"single": 154, "fanout": 28, "arbiter": 7},
"contradictions": 12,
"dpo_pairs": 8
},
"store": {
"corrections_total": 23,
"sessions_total": 41
},
"memory": {
"system": "CPU / Ollama" // or "gpu0: 8192 / 24576 MiB" on GPU
}
}
import requests

status = requests.get("http://localhost:8000/status").json()
for domain, stats in status["utility"].items():
    mean_u = stats["mean_u"]
    if mean_u is None:
        print(f"{domain}: no U score recorded yet")
    elif mean_u < 0.50:
        print(f"WARNING: low U score on {domain}: {mean_u:.3f}, check specialist")
    else:
        print(f"OK: {domain}: mean_u={mean_u:.3f} queries={stats['queries']}")
# Check latency
p95 = status["latency"]["router"]["p95_ms"]
if p95 and p95 > 5000:
    print(f"WARNING: high p95 latency: {p95:.0f}ms")
# Routing distribution
print(status["routing"]["by_mode"])  # {"single": 154, "fanout": 28, "arbiter": 7}
GET /version
curl http://localhost:8000/version
# {"version": "1.0.1", "framework": "AUA Framework", "python": "3.11.10"}
3. Configuration
GET /config
Returns the running configuration. Secrets are redacted: you will never see API keys or token values here.
curl http://localhost:8000/config
{
"version": "0.5",
"mode": "local",
"backend": "ollama",
"specialists": [
{"name": "swe", "model": "qwen2.5-coder:7b", "port": 11434, "field": "software_engineering", "endpoint": "http://localhost:11434"},
{"name": "math", "model": "qwen2.5:7b", "port": 11434, "field": "mathematics", "endpoint": "http://localhost:11434"}
],
"arbiter": {"model": "qwen2.5:3b", "port": 11434, "endpoint": "http://localhost:11434"},
"router": {"port": 8000, "single_domain_threshold": 0.75, "fanout_threshold": 0.30, "specialist_timeout": 30.0}
}
import requests

config = requests.get("http://localhost:8000/config").json()
for spec in config["specialists"]:
    print(f"{spec['name']}: {spec['model']} ({spec['field']})")
print(f"Single threshold: {config['router']['single_domain_threshold']}")
POST /config/reload
Hot-reload the config file without restarting. Only hot-reloadable settings apply (routing thresholds, utility weights, CORS). Model/port changes require restart.
curl -X POST http://localhost:8000/config/reload
# {"reloaded": true, "message": "Config reloaded from aua_config.yaml"}
4. Corrections
POST /corrections
Inject a verified fact into the corrections store. It will be included in every future query on that subject in that domain, permanently (Class A) by default.
{
  "subject": "string",    // REQUIRED: short label e.g. "bubble_sort_complexity"
  "domain": "string",     // REQUIRED: must match a field name e.g. "software_engineering"
  "claim": "string",      // REQUIRED: the verified fact, max 2000 chars
  "confidence": 0.9,      // optional: 0.0 to 1.0 (default 0.9)
  "source": "manual"      // optional: label for the correction source
}
curl -X POST http://localhost:8000/corrections \
-H "Content-Type: application/json" \
-d '{
"subject": "bubble_sort_complexity",
"domain": "software_engineering",
"claim": "Bubble sort is O(n²) average and worst-case. O(1) extra space.",
"confidence": 0.99,
"source": "manual"
}'
{
"stored": true,
"subject": "bubble_sort_complexity",
"domain": "software_engineering",
"claim": "Bubble sort is O(n²) average and worst-case. O(1) extra space.",
"confidence": 0.99,
"decay_class": "A" // A = never decays | B = 10yr | C = 3yr | D = 6mo
}
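Validating a correction before sending it avoids a round trip for obviously malformed payloads. The sketch below checks only the constraints stated in the request schema above (three required fields, a 2000-character claim limit, confidence in 0.0 to 1.0); `build_correction` is a hypothetical helper, and the server remains the authority on what it accepts.

```python
def build_correction(subject, domain, claim, confidence=0.9, source="manual"):
    """Build a POST /corrections body, enforcing the documented limits."""
    for name, value in (("subject", subject), ("domain", domain), ("claim", claim)):
        if not isinstance(value, str) or not value.strip():
            raise ValueError(f"{name} is required and must be a non-empty string")
    if len(claim) > 2000:
        raise ValueError("claim exceeds the 2000-character limit")
    if not 0.0 <= confidence <= 1.0:
        raise ValueError("confidence must be between 0.0 and 1.0")
    return {
        "subject": subject,
        "domain": domain,
        "claim": claim,
        "confidence": confidence,
        "source": source,
    }


body = build_correction(
    subject="bubble_sort_complexity",
    domain="software_engineering",
    claim="Bubble sort is O(n^2) average and worst-case. O(1) extra space.",
    confidence=0.99,
)
print(body["subject"], body["confidence"])
```

The resulting dict can be passed as json= to requests.post against /corrections.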
GET /corrections
curl http://localhost:8000/corrections
{
"total": 23,
"returned": 23,
"corrections": [
{
"subject": "bubble_sort_complexity",
"domain": "software_engineering",
"claim": "Bubble sort is O(n²)...",
"effective_confidence": 0.99,
"decay_class": "A",
"source": "manual"
},
...
]
}
import requests

data = requests.get("http://localhost:8000/corrections").json()
print(f"Total corrections: {data['total']}")
for c in data["corrections"]:
    print(f"[{c['decay_class']}] {c['domain']} / {c['subject']}: {c['claim'][:60]}")
5. Deployment
POST /deploy/green
Register a GREEN model candidate and evaluate it against the current BLUE. Promotes if the U score delta exceeds the configured threshold.
{
  "specialist": "string",     // REQUIRED: specialist name e.g. "swe"
  "green_model": "string",    // REQUIRED: model path or HuggingFace ID
  "n_eval_queries": 10        // optional: evaluation queries to run (default 10, max 100)
}
curl -X POST http://localhost:8000/deploy/green \
-H "Content-Type: application/json" \
-d '{
"specialist": "swe",
"green_model": "qwen2.5-coder:14b",
"n_eval_queries": 20
}'
{
"specialist": "swe",
"promoted": true, // false = GREEN didn't beat BLUE by enough
"u_delta": 0.062, // green_u - blue_u (must exceed threshold)
"blue_u": 0.712,
"green_u": 0.774,
"threshold": 0.050,
"message": "GREEN promoted: U delta 0.062 exceeds threshold 0.050",
"dry_run_only": false
}
import requests

r = requests.post("http://localhost:8000/deploy/green", json={
    "specialist": "swe",
    "green_model": "qwen2.5-coder:14b",
})
result = r.json()
if result["promoted"]:
    print(f"Promoted. U delta: {result['u_delta']:+.3f}")
else:
    print(f"Not promoted. Delta {result['u_delta']:+.3f} below threshold {result['threshold']}")
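The promotion rule itself is a comparison of mean U scores. As a rough sketch, assuming GREEN is promoted only when it has enough evaluation queries and its mean U exceeds BLUE's by more than the threshold: `should_promote` is a hypothetical function, and the `delta` and `t_min` defaults are borrowed from the blue_green config keys.

```python
from statistics import mean


def should_promote(blue_scores, green_scores, delta=0.025, t_min=10):
    """Return (promoted, u_delta) for a GREEN candidate.

    u_delta is None when there is not enough data to decide. Illustrative
    only; the framework's actual evaluation may weigh more than mean U.
    """
    if not blue_scores or len(green_scores) < t_min:
        return False, None
    u_delta = mean(green_scores) - mean(blue_scores)   # green_u - blue_u
    return u_delta > delta, u_delta


blue = [0.70] * 10
green = [0.77] * 10
promoted, u_delta = should_promote(blue, green)
print(promoted, round(u_delta, 3))
```

Requiring a minimum number of queries keeps a few lucky GREEN responses from triggering a promotion.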
5b. Shadow mode
| Method | Path | Description |
|---|---|---|
| POST | /deploy/shadow/{specialist} | Activate shadow mode: GREEN receives production traffic silently. Body: green_endpoint (required), min_queries, threshold |
| GET | /deploy/shadow/{specialist} | Report accumulated shadow scores: n_queries, mean_delta, blue/green mean U, ready_to_promote, progress string |
| DELETE | /deploy/shadow/{specialist} | Deactivate shadow mode. Query param: clear_scores=true to also purge accumulated scores |
5c. Batch queue
| Method | Path | Description |
|---|---|---|
| POST | /batch/jobs | Submit a batch job. Body: queries (list), priority (high / normal / low), max_parallel. Returns job_id immediately. |
| GET | /batch/jobs/{job_id} | Poll job status and partial results. Returns status, n_done, n_pending, n_errors, results (list of completed responses) |
| GET | /batch/jobs | List recent batch jobs. Query params: status (pending / running / done / error), limit |
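Because POST /batch/jobs returns a job_id immediately, the client has to poll for completion. The sketch below shows one way to structure that loop; `wait_for_job` is a hypothetical helper, and the HTTP call is injected as `fetch_status` so the example runs without a live router.

```python
import time


def wait_for_job(fetch_status, job_id, poll_interval_s=1.0, timeout_s=300.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll until the job reports status 'done' or 'error', then return it.

    `fetch_status(job_id)` must return the parsed JSON of
    GET /batch/jobs/{job_id}.
    """
    deadline = clock() + timeout_s
    while True:
        job = fetch_status(job_id)
        if job["status"] in ("done", "error"):
            return job
        if clock() >= deadline:
            raise TimeoutError(f"Job {job_id} still {job['status']} after {timeout_s}s")
        sleep(poll_interval_s)


# Offline demo with canned responses instead of a real router.
canned = iter([
    {"status": "pending", "n_done": 0},
    {"status": "running", "n_done": 1},
    {"status": "done", "n_done": 2},
])
final = wait_for_job(lambda job_id: next(canned), "job-1", sleep=lambda s: None)
print(final["status"], final["n_done"])
```

Against a real router, fetch_status could be `lambda job_id: requests.get(f"http://localhost:8000/batch/jobs/{job_id}").json()`.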
6. Observability
GET /metrics
Prometheus scrape endpoint. Returns plain text in Prometheus exposition format, not JSON. Point your Prometheus scrape_configs here.
curl http://localhost:8000/metrics | grep "^aua_"
aua_queries_total{domain="software_engineering",routing_mode="single",status="ok"} 142.0
aua_query_latency_seconds_bucket{domain="software_engineering",routing_mode="single",le="0.5"} 89.0
aua_utility_score{domain="software_engineering"} 0.748
aua_contradiction_rate{domain="software_engineering"} 0.0
aua_assertion_results_total{assertion_name="PythonSyntaxCheck",level="blocking",passed="true",domain="software_engineering"} 38.0
# aua_assertion_fail_rate is not exported as a gauge; derive it from aua_assertion_results_total
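Deriving a fail rate from the counter can be sketched by parsing the scrape output directly. The example assumes failed assertions are exported with passed="false", mirroring the passed="true" sample above; that label value and the helper name `assertion_fail_rates` are assumptions. In a Prometheus deployment the same ratio would normally be computed with a query rather than in client code.

```python
import re
from collections import defaultdict

_SAMPLE = re.compile(r'^aua_assertion_results_total\{(?P<labels>[^}]*)\}\s+(?P<value>\S+)$')
_LABEL = re.compile(r'(\w+)="([^"]*)"')


def assertion_fail_rates(metrics_text):
    """Return {assertion_name: failures / total} from exposition-format text."""
    totals = defaultdict(float)
    failures = defaultdict(float)
    for line in metrics_text.splitlines():
        match = _SAMPLE.match(line.strip())
        if not match:
            continue   # comments, other metrics, blank lines
        labels = dict(_LABEL.findall(match.group("labels")))
        name = labels.get("assertion_name", "unknown")
        value = float(match.group("value"))
        totals[name] += value
        if labels.get("passed") == "false":
            failures[name] += value
    return {name: failures[name] / total for name, total in totals.items() if total > 0}


sample = '''
aua_assertion_results_total{assertion_name="PythonSyntaxCheck",level="blocking",passed="true",domain="software_engineering"} 38.0
aua_assertion_results_total{assertion_name="PythonSyntaxCheck",level="blocking",passed="false",domain="software_engineering"} 2.0
aua_utility_score{domain="software_engineering"} 0.748
'''
print(assertion_fail_rates(sample))
```

Summing across the other labels gives one rate per assertion; grouping by domain as well would only need a composite key.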
GET /metrics/cost
curl http://localhost:8000/metrics/cost | python3 -m json.tool
{
"swe": {"queries": 142, "gpu_hours": 0.014, "cost_usd": 0.0097},
"math": {"queries": 47, "gpu_hours": 0.005, "cost_usd": 0.0034},
"total_cost_usd": 0.0131
}
import requests

cost = requests.get("http://localhost:8000/metrics/cost").json()
print(f"Total cost so far: ${cost['total_cost_usd']:.4f}")
for name, stats in cost.items():
    if isinstance(stats, dict):  # skip the scalar total_cost_usd entry
        print(f"  {name}: {stats['queries']} queries ${stats['cost_usd']:.4f}")
7. Sessions & chat
Sessions maintain conversation state across multiple queries. The Chat UI uses these endpoints. You can also call them directly to build your own chat interface on top of AUA.
# 1. Create a session
SESSION=$(curl -s -X POST http://localhost:8000/sessions \
  -H "Content-Type: application/json" \
  -d '{"title": "Python help"}' | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")
echo "Session: $SESSION"

# 2. Send a message
curl -s -X POST http://localhost:8000/sessions/$SESSION/messages \
  -H "Content-Type: application/json" \
  -d '{"content": "Write binary search in Python."}'

# 3. List sessions
curl http://localhost:8000/sessions

# 4. Get session metadata
curl http://localhost:8000/sessions/$SESSION

# 5. Delete session
curl -X DELETE http://localhost:8000/sessions/$SESSION
{
  "id": "3f2a1b9c-...",            // session UUID; use in subsequent requests
  "title": "Python help",
  "created_at": 1747000000.123,    // Unix timestamp
  "updated_at": 1747000000.123,
  "message_count": 0
}
{
  "sessions": [
    {"id": "3f2a1b9c-...", "title": "Python help", "message_count": 4, "updated_at": 1747003200.0},
    {"id": "8a1c4e2f-...", "title": "New Chat", "message_count": 1, "updated_at": 1747002000.0}
  ]
}
import requests

BASE = "http://localhost:8000"

# Create session
session = requests.post(f"{BASE}/sessions", json={"title": "My chat"}).json()
session_id = session["id"]

while True:
    user_input = input("You: ")
    if user_input.lower() == "quit":
        break
    r = requests.post(f"{BASE}/sessions/{session_id}/messages",
                      json={"content": user_input}).json()
    print(f"AUA [{r['primary_domain']}]: {r['response']}\n")

# Clean up
requests.delete(f"{BASE}/sessions/{session_id}")
8. Extensions
GET /extensions
curl http://localhost:8000/extensions
{
  "plugins": {
    "arbiter_policy": null,          // null = using the built-in
    "correction_store": null,
    "field_classifier": "plugins.mine:KeywordClassifier",
    "model_backend": null,
    "promotion_policy": null,
    "state_store": null,
    "utility_scorer": null
  },
  "hooks": {
    "on_correction": ["SlackNotificationHook"]
    // keyed by hook point; class names of registered hooks
  },
  "middleware": ["ShoutMiddleware"]  // in pipeline order
}
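A deployment check can use this response to confirm which slots are running custom code. The helper below is a small sketch that relies only on the shape shown above, where null marks a built-in; `overridden_plugins` is a hypothetical name, not an AUA function.

```python
def overridden_plugins(extensions: dict) -> dict:
    """Return only the plugin slots that point at a custom implementation.

    A slot whose value is None (JSON null) is using the built-in and is skipped.
    """
    plugins = extensions.get("plugins", {})
    return {slot: target for slot, target in plugins.items() if target is not None}
```

Applied to the parsed body of GET /extensions, the sample response above would yield a single entry for field_classifier.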
Standard error response
All errors return this shape regardless of endpoint:
{
  "error": "AUA_SPECIALIST_TIMEOUT",   // stable error code; use for programmatic handling
  "message": "Specialist swe timed out after 30s",
  "status_code": 503,
  "request_id": "user_42"              // echoes session_id from request
}
import requests

r = requests.post("http://localhost:8000/query", json={"query": "hello"})
if not r.ok:
    # Branch on the stable error code, not the human-readable message.
    err = r.json()
    print(f"Error {err['status_code']}: {err['error']}")
    print(err["message"])
else:
    data = r.json()
    print(data["response"])
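Because the error code is stable, a client can retry selectively. The sketch below retries only on AUA_SPECIALIST_TIMEOUT, the one code shown in this section; treating it as retryable is an assumption, and `call_with_retry` is a hypothetical helper. The request itself is injected as a callable so the retry logic can be tested offline.

```python
import time
from typing import Callable, Tuple

RETRYABLE = {"AUA_SPECIALIST_TIMEOUT"}  # assumption: timeouts are safe to retry


def call_with_retry(send: Callable[[], Tuple[bool, dict]], attempts: int = 3,
                    backoff: float = 0.5,
                    sleep: Callable[[float], None] = time.sleep) -> dict:
    """Call `send` until it succeeds, retrying only on retryable error codes.

    `send` returns (ok, body): the HTTP success flag and the parsed JSON body.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        ok, body = send()
        if ok:
            return body
        last_try = attempt == attempts - 1
        if body.get("error") not in RETRYABLE or last_try:
            raise RuntimeError(f"{body.get('error')}: {body.get('message')}")
        sleep(backoff * 2 ** attempt)  # exponential backoff between attempts
    raise AssertionError("unreachable")
```

With requests, `send` could wrap the call from the example above and return `(r.ok, r.json())`.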
linkedin.com/in/praneethtota · Code: GPL-3.0 · Docs: CC BY 4.0