AUA Framework · Tutorial v1.2.0

Your LLM makes the same mistake twice.
AUA makes sure it doesn't make it three times.

Most frameworks give you a model call. AUA gives you a control layer around that call: routing, scoring, correction, and policy enforcement that runs on every query and gets smarter over time.

Here's the problem this framework exists to solve. You deploy an LLM. It gives a wrong answer on Tuesday. You notice on Thursday. You add it to the system prompt on Friday. Next Tuesday: different user, same wrong answer. The prompt didn't stick, or the context window dropped it, or a slightly different phrasing triggered a different path. The error lives.

AUA closes that loop without waiting for a new model release:

Right now: every query

Routes to the right specialist. Scores the response with a utility function. Injects prior verified corrections into the context so past mistakes don't repeat. Enforces your policy, blocking bad output and retrying before the user ever sees it.

Over time: across sessions

Accumulates what the model consistently gets wrong. Tracks which sessions followed your policy perfectly. Exports those gold-standard sessions as DPO training pairs, ready to fine-tune the model so the corrections become permanent.

You define: what good means

Write a Policy. Say what must never appear (BLOCKING). Say what you want to see rewarded (INFO, with an E-score bonus). The framework enforces it on every call, tracks adherence over every session, and uses your policy as a curriculum for the next fine-tuning cycle.

It's designed like Django: you get a working system in five commands, and you can customise routing thresholds, utility weights, arbiter behaviour, correction stores, model backends, hooks, middleware, and deployment policy without touching framework internals. The quickstart below takes ten minutes. Parts 10–12 show how to teach the framework what good output looks like and watch it improve over time.


5-minute quickstart

Mac / Apple Silicon prerequisites. The macbook tier uses Ollama to serve models locally. Install and start it before running aua serve:

brew install ollama
ollama serve &                     # start in background
ollama pull qwen2.5-coder:7b       # ~4 GB, main coding specialist
ollama pull qwen2.5:7b             # ~4 GB, math specialist
ollama pull qwen2.5:3b             # ~2 GB, arbiter
ollama list                        # confirm all three are present

aua doctor will detect a missing Ollama and tell you exactly what to install. aua serve does not install Ollama automatically.

Five commands. A live routing endpoint. No GPU required to start.

bash
# 1. Install
pip install adaptive-utility-agent

# 2. Scaffold: Mac/CPU uses Ollama; swap --tier for GPU
aua init my-aua-project --preset coding --tier macbook
cd my-aua-project

# 3. Validate setup
aua doctor

# 4. Start
aua serve

# 5. First query (new terminal)
# Auth is disabled by default; the Authorization header is optional until you enable it
curl -s -X POST http://localhost:8000/query \
  -H "Content-Type: application/json" \
  -d '{"query": "Write binary search in Python", "session_id": "qs-demo"}' \
  | python3 -m json.tool

What is now running: A multi-specialist LLM router with utility scoring, contradiction detection, assertions store, rate limiting, structured logging, and a Prometheus metrics endpoint, all from one command. Read on to understand each piece.

Concepts The mental model ~10 min

Five minutes here saves an hour of confusion later. If you've built with FastAPI or Django, AUA will feel familiar: it's a config-driven framework with a request pipeline, swappable components, and an extension system. The difference: instead of routing HTTP paths to view functions, AUA routes queries to language models, scores what comes back, and learns from the result.

What happens on every query

The request lifecycle: POST /query, left to right
            ┌──────────────────────── the router process (aua serve) ───────────────────────┐
            │                                                                               │
 query ──▶  middleware.before_query                                                         │
            │   └─ rewrite / redact / reject the query                                      │
            ▼                                                                               │
            field classifier  ──▶  {"software_engineering": 0.92, "mathematics": 0.08}      │
            │   └─ which domain is this? (swappable via plugin)                             │
            ▼                                                                               │
            routing decision                                                                │
            │   ├─ top prob ≥ single_domain_threshold (0.75) ─▶ SINGLE: one specialist      │
            │   ├─ 2+ domains ≥ fanout_threshold (0.30) ──────▶ FANOUT: race them, arbiter  │
            │   │                                                judges, VCG picks winner   │
            │   └─ nothing confident ─────────────────────────▶ ARBITER: fallback model     │
            ▼                                                                               │
            correction injection                                                            │
            │   └─ verified facts from past mistakes go into the system prompt             │
            ▼                                                                               │
            specialist call(s)  ──▶  Ollama / vLLM / your ModelBackend plugin               │
            ▼                                                                               │
            utility scoring  ──▶  U = w_e·E + w_c·C + w_k·K   (swappable via plugin)        │
            │   └─ plus contradiction detection, policy assertions, retries                 │
            ▼                                                                               │
            middleware.after_response ──▶ hooks fire ──▶ audit log + metrics                │
            │                                                                               │
            └──▶ response + full metadata (U score, domain, routing mode, trace IDs) ──▶ you

The vocabulary: eight terms carry the whole tutorial

| Term | What it is |
| --- | --- |
| Specialist | A model assigned to a domain. One entry under the specialists: key, with a name, a model, an endpoint, and a field. "swe runs qwen2.5-coder:7b on Ollama and handles software_engineering." |
| Field | A domain with scoring rules attached: the utility weights, the minimum confidence to answer (c_min), and the penalty multiplier for being wrong. surgery demands 0.95 confidence and punishes errors 10×; creative_writing barely cares about confidence at all. 11 fields are built in (Part 2 lists them). |
| Router | The FastAPI process you start with aua serve. Owns classification, routing, scoring, corrections, persistence, and the whole REST API. |
| Arbiter | A small, cheap model with two jobs: judge between specialists when fanout routing races them, and catch queries nothing else is confident about. |
| Utility function | The score every response gets: U = w_e·E + w_c·C + w_k·K. E (efficacy): output quality (did it run, was it complete, did it earn policy bonuses). C (confidence): correctness probability, updated by test signals and contradiction detection. K (curiosity): a small exploration bonus for under-sampled specialists so routing doesn't tunnel-vision. The per-field weights are why a 0.6-confidence answer is fine for brainstorming and disqualifying for aviation. |
| Corrections | Verified facts stored when the system (or you) catches a mistake. Injected into future prompts in the same domain. This is the "doesn't make it three times" machinery, and Part 4 + How-to 18 cover its full lifecycle. |
| Blue-green | How model upgrades ship safely: the current model (BLUE) keeps serving while a candidate (GREEN) is scored on the same traffic; promotion happens only when GREEN's mean U beats BLUE by delta over T_min queries. |
| DPO export | The exit ramp: sessions that followed your policy perfectly become preference pairs (aua dpo export) so corrections can be fine-tuned into the model permanently. |
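
The blue-green promotion rule in the table can be sketched as a small predicate. This is an illustration only: the function name `should_promote` and the default values for `delta` and `t_min` are hypothetical, since the tutorial does not state the framework's defaults here.

```python
from statistics import mean

def should_promote(blue_scores, green_scores, delta=0.02, t_min=100):
    """Promote GREEN only after t_min scored queries, and only if its
    mean U beats BLUE's mean U by at least delta.

    delta and t_min are illustrative values, not AUA defaults.
    """
    if len(green_scores) < t_min or not blue_scores:
        return False  # not enough evidence yet
    return mean(green_scores) - mean(blue_scores) >= delta

# GREEN scored on the same 100 queries as BLUE
print(should_promote([0.60] * 100, [0.65] * 100))  # clear margin, enough data
print(should_promote([0.60] * 100, [0.65] * 50))   # too few GREEN queries
```

Requiring both a minimum sample size and a margin keeps a candidate from being promoted on a short lucky streak.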

If you know Django, you know AUA

| Django | AUA | Notes |
| --- | --- | --- |
| settings.py | aua_config.yaml | One file, strictly validated: a typo'd key fails at startup with the list of valid keys, never silently. |
| manage.py | aua CLI | aua init ≈ startproject, aua doctor ≈ check, aua serve ≈ runserver. |
| Apps | Specialists | Units of capability you compose in config. |
| URL dispatcher | Field classifier | Decides who handles the request, and it's a plugin, so you can replace it (How-to 13). |
| Middleware | middleware: | Same idea, same ordering semantics: before_query top-down, after_response bottom-up. |
| Signals | Hooks | 11 named points (pre_query, on_correction, on_promotion…) instead of post_save. |
| ORM backends | Plugins | Swap the classifier, scorer, correction store, or model backend by import path, with no framework edits. |
| South/migrations | Blue-green deploys | Schema changes vs. model changes: both get a safe, reversible upgrade path. |

Your path through this tutorial. Parts 1–12 are sequential: each builds on the last and ends with the system visibly improving itself. How-to 13–18 are standalone recipes; jump straight to the one you need. Want the expert tour in one sitting? Do Parts 1–4, then How-to 13 (plugin system), then How-to 18 (operations toolkit). Everything below was verified against a live router, and every command shown is runnable as written.

Part 1 Install & scaffold ~8 min

1.1 Python version

Python 3.10, 3.11, or 3.12 required. If you use pyenv: pyenv local 3.11.10 before installing, or aua may not appear in your PATH.

bash
# Runtime only (Ollama / CPU)
pip install adaptive-utility-agent

# GPU backend (Linux + CUDA required)
pip install "adaptive-utility-agent[vllm]"

# Dev tools: tests, linting, type checks
pip install "adaptive-utility-agent[dev]"

# Verify
aua --version
aua, version 1.0.0

1.2 Scaffold a project

Pick the tier that matches your hardware. Pick the preset that matches your domain. Together they set models, field weights, routing thresholds, and observability defaults.

| Tier | Hardware | Backend | Notes |
| --- | --- | --- | --- |
| macbook | Mac M-series / Intel | Ollama | Best starting point |
| single-4090 | 1× RTX 4090 24 GB | vLLM AWQ | Production-grade |
| quad-4090 | 4× RTX 4090 | vLLM AWQ | One GPU per specialist |
| a100-cluster | 1× A100 80 GB | vLLM fp16 | Highest accuracy |

| Preset | Fields configured | Use for |
| --- | --- | --- |
| coding | software_engineering | Code generation, dev tools |
| math | mathematics | Proofs, computation |
| research | general, mathematics | Research assistance |
| medical-safe | medicine (c_min=0.95) | Medical Q&A with abstention |
| legal-safe | law (c_min=0.85) | Legal Q&A with abstention |
| generalist | software_engineering, mathematics, general | Multi-domain assistant |

bash
aua init my-aua-project --preset coding --tier macbook
cd my-aua-project

# See what init created
aua config expand

1.3 Validate and start

Auth behavior. By default, auth is disabled: the Authorization header is optional and all endpoints are open. This is fine for local development. To enable auth:

aua token create --scope aua:admin --expires 30d
export AUA_TOKEN="aua_tk_..."     # then include in curl: -H "Authorization: Bearer $AUA_TOKEN"

Examples throughout this tutorial show Authorization: Bearer $AUA_TOKEN. On a local dev install without auth enabled, you can omit that header entirely.

bash
aua doctor                 # PASS / FAIL / WARN per check, with fixes
aua doctor --strict        # warnings as failures, use in CI
aua doctor --json          # machine-readable output

aua serve                  # start specialists + router + arbiter
aua serve --with-ui        # also start Chat UI at :3001 (see note below)
aua serve --dry-run        # print commands without executing

What you can build with this
  • A working multi-model AI system running locally in under ten minutes: one command to scaffold, one to serve.
  • A project that's ready to customise: config, eval folder, and .gitignore all in place.
  • A pre-flight check (aua doctor) you can drop into CI to catch config problems before they reach production.

Part 2 shows how to wire in any model, from a frontier API to a 1.5B model on a laptop, and tell the framework what you want it to optimize for.

Part 1 done. You have a running AUA router. Part 2 explains specialists and fields.

Part 2 Models & fields ~12 min

2.1 What's in aua_config.yaml

aua_config.yaml (macbook tier, coding preset)
aua:
  version: "0.5"   # version field generated by aua init; do not edit manually
  mode: local
  backend: ollama

specialists:
  - name: swe
    model: qwen-coder-7b-awq    # registry alias → full model ID
    port: 11434
    field: software_engineering
    gpu: 0

arbiter:
  model: qwen2.5:3b
  port: 11434

router:
  port: 8000
  single_domain_threshold: 0.75
  fanout_threshold: 0.30

security:
  cors_origins: ["http://localhost:3001"]  # Chat UI port; Grafana is on :3000

state:
  backend: sqlite
  path: .aua/state/aua.db

2.2 Model registry β€” inspect aliases

bash
aua models list
  Role         Name     Model                Field                   Port    Status
  specialist   swe      qwen2.5-coder:7b     software_engineering   11434   ready
  arbiter      -        qwen2.5:3b           -                      11434   ready

# Status comes from a live check against the model server; "not pulled"
# means Ollama is up but the tag isn't downloaded yet

aua models inspect qwen-coder-7b-awq   # expand any registry alias to its full ID + requirements

2.3 Field registry β€” weights and thresholds

bash
aua fields list
aua fields inspect software_engineering
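
| Field | w_e | w_c | w_k | c_min | Penalty |
| --- | --- | --- | --- | --- | --- |
| surgery | 0.20 | 0.70 | 0.10 | 0.95 | 10× |
| aviation | 0.20 | 0.70 | 0.10 | 0.95 | 10× |
| law | 0.30 | 0.60 | 0.10 | 0.85 | 5× |
| mathematics | 0.50 | 0.40 | 0.10 | 0.75 | 3× |
| software_engineering | 0.55 | 0.35 | 0.10 | 0.70 | 2× |
| creative_writing | 0.80 | 0.05 | 0.15 | 0.05 | 1× |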

2.4 Bring your own model β€” end to end

The registry aliases are conveniences, not a gate. For the Ollama backend, model: takes any Ollama tag directly. If ollama run accepts it, AUA can serve it. Here's the complete loop for adding a reasoning specialist that isn't in any registry:

bash (step 1: get the model)
ollama pull deepseek-r1:8b      # any tag from ollama.com/library, or your own Modelfile build
ollama list                     # confirm the tag is present; this exact string goes in config

aua_config.yaml (step 2: add a specialist entry)
specialists:
  - name: swe                      # existing entry stays
    model: qwen2.5-coder:7b
    port: 11434
    field: software_engineering
    gpu: 0

  - name: reasoning                # NEW: your model
    model: deepseek-r1:8b          # the raw Ollama tag, exactly as `ollama list` shows it
    port: 11434                    # all Ollama models share the one Ollama server port
    field: mathematics             # must be one of the 11 built-in fields (see 2.5)
    gpu: 0

bash (step 3: validate, restart, verify)
aua config validate              # typo'd keys or an unknown field fail here, not at runtime
aua doctor                       # confirms Ollama is up and the tag is pulled
aua serve

# Force-route to the new specialist to test it in isolation:
curl -s -X POST localhost:8000/query -H "Content-Type: application/json" \
  -d '{"query": "Prove there are infinitely many primes.", "force_domain": "mathematics"}' \
  | python3 -m json.tool

# Then drop force_domain and confirm the classifier routes math queries there naturally;
# the response's "primary_domain" and "specialist_responses" show who answered.

Three notes that save debugging time:
  • One port, many models: Ollama serves every pulled model from :11434; each specialist's model: tag selects which one per request, so adding specialists doesn't mean adding servers.
  • vLLM differs: with backend: vllm, each specialist is its own server process on its own port, and model: takes a HuggingFace ID (e.g. Qwen/Qwen2.5-Coder-7B-Instruct-AWQ). aua models inspect shows the full ID behind each alias.
  • Frontier APIs: to route to OpenAI/Anthropic-style endpoints, write a ModelBackend plugin (How-to 13, interface 6), about twenty lines.

2.5 The fields you can target β€” and custom taxonomies

Fields are built into the framework because each carries calibrated scoring weights. Every specialist's field: must be one of the 11 listed below, and aua config validate enforces it:

The 11 built-in fields
art                      creative_writing         education
general                  law                      mathematics
software_engineering     stem_research            structural_engineering
surgery                  aviation

"But my domains are insurance-claims and customer-support." You don't need new fields. You need a custom classifier that maps your taxonomy onto the built-in scoring profiles. Pick the field whose risk profile matches each of your domains (claims adjudication scores like law; support chat scores like general), then write a ten-line FieldClassifierPlugin (How-to 13, interface 1) that recognizes your queries and returns those fields. Your routing logic is fully yours; the scoring calibration stays sound. For dynamic taxonomies, How-to 18.8 shows how the domain ontology grows sub-domains under these roots automatically from production traffic.

What you can build with this
  • Swap any model in or out without changing application code: frontier API, 7B local, or a tiny 1.5B model for fast low-stakes queries.
  • Tell the framework what matters for your domain: accuracy (w_c), output quality (w_e), or exploration (w_k).
  • Set how fast domain knowledge decays (fast for security practices, slow for physics principles) so the system stays calibrated over time.
  • Turn off exploration entirely for safety-critical domains so routing is always consistent and predictable.

Part 3 shows how the routing decision actually gets made, and gives you the knobs to control how aggressively the system compares specialists.

Part 2 done. You understand specialists and fields. Part 3 shows how routing decisions are made.

Part 3 Routing & utility ~15 min

3.1 The routing pipeline

Every query follows this path: middleware → session lookup → correction retrieval → field classifier → routing decision → specialist calls → utility scoring → arbiter (if needed) → hooks → response.

| Mode | Trigger | What happens |
| --- | --- | --- |
| single | One field above single_domain_threshold | One specialist call, utility scored |
| fanout | Two+ fields above fanout_threshold | All qualifying specialists called; best U wins |
| arbiter | Fanout returned contradictory answers | Arbiter resolves; correction stored; both models updated |
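
The threshold logic can be sketched in a few lines. This is a minimal illustration rather than the router's actual code: the function name `choose_routing_mode` is hypothetical, the thresholds are the defaults from aua_config.yaml, and the fallback to the arbiter when nothing is confident follows the lifecycle diagram in the Concepts section.

```python
def choose_routing_mode(probs, single_threshold=0.75, fanout_threshold=0.30):
    """Return (mode, fields) for a classifier distribution {field: probability}."""
    if not probs:
        return "arbiter", []  # nothing to route on: fall back to the arbiter
    ranked = sorted(probs.items(), key=lambda kv: kv[1], reverse=True)
    top_field, top_prob = ranked[0]
    if top_prob >= single_threshold:
        return "single", [top_field]
    qualifying = [field for field, p in ranked if p >= fanout_threshold]
    if len(qualifying) >= 2:
        return "fanout", qualifying
    return "arbiter", []  # no domain is confident enough

print(choose_routing_mode({"software_engineering": 0.92, "mathematics": 0.08}))
print(choose_routing_mode({"software_engineering": 0.55, "mathematics": 0.45}))
print(choose_routing_mode({"general": 0.40, "law": 0.25, "art": 0.20, "education": 0.15}))
```

Raising `single_threshold` sends more queries to fanout; raising `fanout_threshold` makes the system commit to one specialist (or the arbiter) more often.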

3.2 The utility function

Every candidate response gets a single utility score before it reaches the user. The score combines three things:

  • How useful the answer appears: does it correctly address the query for this domain?
  • How consistent it is with prior verified knowledge: does it contradict things the system already knows?
  • Whether exploring this area is valuable: is this a domain where the system has low confidence and should weight novelty?

In practice, you rarely touch the formula directly. The defaults work well for most domains. You tune it when you want stricter answer quality (raise w_e), more caution (raise w_c), or more exploration (raise w_k).

The formal expression, where f is the field the query was routed to:

U = w_e(f)·E + w_c(f)·C + w_k(f)·K

  • E (Efficacy): Mann-Whitney dominance probability over prior outputs [0, 1]
  • C (Confidence): Kalman-filtered internal consistency, penalized per contradiction [0, 1]
  • K (Curiosity): UCB exploration bonus for novel domains [capped at 50% of U]
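
To see how the per-field weights change the score for the same answer, here is a small sketch. The weights and c_min values are copied from the field table in 2.3; the function names `utility` and `meets_c_min` are hypothetical, the E, C, and K inputs are made-up numbers, and the cap on K is left out for brevity.

```python
# (w_e, w_c, w_k, c_min) per field, copied from the field table in 2.3
FIELD_WEIGHTS = {
    "surgery": (0.20, 0.70, 0.10, 0.95),
    "software_engineering": (0.55, 0.35, 0.10, 0.70),
    "creative_writing": (0.80, 0.05, 0.15, 0.05),
}

def utility(field, efficacy, confidence, curiosity):
    """U = w_e*E + w_c*C + w_k*K with the weights of the given field."""
    w_e, w_c, w_k, _ = FIELD_WEIGHTS[field]
    return w_e * efficacy + w_c * confidence + w_k * curiosity

def meets_c_min(field, confidence):
    """True if the confidence clears the field's minimum to answer."""
    return confidence >= FIELD_WEIGHTS[field][3]

# Same hypothetical answer (E=0.8, C=0.6, K=0.2) scored under three fields
for field in FIELD_WEIGHTS:
    u = utility(field, efficacy=0.8, confidence=0.6, curiosity=0.2)
    print(f"{field:<22} U={u:.3f}  clears c_min: {meets_c_min(field, 0.6)}")
```

The same 0.6-confidence answer clears the bar for creative_writing but not for surgery or software_engineering, which is the behaviour the weights are meant to encode.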

3.3 A full query response

bash
curl -s -X POST http://localhost:8000/query \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $AUA_TOKEN" \
  -d '{
    "query": "Write binary search in Python. State the time complexity.",
    "session_id": "demo-session"
  }' | python3 -m json.tool
Response
{
  "session_id": "demo-session",
  "trace_id": "01HXYZ...",
  "request_id": "req_abc123",
  "routing_mode": "single",
  "primary_field": "software_engineering",
  "response": "...",
  "u_score": 0.641,
  "confidence": 0.76,
  "contradictions_detected": 0,
  "corrections_injected": 1,
  "latency_ms": 287.4,
  "cost_estimate_usd": 0.00012
}

3.4 Live status and U scores

bash
aua status                 # auto-refreshing terminal UI
aua status --once          # single snapshot
aua status --json          # machine-readable

curl http://localhost:8000/status | python3 -m json.tool

What you can build with this
  • A system that routes each question to the right specialist, scores every answer with a real number, and shows you exactly why, not vibes.
  • Tune how aggressively the system compares multiple specialists vs. committing fast to a single one.
  • Force a specific specialist for known query types, useful for dedicated deployments where you already know the domain.
  • Read U scores in API responses to build your own routing analytics. Low scores on a domain are an early signal the specialist needs attention.
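
As a starting point for that kind of analytics, the sketch below averages u_score per field over a batch of /query responses you have collected. The helper names and the 0.6 alert threshold are hypothetical; the only fields read are ones that appear in the sample responses in this tutorial.

```python
from collections import defaultdict

def mean_u_by_field(responses):
    """Average u_score per field over a list of /query response dicts."""
    totals = defaultdict(lambda: [0.0, 0])
    for resp in responses:
        # The sample response in 3.3 uses "primary_field"; other examples in
        # this tutorial show "primary_domain", so accept either key.
        field = resp.get("primary_field") or resp.get("primary_domain") or "unknown"
        totals[field][0] += resp["u_score"]
        totals[field][1] += 1
    return {field: total / count for field, (total, count) in totals.items()}

def fields_needing_attention(responses, threshold=0.6):
    """Fields whose mean U falls below an (illustrative) threshold."""
    return sorted(f for f, u in mean_u_by_field(responses).items() if u < threshold)

collected = [
    {"primary_field": "software_engineering", "u_score": 0.641},
    {"primary_field": "software_engineering", "u_score": 0.700},
    {"primary_domain": "mathematics", "u_score": 0.520},
]
print(mean_u_by_field(collected))
print(fields_needing_attention(collected))
```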

Part 4 adds persistent memory: the framework learns from its mistakes and stops making the same error twice.

Part 3 done. You understand routing and utility scoring. Part 4 covers the Arbiter.

Part 4 Arbiter & corrections ~15 min

When fanout routing produces contradictory responses, the Arbiter runs four checks, issues a verdict, injects correction signals, and stores the verified claim in the assertions store.

4.1 The four arbitration checks

| Check | Weight | What it detects |
| --- | --- | --- |
| Logical | 0.30 | Output contradicts its own premises |
| Mathematical | 0.40 | Complexity or numerical claims provably wrong |
| Cross-session | 0.20 | Contradicts a prior verified assertion |
| Empirical | 0.10 | External ground truth check: SymPy (maths), arXiv (SWE/STEM), PubMed (medicine/surgery) |

ArbiterAgent is the live arbitration path. When two specialists disagree (fanout routing), AUA runs all four checks via the ArbiterAgent pipeline: logical, mathematical, cross-session, and empirical. The empirical check (SymPy, arXiv, PubMed) fires automatically based on domain; no configuration required.

Simplified LLM-only arbitration is available for scenarios where you want lower latency and don't need the formal checks: creative domains where all four checks are always inconclusive, or deployments where p99 latency < 500ms matters more than structured evidence chains. Switch with one config line:

router:
  arbitration_mode: "llm"   # simplified: one LLM call, VERDICT: A/B/BOTH_WRONG
                             # default: "pairwise" (uses ArbiterAgent)

Or switch at runtime without restarting: PATCH /config with body {"arbitration_mode": "llm"}. The LLM path is also the automatic fallback if ArbiterAgent raises an exception.

4.2 The four verdict cases

| Case | Meaning | Action |
| --- | --- | --- |
| Case 1 | A correct, B wrong | Correct B, reinforce A, store claim |
| Case 2 | B correct, A wrong | Correct A, reinforce B, store claim |
| Case 3 | Both wrong | Correct both + open curiosity gap bonus |
| Case 4 | Inconclusive | Flag for external escalation, hedge response |

4.3 Using the Arbiter directly

Python
# ArbiterAgent runs automatically inside aua serve.
# Use directly in Python for testing or custom workflows:
from aua import ArbiterAgent, AssertionsStore

store = AssertionsStore()
arbiter = ArbiterAgent(store)

verdict = arbiter.arbitrate(  # synchronous; call directly
    subject="bubble_sort_complexity",
    domain="software_engineering",
    output_A="Bubble sort is O(n) average case.",
    output_B="Bubble sort is O(n²) average case.",
    field_penalty_multiplier=2.0,
)

print(verdict.case.value)        # "case_2" (B correct, A wrong; see 4.2)
print(verdict.verified_claim)    # "Bubble sort is O(n²) average case."
print(verdict.external_response) # safe to return to user

4.4 The assertions store β€” decay classes

| Class | Decay | Used for |
| --- | --- | --- |
| A | Never | Mathematical proofs, algorithm complexity |
| B | 10 years | Classical physics, structural engineering |
| C | 3 years | Medicine, law, architecture |
| D | 6 months | Security CVEs, clinical guidelines, ML benchmarks |
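
One way to reason about the decay classes is as an age check on each stored assertion. The sketch below treats the decay period as a hard cutoff, which is an assumption: the framework may instead lower confidence gradually. The names `DECAY_PERIODS` and `is_stale` are hypothetical, and years and months are approximated in days.

```python
from datetime import datetime, timedelta

# Decay periods from the table above; None means the class never decays.
# 365-day years and a 182-day half year are approximations.
DECAY_PERIODS = {
    "A": None,
    "B": timedelta(days=365 * 10),
    "C": timedelta(days=365 * 3),
    "D": timedelta(days=182),
}

def is_stale(decay_class, stored_at, now):
    """True if an assertion of this class is older than its decay period."""
    period = DECAY_PERIODS[decay_class]
    if period is None:
        return False
    return now - stored_at > period

stored = datetime(2024, 1, 1)
now = datetime(2025, 1, 1)
for cls in "ABCD":
    print(cls, is_stale(cls, stored, now))
```

After one year only the class D assertion is stale, matching the intent that CVEs and benchmarks age out quickly while proofs never do.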

4.5 Manual corrections via REST

bash
curl -X POST http://localhost:8000/corrections \
  -H "Authorization: Bearer $AUA_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "subject": "heapsort_complexity",
    "domain": "software_engineering",
    "claim": "Heapsort is O(n log n) worst-case, O(1) extra space.",
    "confidence": 0.99
  }'

Corrections are global in v1.0. A correction stored via POST /corrections is a verified fact about the world: it is injected into every future query on that subject, across all users and sessions. For internal tools, dedicated agents, and single-tenant deployments this is the intended behaviour. For multi-tenant products where users need isolated correction contexts, per-user scoping is a v1.1 roadmap item.

4.6 VCG welfare maximization β€” picking the best specialist

The default arbitration mode runs the 4-check arbiter in full verdict mode between two specialists. VCG mode replaces pairwise selection with welfare maximization: after all specialists in the fanout respond, the router computes a welfare score for each one and picks the highest, regardless of how many specialists are competing. The 4-check arbiter still runs, but only for contradiction detection and DPO pair accumulation; it does not override the VCG winner.

This is the mechanism tested in the RTX 4090 hardware pilot (Appendix A.1): Arm D (VCG routing) vs Arm A (no routing) showed a +43.4 pp correctness gain (95 % CI [19.5, 61.2] pp; Fisher exact p = 0.0009; Cohen's d = 1.02; n = 30/arm). The direction of the effect is robust; the interval is wide at this sample size, so treat it as a strong directional signal rather than a precise point estimate.

How the welfare score is computed

The formula has two layers. The outer layer is a multi-domain convex combination: because a query can straddle domains, the welfare score integrates over the classifier's probability distribution rather than committing to a single domain:

Welfare formula (router._vcg_welfare)
W_i(q) = Σ_j  p(j|q) · effective_u(i, j)

  i                - specialist (e.g. "swe", "math")
  j                - domain (e.g. "software_engineering", "mathematics")
  p(j|q)           - field classifier probability for domain j on query q;
                     only domains with p(j|q) ≥ 0.05 contribute (below that
                     the term is negligible and the DB lookup is wasteful)
  effective_u(i,j) - shrinkage-corrected win-rate (see below)

Winner           - argmax W_i(q) across all specialists in the fanout
Tie-breaking     - raw confidence returned by the specialist, then P(top domain)
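
The outer layer can be sketched directly from the formula. This is an illustration, not the code behind router._vcg_welfare: the function names are hypothetical, effective_u is passed in as a plain dictionary with made-up values, and tie-breaking is not implemented.

```python
def welfare(probs, effective_u, specialist, min_prob=0.05):
    """W_i(q) = sum_j p(j|q) * effective_u(i, j), skipping domains below min_prob."""
    return sum(
        p * effective_u[(specialist, domain)]
        for domain, p in probs.items()
        if p >= min_prob
    )

def pick_winner(probs, effective_u, specialists):
    """Return (winner, scores); the winner is the argmax of the welfare scores."""
    scores = {s: welfare(probs, effective_u, s) for s in specialists}
    return max(scores, key=scores.get), scores

# Hypothetical classifier output and effective_u table
probs = {"software_engineering": 0.60, "mathematics": 0.38, "general": 0.02}
effective_u = {
    ("swe", "software_engineering"): 0.80,
    ("swe", "mathematics"): 0.60,
    ("math", "software_engineering"): 0.65,
    ("math", "mathematics"): 0.85,
}
winner, scores = pick_winner(probs, effective_u, ["swe", "math"])
print(winner, {name: round(w, 3) for name, w in scores.items()})
```

With these numbers the math specialist wins (0.713 vs 0.708) even though software_engineering is the most probable domain, because its record across both domains carries more weighted mass.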

The inner layer is the shrinkage-corrected win-rate (Efron-Morris estimator, Lemma B.8.1). When a specialist has seen many queries in domain j, the raw win-rate is reliable; when it has seen only a few, the estimate is pulled toward the global cross-domain prior to prevent early flukes from dominating:

Shrinkage estimator (router._vcg_effective_u)
effective_u(i, j) = (n_ij · û_ij  +  N_cliff · ū) / (n_ij + N_cliff)

  n_ij     - number of past queries where specialist i answered in domain j
  û_ij     - raw win-rate: fraction of those queries where i was the VCG winner
  N_cliff  - Efron-Morris pseudo-count  (default: 10)
  ū        - global cross-domain prior  (default: 0.65)

Cold start  (n_ij = 0):   effective_u = 0.65   ← pure prior
Converging  (n_ij = 10):  effective_u ≈ halfway between û and 0.65
Converged   (n_ij → ∞):   effective_u → û      ← pure observed win-rate

Data source: model_runs table (vcg_winner flag, specialist and domain columns).
Falls back to ū = 0.65 if the DB lookup fails.

Why shrinkage? Without it, a specialist that wins its very first query in a new domain gets effective_u = 1.0, instantly beating every established specialist. The Efron-Morris estimator prevents this: N_cliff = 10 means a new specialist needs roughly 10 wins before its estimate clearly separates from the prior. The prior ū = 0.65 matches the empirical cross-domain baseline from the hardware pilot (Arm B matched-routing accuracy).
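
The estimator is short enough to check by hand. The sketch below is not the code behind router._vcg_effective_u; the function name and argument layout are hypothetical, and it takes the win count directly, since n_ij · û_ij is simply the number of wins.

```python
def effective_u(n, wins, n_cliff=10, prior=0.65):
    """Shrinkage-corrected win-rate: (n * u_hat + n_cliff * prior) / (n + n_cliff)."""
    # n * u_hat equals the win count, so no division by n is needed
    # and the cold-start case (n = 0) needs no special handling.
    return (wins + n_cliff * prior) / (n + n_cliff)

print(round(effective_u(0, 0), 3))    # cold start: 0.65, the pure prior
print(round(effective_u(1, 1), 3))    # one lucky win: 0.682, not 1.0
print(round(effective_u(10, 10), 3))  # ten straight wins: 0.825
print(round(effective_u(18, 14), 3))  # 14 wins in 18 runs: 0.732
```

A single win moves the estimate from 0.65 to only 0.682, which is why an untested specialist cannot leapfrog an established one on a fluke.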

A worked example

Query: "Is this gradient descent implementation correct? It claims O(n) but loops over all weights." The classifier splits probability mass across two domains.

Step 1: classifier distribution
p("software_engineering" | q) = 0.72
p("mathematics"          | q) = 0.28
(all other domains below the 0.05 threshold, skipped)

Step 2: effective_u per (specialist, domain)
                       swe specialist               math specialist
                       ─────────────────────────    ─────────────────────────
software_engineering   n=18, wins=14                n=3, wins=2
                       û = 14/18 = 0.778            û = 2/3 = 0.667
  effective_u          (18·0.778 + 10·0.65) / 28    (3·0.667 + 10·0.65) / 13
                       = 0.732                      = 0.654

mathematics            n=2, wins=1                  n=31, wins=24
                       û = 1/2 = 0.500              û = 24/31 = 0.774
  effective_u          (2·0.500 + 10·0.65) / 12     (31·0.774 + 10·0.65) / 41
                       = 0.625                      = 0.744

Step 3: welfare scores (convex combination)
W_swe  = 0.72 · 0.732  +  0.28 · 0.625  =  0.527  +  0.175  =  0.702
W_math = 0.72 · 0.654  +  0.28 · 0.744  =  0.471  +  0.208  =  0.679

Winner: swe specialist  (W = 0.702 > 0.679)

The math specialist has the stronger record in mathematics (0.744 vs 0.625),
but that domain carries only 28 % of the probability mass, and with just 3
runs in software_engineering its estimate there is still close to the prior
(0.654). The swe specialist therefore wins on welfare. With these estimates
the math specialist would win once p(software_engineering) dropped below
roughly 0.60, even though software_engineering would still be the most
probable domain. This is the key property VCG mode adds over
"always route to the most probable domain".

How the prior builds up over time

Every VCG query writes one row per specialist to the model_runs table with vcg_winner (boolean), vcg_welfare_score, specialist, and domain. The next query reads those rows to compute effective_u. There is no separate training phase: the prior accumulates automatically from production traffic.

What gets persisted per query (model_runs row)
{
  "run_id":            "uuid-...",
  "specialist":        "math",
  "domain":            "mathematics",
  "round":             "answer",
  "vcg_winner":        true,          ← used to compute û_ij on future queries
  "vcg_welfare_score": 0.725,         ← W_i for this query
  "confidence_score":  0.81,
  "utility_score":     0.744,
  "conversation_id":   "uuid-...",
  "latency_ms":        312.4
}

After about 10–15 queries per specialist-domain pair (the N_cliff = 10 convergence point) the effective_u estimates stabilise and VCG routing becomes meaningfully better than the cold-start prior. In the hardware pilot this threshold was crossed within the first 30 queries per arm.

Reading the prior from the analytics API

Per-specialist win-rate and welfare snapshot
# Current snapshot: win rates and average welfare per specialist
curl http://localhost:8000/analytics/specialists

{
  "specialists": {
    "swe": {
      "total_runs":        42,
      "vcg_wins":          31,
      "win_rate":          0.738,
      "avg_welfare_score": 0.701,
      "avg_utility_score": 0.731
    },
    "math": {
      "total_runs":        38,
      "vcg_wins":          27,
      "win_rate":          0.711,
      "avg_welfare_score": 0.724,
      "avg_utility_score": 0.748
    }
  }
}

# Welfare trajectory over time (useful for spotting drift)
curl http://localhost:8000/analytics/welfare-trajectory
Python: monitor welfare convergence
import requests

r = requests.get("http://localhost:8000/analytics/specialists")
stats = r.json()["specialists"]

N_CLIFF = 10  # effective_u convergence threshold
print(f"{'Specialist':<12} {'Runs':>6} {'Win%':>6} {'Avg W':>7} {'Status':>12}")
print("-" * 48)
for name, s in sorted(stats.items()):
    status = "converged" if s["total_runs"] >= N_CLIFF else "warming up"
    print(
        f"{name:<12} {s['total_runs']:>6} "
        f"{s['win_rate']*100:>5.1f}% {s['avg_welfare_score']:>7.4f} "
        f"{status:>12}"
    )

# After 40+ queries per specialist:
# Specialist    Runs   Win%   Avg W       Status
# ------------------------------------------------
# math            38  71.1%  0.7240    converged
# swe             42  73.8%  0.7010    converged

Activating VCG

Option 1: YAML (persistent, recommended for production)
router:
  arbitration_mode: vcg   # "pairwise" (default) | "vcg"
Option 2: CLI flag (session-only, good for A/B testing)
aua serve --arbitration-mode vcg
Option 3: REST hot-swap (no restart required)
# Enable VCG
curl -X PATCH http://localhost:8000/config \
  -H "Content-Type: application/json" \
  -d '{"arbitration_mode": "vcg", "persist": true}'

{"patched": {"arbitration_mode": "vcg"}, "persisted": true}

# Revert to pairwise
curl -X PATCH http://localhost:8000/config \
  -H "Content-Type: application/json" \
  -d '{"arbitration_mode": "pairwise", "persist": true}'

What changes in the response envelope

VCG response: routing_mode and welfare_scores fields
curl -s -X POST http://localhost:8000/query \
  -H "Content-Type: application/json" \
  -d '{"query": "Is gradient descent O(n) per step?"}'

{
  "routing_mode":    "vcg",                     ← "fanout" in pairwise mode
  "primary_domain":  "mathematics",
  "response":        "Per-step complexity is O(n) where n = number of params...",
  "u_score":         0.744,
  "welfare_scores": {                           ← present only in VCG mode
    "math": 0.7250,                             ← winner
    "swe":  0.7020
  }
}
Python: read welfare scores and winner
import requests

r = requests.post("http://localhost:8000/query", json={
    "query": "Is the gradient descent update O(n) per step?"
})
data = r.json()

if data["routing_mode"] == "vcg":
    scores = data["welfare_scores"]
    winner = max(scores, key=scores.get)
    print(f"VCG selected: {winner}")
    for name, w in sorted(scores.items(), key=lambda x: -x[1]):
        marker = " ← winner" if name == winner else ""
        print(f"  {name}: W = {w:.4f}{marker}")

When to use VCG vs pairwise

Scenario | Recommended mode | Why
Single specialist, queries rarely trigger fanout | Pairwise (default) | VCG fires only when fanout does; no benefit from switching
2–3 specialists, traffic just starting | Pairwise first, then VCG | Wait until ~10 queries per specialist per domain; switch via REST without restart
2+ specialists with 10+ queries of history | VCG | Shrinkage estimates are reliable; track record differentiates specialists the classifier treats as equally likely
3+ specialists, overlapping domains | VCG | Multi-domain convex combination handles ambiguous queries; pairwise can only compare two
Safety-critical domain, need full 4-check verdict | Pairwise | Pairwise runs the arbiter in full verdict mode (case_1–4); VCG uses it for contradiction detection only
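
The table can be read as a small decision rule. The sketch below encodes it as a hypothetical helper, recommend_mode; the function name, its parameters, and the hard cut-off of 10 runs are illustrative choices drawn from the rows above, not part of the AUA API.

```python
def recommend_mode(n_specialists: int, min_runs_per_pair: int,
                   safety_critical: bool = False) -> str:
    """Return "pairwise" or "vcg" following the rules of thumb in the table.

    min_runs_per_pair is the smallest run count across all
    specialist-domain pairs (an assumed way of summarising history).
    """
    if safety_critical:
        return "pairwise"   # only pairwise runs the full 4-check verdict
    if n_specialists < 2:
        return "pairwise"   # VCG only fires when fanout does
    if min_runs_per_pair < 10:
        return "pairwise"   # still warming up; switch to VCG later
    return "vcg"


print(recommend_mode(n_specialists=1, min_runs_per_pair=200))
print(recommend_mode(n_specialists=3, min_runs_per_pair=4))
print(recommend_mode(n_specialists=2, min_runs_per_pair=12))
print(recommend_mode(n_specialists=3, min_runs_per_pair=50, safety_critical=True))
```

Treat the result as a starting point: the safety-critical check comes first on purpose, so history never overrides the need for a full verdict.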

Gradual rollout pattern. Start with pairwise to accumulate model_runs history. After ~50 queries across your specialists, switch to VCG via PATCH /config, with no restart and no disruption. Watch /analytics/specialists for win-rate divergence; once one specialist is clearly ahead, VCG's welfare scores will reflect that on every query.

What VCG does not do. VCG selects the best specialist from those that responded; it does not re-route if all specialists perform poorly. If every welfare score is near the prior (0.65), the specialists haven't accumulated enough history yet; check total_runs in /analytics/specialists. VCG also does not guarantee the incentive-theoretic properties of Theorems S1–S3 in deployment. Those hold for the idealized mechanism where specialists report explicit utility bids; here welfare is inferred from historical win-rates (see Appendix A.1.2 and §B.8 remark).

What you can build with this
  • Self-improving routing. Every VCG query makes the next one slightly better. A specialist with a bad run accumulates losses in model_runs; its effective_u falls and the router naturally routes away from it without any manual intervention.
  • Interpretable decisions. Every response includes per-specialist welfare scores. Log them and you get a complete audit trail of exactly why each query went where it did.
  • Graceful cold starts. The shrinkage prior (0.65) means a brand-new specialist competes at a fair baseline, not zero. It won't dominate until it earns it, but it won't be shut out on day one either.
  • Hot A/B testing. Run pairwise on one process and VCG on another; compare U scores in their respective /analytics endpoints before committing to either mode in production.

Part 5 shows how U scores gate model promotion: the model_runs welfare trajectory VCG builds is an input to the blue-green promotion decision.

Part 4 done. You understand the Arbiter, corrections, and VCG welfare maximization. Part 5 shows how U scores gate model promotion.

Part 5 Blue-green deployment ~20 min

BLUE is in production. GREEN is the candidate, evaluated via shadow mode or a synthetic eval run. When GREEN's U score exceeds BLUE's by at least delta and at least T_min queries have been evaluated, the promotion gate opens. Promotion is triggered manually via POST /deploy/green or through shadow mode accumulation; see How-to 20 for the full workflow.

5.1 Promotion thresholds

aua_config.yaml
blue_green:
  swe:
    delta: 0.025    # GREEN must beat BLUE by at least +2.5% U
    T_min: 10       # minimum queries before promotion is considered (gate)

router:
  tau: 1.0          # routing softmax temperature: 1.0=off, <1=sharper, >1=softer

T_min is a minimum-sample gate. If you trigger POST /deploy/green before accumulating T_min shadow queries, the router returns promoted: false with a "PROMOTION DEFERRED" message that cites the T_min gate. Accumulate more shadow traffic first.
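
The two conditions of the gate can be written down in a few lines. This is a sketch under stated assumptions, not the router's code: promotion_gate and GateResult are hypothetical names, and delta is treated as an absolute difference between mean U scores.

```python
from dataclasses import dataclass


@dataclass
class GateResult:
    promoted: bool
    reason: str


def promotion_gate(green_u: float, blue_u: float, n_queries: int,
                   delta: float = 0.025, t_min: int = 10) -> GateResult:
    """Check the minimum-sample gate first, then the U-score margin."""
    if n_queries < t_min:
        return GateResult(False, f"deferred: {n_queries} of {t_min} queries evaluated")
    margin = green_u - blue_u
    if margin < delta:
        return GateResult(False, f"margin {margin:+.3f} is below delta {delta}")
    return GateResult(True, f"margin {margin:+.3f} meets delta {delta}")


print(promotion_gate(green_u=0.70, blue_u=0.65, n_queries=5))    # too few samples
print(promotion_gate(green_u=0.66, blue_u=0.65, n_queries=20))   # margin too small
print(promotion_gate(green_u=0.70, blue_u=0.65, n_queries=20))   # both conditions met
```

Checking the sample count before the margin mirrors the behaviour described above: an early trigger is deferred regardless of how good GREEN looks so far.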

tau (in the router: block, not blue_green:) is a softmax temperature applied to the field classifier's probability distribution before routing thresholds are checked:

  • tau < 1.0: sharpens routing. The highest-probability domain gets boosted, making single-specialist routing more likely
  • tau > 1.0: softens routing. Probabilities spread more evenly, increasing fanout and arbiter traffic
  • tau = 1.0: no effect (default)
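
A short numeric sketch shows the effect. It assumes the standard way of applying a temperature to an existing probability distribution (raise each probability to the power 1/tau, then renormalise); whether the router does exactly this is an assumption, and apply_temperature is a hypothetical helper.

```python
def apply_temperature(probs: dict, tau: float) -> dict:
    """Rescale a probability distribution: p_i ** (1 / tau), renormalised to sum to 1."""
    if tau <= 0:
        raise ValueError("tau must be positive")
    scaled = {name: p ** (1.0 / tau) for name, p in probs.items()}
    total = sum(scaled.values())
    return {name: value / total for name, value in scaled.items()}


probs = {"swe": 0.6, "math": 0.4}         # classifier output (made-up numbers)
sharp = apply_temperature(probs, 0.5)      # tau < 1: top domain gains mass
soft = apply_temperature(probs, 2.0)       # tau > 1: distribution flattens
same = apply_temperature(probs, 1.0)       # tau = 1: unchanged

for label, dist in [("tau=0.5", sharp), ("tau=2.0", soft), ("tau=1.0", same)]:
    print(label, {name: round(p, 3) for name, p in dist.items()})
```

Because the routing thresholds are checked after this step, lowering tau pushes borderline queries toward a single specialist, while raising it pushes them toward fanout.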

5.2 Promote and rollback

bash
# Trigger blue-green promotion check via REST API
curl -X POST http://localhost:8000/deploy/green \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $AUA_TOKEN" \
  -d '{"specialist": "swe", "new_model": "qwen2.5-coder:14b"}'

# Check status
aua status --once

# Roll back to previous BLUE
aua rollback --specialist swe
aua rollback --specialist swe --yes     # skip confirmation
aua rollback --no-restart               # update config only, no restart
aua rollback --all --yes                # roll back all specialists

5.3 Using BlueGreenDeployment in Python

Python
import asyncio

from aua import BlueGreenDeployment
from aua.config import load_config

config = load_config("aua_config.yaml")
bg = BlueGreenDeployment(config, specialist_name="swe")

bg.register_green("models/swe-green-v2/")

# evaluate() is async; asyncio.run() drives it from synchronous code
summary = asyncio.run(bg.evaluate(n_queries=10))
print(f"GREEN U mean: {summary.green_u_mean:.3f}")
print(f"BLUE  U mean: {summary.blue_u_mean:.3f}")

if bg.should_promote(summary):
    bg.promote()
    print("GREEN promoted to BLUE")

5.4 The promotions log

Every promotion is recorded atomically to .aua/state/promotions.jsonl with a UUID, timestamp, and U scores. The file is locked during writes to prevent concurrent corruption.

What you can build with this
  • Upgrade your AI models the way you upgrade software: safely, with a rollback button and a promotion gate.
  • Test a new model on real traffic before it touches production: deploy as GREEN, evaluate, promote only if U score delta passes the threshold.
  • Revert a bad upgrade in one command, with no redeployment and no config archaeology.
  • Set different promotion thresholds per specialist: conservative for customer-facing models, aggressive for internal experimental ones.

Part 6 makes the whole system easier to operate, with config changes that take effect in seconds without restarting anything.

Part 5 done. Part 6 covers the config system: hot reload, validation, and config commands.

Part 6 Config system ~15 min

6.1 Config commands

bash
aua config validate           # strict schema check: catches typos, duplicate ports, bad ranges
aua config expand             # full resolved config with all defaults filled in (secrets redacted)
aua presets list              # list all built-in presets
aua presets inspect coding    # full preset config

6.2 Hot reload: no restart needed

Not all config changes take effect without restarting. The rule:

Hot-reloadable (aua config reload) | Requires restart (aua serve)
Routing thresholds | Model path or name
Utility weights | Specialist port
Logging level | Backend (vllm ↔ ollama)
CORS origins | GPU assignment
Rate limits | mTLS certificate paths
Arbiter thresholds | New specialist added/removed
Promotion delta / T_min / tau |

bash
aua config reload             # sends SIGHUP to running router
kill -HUP $(cat .aua/pids/router.pid)  # same effect

6.3 Config versioning and migration

bash
# Config versioning: the aua.version field tracks schema compatibility.
# v1.0 does not provide automatic migration; update the config manually when upgrading.
aua config validate   # validates your config against the current schema
aua config expand     # shows full resolved config with all defaults applied
What you can build with this
  • Change routing thresholds, utility weights, and CORS settings on a live system, with no downtime and no restarts.
  • Version your config in git and use aua config validate as a pre-commit hook so schema errors never reach production.
  • See exactly what the running system is doing with aua config expand, with no surprises from implicit defaults.
  • A config-driven system your whole team can modify safely, not one that lives in a single engineer's head.

Part 7 shows how every mistake the system makes in production automatically becomes training material for the next version.

Part 6 done. Part 7 covers the correction loop and DPO pair export.

Part 7 Correction loop & DPO export ~15 min

Every contradiction the Arbiter resolves produces a DPO training pair. The correction loop accumulates these pairs and exports them for fine-tuning your specialists.

7.1 Export corrections via CLI

bash
# Export all verified corrections as JSONL
aua corrections export --format jsonl

# Export as preference pairs for DPO training
aua dpo export --format preference-pairs

# With redaction (remove PII from prompts)
aua dpo export --format preference-pairs --redact

7.2 DPO pair format

v1.0 DPO pair status. In v1.0, DPO pairs are generated from corrections with a confirmed chosen answer. The rejected side is populated when the Arbiter identifies a clearly wrong response; for corrections injected manually (e.g. via POST /corrections), the rejected field is empty and must be filled before training. Case 4 (inconclusive arbiter outcomes) never produces a pair. Full chosen+rejected pair generation is a v1.1 item.

dpo_pairs_*.jsonl
{
  "query": "bubble_sort_complexity",
  "chosen": "Bubble sort is O(n²) average case.",
  "rejected": "Bubble sort is O(n) average case.",
  "field": "software_engineering",
  "utility_chosen": 0.72,
  "utility_rejected": 0.41,
  "correction_ids": ["corr_abc123"],
  "trace_id": "01HXYZ..."
}
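
Because manually injected corrections arrive with an empty rejected field, it helps to separate trainable pairs from incomplete ones before handing the file to a trainer. The sketch below does that for JSONL lines in the format shown above. split_trainable is a hypothetical helper, and the second sample record is invented for illustration.

```python
import json


def split_trainable(lines):
    """Split JSONL pair records into (ready, incomplete) by whether both sides are filled."""
    ready, incomplete = [], []
    for line in lines:
        line = line.strip()
        if not line:
            continue                      # skip blank lines in the export
        pair = json.loads(line)
        if pair.get("chosen") and pair.get("rejected"):
            ready.append(pair)
        else:
            incomplete.append(pair)       # needs a rejected answer before training
    return ready, incomplete


sample_lines = [
    '{"query": "bubble_sort_complexity", "chosen": "Bubble sort is O(n^2) average case.", '
    '"rejected": "Bubble sort is O(n) average case."}',
    '{"query": "manual_fix", "chosen": "Use a context manager.", "rejected": ""}',
]

ready, incomplete = split_trainable(sample_lines)
print(f"{len(ready)} ready for training, {len(incomplete)} missing a rejected answer")
```

To run it against a real export, pass an open file handle instead of sample_lines; iterating over a text file yields one line at a time.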

7.3 Using CorrectionLoop in Python

Python
import asyncio
from aua import CorrectionLoop
from aua.config import load_config

config = load_config("aua_config.yaml")
loop = CorrectionLoop(config, router_url="http://localhost:8000")

async def main():
    pairs = await loop.collect_pairs(min_confidence=0.8)
    print(f"Collected {len(pairs)} pairs")
    summary = loop.export_pairs(pairs, output_dir="dpo_pairs")
    print(f"Exported to: {summary.output_path}")

asyncio.run(main())

7.4 Using field penalty weights in training

Python
from aua import FIELD_CONFIGS

# `pairs` is the list returned by loop.collect_pairs() in 7.3
for pair in pairs:
    cfg = FIELD_CONFIGS.get(pair.field, FIELD_CONFIGS["general"])
    loss_weight = cfg.penalty_multiplier  # 2× for SWE, 10× for surgery
    # pass loss_weight to your DPO trainer's per-sample weight
What you can build with this
  • Every mistake your AI makes in production automatically becomes training data for the next version, without manual labelling.
  • A fine-tuning dataset built from real traffic: the things your actual users asked, and what the right answer was.
  • Domain-filtered DPO pairs so coding corrections train the coding specialist and don't pollute math training data.
  • A closed loop: production error → correction stored → DPO pair exported → model fine-tuned → mistake doesn't recur.

Part 8 adds a quality gate that catches regressions automatically before a new model ever reaches users.

Part 7 done. Part 8 covers the evaluation harness: automated quality measurement and regression detection.

Part 8 Eval harness ~20 min

The eval harness routes YAML test datasets through the live framework, scores outputs with the utility function, detects regressions, and produces structured JSON reports. It's the gate for blue-green promotion and CI.

8.1 Built-in smoke datasets

bash
ls evals/
coding_smoke.yaml   math_smoke.yaml    routing_smoke.yaml
correction_smoke.yaml   arbiter_smoke.yaml   safety_smoke.yaml

# Run the coding smoke suite
aua eval run --dataset evals/coding_smoke.yaml --config aua_config.yaml

# View the report
aua eval report .aua/evals/latest.json

# Compare blue vs green
aua eval compare --baseline blue --candidate green

8.2 Dataset format

Property checks run against the response text. Supported check types in v1.0:

Property key | Value | What it checks
contains | string | Case-insensitive substring match
contains_any | [string, ...] | At least one substring present
not_contains | string | Substring must NOT appear
min_length | int | Response character count ≥ N
expected_domain | string | Routing domain must equal this
expected_domain_any | [string, ...] | Routing domain must be one of these

Regex, LLM-judge, and custom Python validators are not supported in v1.0.
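
The six checks are simple enough to sketch in plain Python, which can be handy for dry-running a dataset before pointing it at a live router. This is an illustration, not the harness's code: check_property and run_case are hypothetical names, each expected property is assumed to be a single-key mapping, and case-insensitive matching for contains_any and not_contains is an assumption (the table only states it for contains).

```python
def check_property(key, value, response: str, domain: str) -> bool:
    """Evaluate one property check against a response and its routing domain."""
    text = response.lower()
    if key == "contains":
        return value.lower() in text
    if key == "contains_any":
        return any(v.lower() in text for v in value)
    if key == "not_contains":
        return value.lower() not in text
    if key == "min_length":
        return len(response) >= value
    if key == "expected_domain":
        return domain == value
    if key == "expected_domain_any":
        return domain in value
    raise ValueError(f"unsupported property: {key}")


def run_case(expected_properties, response: str, domain: str):
    """Return the (key, value) checks that failed; an empty list means the case passed."""
    failures = []
    for prop in expected_properties:
        for key, value in prop.items():
            if not check_property(key, value, response, domain):
                failures.append((key, value))
    return failures


response = "def binary_search(arr, x): ...  This runs in O(log n) time."
properties = [
    {"contains": "O(log n)"},
    {"contains_any": ["def binary_search", "bisect"]},
    {"not_contains": "O(n^2)"},
    {"min_length": 20},
    {"expected_domain": "software_engineering"},
]
print(run_case(properties, response, "software_engineering"))
print(run_case([{"contains": "stable"}], "Merge sort is O(n log n).", "software_engineering"))
```

Raising on unknown keys, rather than silently passing, keeps a typo in a dataset from turning into a check that can never fail.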

evals/coding_smoke.yaml
name: coding_smoke
field: software_engineering
cases:
  - id: binary_search
    prompt: "Implement binary search in Python. State time complexity."
    expected_properties:
      - contains: "O(log n)"
      - contains: "def binary_search"

  - id: bubble_sort_complexity
    prompt: "What is the average-case time complexity of bubble sort?"
    expected_properties:
      - contains: "O(n²)"

8.3 Eval report

bash
aua eval report .aua/evals/latest.json
Output
Eval run: coding_smoke  2026-05-11T14:30:22Z
Cases:     8 total · 7 passed · 1 failed
U mean:    0.638 (baseline: 0.601)  ▲ +6.2%
Regressions: 0

FAILED:
  merge_sort_stability: expected "stable" in response
  U score: 0.41 (threshold: 0.45)

8.4 CI integration

.github/workflows/eval.yml
- name: Run AUA eval
  run: |
    aua eval run \
      --dataset evals/coding_smoke.yaml \
      --config aua_config.yaml \
      --json > .aua/evals/ci_result.json
  # check exit code: 0 = pass, 1 = failure
What you can build with this
  • A quality gate that catches AI regressions the same way unit tests catch code bugs: automatically, on every model change.
  • Promote new models with a number, not a feeling: aua eval compare gives you a quantitative diff between baseline and candidate.
  • Custom eval datasets for your domain: not generic benchmarks, but the exact questions and quality criteria that matter to your users.
  • CI integration so any model change that causes a quality drop fails the pipeline before it touches anyone.

Part 9 gives you a full UI to demo all of this: a private ChatGPT-like product backed entirely by your own models.

Part 8 done. Part 9 covers the Chat UI and how to use the Framework Debugger.

Part 9 Chat UI ~15 min

AUA ships a Next.js 14 Chat UI at apps/aua_chat/. It requires Node.js 18+ and runs as a separate process from the AUA router.

9.0 Prerequisites

bash: check Node.js
node --version   # must be 18+
npm --version

# Install Node.js if missing:
brew install node          # macOS
# or download from https://nodejs.org

9.1 Starting the full stack

Package user vs. repo contributor. aua init does not scaffold a Chat UI; the UI lives in the AUA source repo. Package users launch it through the CLI; repo contributors can run the Next.js dev server directly.

Package user: CLI launch (recommended)

Open two terminals:

Terminal 1: AUA router
cd my-aua-project
aua serve --tier macbook        # Mac / Apple Silicon + Ollama
# aua serve                     # Linux / RTX 4090 + vLLM
Terminal 2: Chat UI
aua ui                          # starts on http://localhost:3001
# Or combined: aua serve --tier macbook --with-ui

Repo contributor: Next.js dev server

If you have cloned the source repo and want to edit the UI:

Terminal 2: Next.js dev server (source repo only)
cd Adaptive-Utility-Agent/apps/aua_chat
npm install          # first run only
npm run dev          # starts on http://localhost:3001

Open http://localhost:3001 and sign in with admin / aua-admin.

Local development credentials only. The default admin / aua-admin credentials are for local use. Change them via the AUA_UI_ADMIN_PASSWORD environment variable before exposing the UI beyond localhost. In production, disable the dev login and use token-based auth instead.

Note on aua serve --with-ui. This flag attempts to start the Chat UI automatically in the background. It works when npm is on your system PATH (standard Linux/Docker installs). On macOS with nvm or Homebrew, node may not be on the PATH that background processes see, causing the UI to silently fail to start. If you see no Chat UI after --with-ui, use the two-terminal approach above, which does not depend on the background process's PATH. The UI log is at .aua/logs/ui.log if you want to diagnose the background start.

9.2 Three-zone layout

Zone | Contents
Left: Session sidebar | All sessions, search, new session
Center: Chat window | Messages, streaming responses, send bar
Right: Framework Debugger | Routing decision, utility breakdown, arbiter output, latency, cost, trace link

9.3 AUA Controls drawer

Click AUA Controls (left edge of the screen) to open the configuration drawer. Change routing thresholds, utility weights, arbiter policy, corrections, blue-green status, and observability settings, all without restarting. The drawer uses aua config reload under the hood.

9.4 Chat Session API

bash: Session API (also used by the UI)
# Create a session
curl -X POST http://localhost:8000/sessions \
  -H "Authorization: Bearer $AUA_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"name": "my-coding-session"}'

# Post a message (streaming)
curl -X POST http://localhost:8000/sessions/{id}/stream \
  -H "Authorization: Bearer $AUA_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"query": "Explain quicksort"}'

# List all sessions
curl http://localhost:8000/sessions \
  -H "Authorization: Bearer $AUA_TOKEN"

9.5 SSE streaming event types

Event | When fired
route | Routing decision made: field, mode, specialists
specialist_start | Specialist call begins
chunk | Each token streamed from specialist
specialist_done | U score, latency for this specialist
arbiter_done | Verdict case, corrections stored
done | Full response + metadata
error | AUA_* error code + trace ID
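
If you consume the stream from your own client instead of the UI, you need to split it into events. The sketch below parses standard Server-Sent Events framing (event: and data: lines, blank line between events) and rebuilds the answer from chunk events. The event names come from the table above; the payload fields (field, mode, text, u_score) are assumptions made up for this example, and parse_sse is a hypothetical helper.

```python
import json


def parse_sse(stream_text: str):
    """Yield (event, payload) pairs from raw SSE text; payload is JSON-decoded when possible."""
    event, data_lines = "message", []
    for line in stream_text.splitlines():
        if line == "":                        # a blank line terminates one event
            if data_lines:
                raw = "\n".join(data_lines)
                try:
                    payload = json.loads(raw)
                except json.JSONDecodeError:
                    payload = raw
                yield event, payload
            event, data_lines = "message", []
        elif line.startswith(":"):
            continue                          # SSE comment / keep-alive
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            data_lines.append(value[1:] if value.startswith(" ") else value)


# Hand-written sample stream; payload shapes are illustrative only.
sample = (
    'event: route\ndata: {"field": "software_engineering", "mode": "single"}\n\n'
    'event: chunk\ndata: {"text": "Quick"}\n\n'
    'event: chunk\ndata: {"text": "sort"}\n\n'
    'event: done\ndata: {"u_score": 0.74}\n\n'
)

events = list(parse_sse(sample))
answer = "".join(payload["text"] for name, payload in events if name == "chunk")
print([name for name, _ in events])
print(answer)
```

For a live connection, feed the parser from the HTTP response as it arrives instead of a complete string; the framing rules stay the same.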

Framework Debugger tip: Every query in the UI shows the full routing trace: which specialist was called, intermediate U scores, whether the Arbiter fired, and a link to the OTEL trace in Jaeger or Tempo if observability is configured.

9.6 UI screenshots

Five states to know before you start. Each accordion shows a screenshot with annotations.

Screenshot 1: Empty state (three-panel layout)
📸 Screenshot: ui_empty.png
Sidebar · Chat panel · Framework Debugger (mint)

The three-panel layout on startup. Left: session sidebar (#fafaf8). Centre: chat area. Right: mint-green Framework Debugger showing "Send a message to see routing debug info". The ⚙ AUA Controls button is in the top-right corner.

Screenshot 2: Single specialist routing
📸 Screenshot: ui_single.png
Debugger showing domain, U score, confidence after a coding query

After sending "Write binary search in Python." The debugger shows: Domain (software_engineering), Mode (single), U Score (0.7xxx), Confidence, Latency, and the Classifier Output bar chart. The response appears in the chat panel with assistant formatting.

Screenshot 3: AUA Controls drawer (with VCG toggle)
📸 Screenshot: ui_controls.png
Controls drawer open showing Arbitration Mode toggle

The amber AUA Controls drawer open. At the top: the Arbitration Mode section with Pairwise / VCG segmented toggle. With a single specialist, VCG is greyed with tooltip "Requires at least 2 specialists in aua_config.yaml". Below: routing threshold sliders, Config Management reload button, and Live Config table showing backend, port, version, and active models.

Screenshot 4: VCG fanout query (indigo debugger)
📸 Screenshot: ui_vcg.png
Indigo Framework Debugger: VCG with welfare scores per specialist

With VCG enabled and 2+ specialists, a cross-domain query triggers fanout routing. The debugger shifts from mint-green to indigo. The header reads "Framework Debugger" followed by "VCG", with a purple "vcg" badge. A new "VCG Welfare Scores" section shows W_i per specialist: the winner is highlighted with a box border, bold text, and a ✓ indicator. Runners-up are shown at reduced opacity. The Utility Breakdown note reads "Winner selected by welfare maximization".

Screenshot 5: Policy active (INFO assertion bonus)
📸 Screenshot: ui_policy.png
Debugger showing U score boost from INFO assertion firing

With an active policy containing INFO assertions (e.g. AnalogyBonus), a response that uses an analogy fires the positive assertion. The U score in the debugger is higher than the base score because the E bonus is applied. The routing mode remains "single" or "vcg" depending on config. Use aua logs assertions --filter passed=true to see which assertions fired.

Adding your own screenshots: Take screenshots of your running UI at http://localhost:3001, save to docs/screenshots/ (create the folder), then replace the placeholder <div class="ss-placeholder"> blocks with <img src="../docs/screenshots/ui_empty.png" alt="AUA Chat UI empty state"> etc.

What you can build with this
  • A complete, private AI product: a chat interface backed entirely by your own models, no data leaving your environment.
  • A way to show stakeholders every routing decision in plain language: which specialist answered, what score it got, whether the Arbiter stepped in.
  • Adjust routing and config from the UI, with no terminal and no restarts.
  • Everything from Parts 1–9 in a single interface: routing, corrections, blue-green status, and U scores, all visible at once.

Part 10 is where the framework starts shaping itself to your definition of good output, and your definition becomes a curriculum.


Part 10 Policies & Assertions: Design your AI over time ~25 min

This is the most powerful section of the tutorial. By the end, you'll understand how to teach the framework what "good output" means, and how it uses that definition to block bad responses in real time, track model reputation over sessions, and automatically identify gold-standard training data for fine-tuning.

The core idea. Instead of writing a long system prompt and hoping the model follows it, you write a Policy: a versioned, portable definition of what good output looks like. The framework enforces it in real time, tracks adherence over every session, and eventually makes the defined behavior permanent through fine-tuning. Your policy becomes the model's curriculum.

10.1 The three assertion levels

Every assertion has a level that determines what happens when it fires:

Level | What it does | Effect on U score
BLOCKING | Fails → error injected back into prompt → specialist retried up to max_retries (default 3). User never sees a response that violates this. | U penalty if all retries exhausted
SOFT | Fails → logged to assertion_events, response passes through. Use for guardrails you want to track without enforcing. | No U change (logged only)
INFO | Always passes. When condition fires (returns a message), adds +bonus to the Efficacy (E) score. Use for positive/incentive assertions. | E_final = min(1.0, E_base + bonus)

10.2 Writing your first assertion

mypackage/policies.py
from aua.guard import assertion, AssertionLevel

# ── Guardrail: block syntax errors from ever reaching the user ─────────────
@assertion(name="PythonSyntaxCheck", level=AssertionLevel.BLOCKING)
def validate_python_syntax(output: str, context: dict) -> tuple[bool, str | None]:
    """Blocks output if any Python code block contains syntax errors."""
    import ast, re
    blocks = re.findall(r"```python(.*?)```", output, re.DOTALL)
    if not blocks:
        return True, None   # no code block: pass through
    for block in blocks:
        try:
            ast.parse(block)
        except SyntaxError as e:
            return False, f"Syntax error at line {e.lineno}: {e.msg}"
    return True, None

# ── Guardrail: soft-flag refusals without blocking ─────────────────────────
@assertion(name="NoAIisms", level=AssertionLevel.SOFT)
def no_ai_isms(output: str, context: dict) -> tuple[bool, str | None]:
    """Soft-flags common 'AI-isms' like 'as an AI language model'."""
    phrases = ["as an ai", "as a language model", "i cannot help with"]
    found = next((p for p in phrases if p in output.lower()), None)
    if found:
        return False, f"AI-ism detected: '{found}'"
    return True, None
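
To see what BLOCKING means in practice, here is a minimal, framework-free sketch of the retry behaviour described in 10.1: run the specialist, run the checks, and on failure feed the error back into the prompt. Everything in it is hypothetical (run_with_blocking, the stub specialist, the no_todo check), and it assumes max_retries counts retries after the first call, which the text does not spell out.

```python
from typing import Callable, List, Optional, Tuple

Check = Callable[[str, dict], Tuple[bool, Optional[str]]]


def run_with_blocking(prompt: str, specialist: Callable[[str], str],
                      checks: List[Check], max_retries: int = 3) -> Tuple[str, int, bool]:
    """Return (output, attempts_used, passed) after retrying on failed checks."""
    current_prompt = prompt
    output = ""
    attempts = 0
    for _ in range(max_retries + 1):          # one initial call plus max_retries retries
        attempts += 1
        output = specialist(current_prompt)
        results = [check(output, {}) for check in checks]
        errors = [msg or "check failed" for ok, msg in results if not ok]
        if not errors:
            return output, attempts, True
        feedback = "; ".join(errors)
        # Inject the failure message so the next attempt can correct it.
        current_prompt = f"{prompt}\n\nYour previous answer was rejected: {feedback}"
    return output, attempts, False


def no_todo(output: str, context: dict) -> Tuple[bool, Optional[str]]:
    """Hypothetical blocking check: fail if the output still contains a TODO."""
    if "TODO" in output:
        return False, "output contains TODO"
    return True, None


def fake_specialist(prompt: str) -> str:
    # Stub model: only answers properly once it sees the injected error.
    return "return x + 1" if "rejected" in prompt else "TODO: implement"


output, attempts, passed = run_with_blocking("Write add_one", fake_specialist, [no_todo])
print(output, attempts, passed)
```

When every attempt fails, the function returns passed=False; in the framework that is the point where the U penalty from the table applies.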

10.3 Positive assertions: rewarding gold-standard behaviour

Negative assertions block bad output. Positive assertions reward exceptional output, and this is what feeds the fine-tuning pipeline. Sessions where positive assertions fire get the highest U scores and are automatically selected as "chosen" in your DPO export.

mypackage/policies.py (continued)
@assertion(name="AnalogyBonus", level=AssertionLevel.INFO, bonus=0.10)
def reward_analogy(output: str, context: dict) -> tuple[bool, str | None]:
    """Rewards responses that use analogies to explain concepts."""
    phrases = ["like a", "similar to", "imagine a", "think of it as", "just like"]
    if any(p in output.lower() for p in phrases):
        return True, "Positive: analogy used for clarity"
    return True, None   # neutral: no bonus if condition not met

@assertion(name="SocraticEnding", level=AssertionLevel.INFO, bonus=0.08)
def reward_question_ending(output: str, context: dict) -> tuple[bool, str | None]:
    """Rewards responses that end with an engaging question."""
    if output.strip().endswith("?"):
        return True, "Positive: Socratic engagement"
    return True, None

@assertion(name="PythonSyntaxBonus", level=AssertionLevel.INFO, bonus=0.12)
def reward_clean_code(output: str, context: dict) -> tuple[bool, str | None]:
    """Rewards syntactically clean Python with a bonus (stacks with the syntax check)."""
    import ast, re
    blocks = re.findall(r"```python(.*?)```", output, re.DOTALL)
    if blocks:
        try:
            for b in blocks:
                ast.parse(b)
            return True, "Positive: clean executable Python"
        except SyntaxError:
            pass
    return True, None

Option B bonus cap. Each INFO assertion contributes its declared bonus independently. The sum is capped by max_total_bonus on the Policy (default 0.30), then hard-capped at 0.50. A session where all three INFO assertions above fire adds 0.10 + 0.08 + 0.12 = 0.30 to E, exactly the default cap, which is a meaningful signal that this session is gold-standard.
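
The capping arithmetic is easy to get wrong by one step, so here it is as a small sketch. It combines the two caps from this paragraph with the E_final formula from 10.1; apply_info_bonuses and HARD_CAP are hypothetical names, not framework identifiers.

```python
HARD_CAP = 0.50  # framework-wide ceiling on the total bonus, as stated in the text


def apply_info_bonuses(e_base: float, fired_bonuses: list,
                       max_total_bonus: float = 0.30) -> float:
    """Sum the bonuses of fired INFO assertions, apply both caps, and clamp E to 1.0."""
    total = min(sum(fired_bonuses), max_total_bonus, HARD_CAP)
    return min(1.0, e_base + total)


# All three INFO assertions above fire: 0.10 + 0.08 + 0.12 hits the default cap.
print(round(apply_info_bonuses(0.60, [0.10, 0.08, 0.12]), 3))
# A stricter policy cap trims the same bonuses to 0.20.
print(round(apply_info_bonuses(0.60, [0.10, 0.08, 0.12], max_total_bonus=0.20), 3))
# A generous policy cap is still limited by the hard cap of 0.50.
print(round(apply_info_bonuses(0.10, [0.30, 0.30], max_total_bonus=0.90), 3))
# E never exceeds 1.0, however large the bonus.
print(round(apply_info_bonuses(0.85, [0.10, 0.12]), 3))
```

The order matters only in the sense that every cap is applied to the bonus total before it is added to E_base; the final clamp to 1.0 is applied last.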

10.4 Bundling into a Policy

A Policy is a versioned bundle that groups assertions, sets retry limits, and optionally shifts utility weights when active. Think of it as a Django settings.py for your AI's behaviour.

Python
from aua.policy import Policy

# Bundle guardrails + incentives into one named Policy
coding_policy = Policy(
    name="SafeCoding",
    version="1.0",
    max_retries=3,          # BLOCKING retries before giving up
    max_total_bonus=0.30,   # cap on total E bonus (Option B)
    utility_overrides={
        "w_k": 0.30,        # slightly raise curiosity weight for this policy
    }
)

# Add assertions (chaining supported)
coding_policy.add(validate_python_syntax)   # BLOCKING
coding_policy.add(no_ai_isms)               # SOFT
coding_policy.add(reward_analogy)           # INFO +0.10
coding_policy.add(reward_clean_code)        # INFO +0.12

# Inspect before applying
print(coding_policy.summary())

10.5 YAML policy file (recommended for production)

policies/safe_coding.yaml
name: SafeCoding
version: "1.0"
max_retries: 3
max_total_bonus: 0.30
assertions:
  - import_path: mypackage.policies:validate_python_syntax
    # level defaults to what's declared on the @assertion decorator
  - import_path: mypackage.policies:no_ai_isms
  - import_path: mypackage.policies:reward_analogy
    bonus: 0.10          # override decorator default
  - import_path: mypackage.policies:reward_clean_code
    bonus: 0.12
utility_overrides:
  w_k: 0.30

10.6 Applying a policy via CLI

bash
# Validate schema before applying
aua policy validate policies/safe_coding.yaml
# ✓ policies/safe_coding.yaml is valid

# Preview: see what would be activated
aua policy apply policies/safe_coding.yaml --dry-run

# Activate: writes pointer to .aua/active_policy
aua policy apply policies/safe_coding.yaml
# ✓ Policy activated. Restart or hot-reload to apply.

# List all policies in policies/
aua policy list

# List registered assertions, then test a single one against sample output
aua guard list
aua guard test --import-path mypackage.policies:validate_python_syntax
aua guard test --import-path mypackage.policies:reward_analogy \
    --output "Think of it as a balanced binary tree."

10.7 The three-layer learning loop

Once a policy is active, the framework creates a feedback loop that progressively shapes model behaviour, with no manual intervention required:

Layer 1: Immediate (milliseconds). BLOCKING assertions fire on every response. If PythonSyntaxCheck fails, the error is injected back into the prompt and the specialist retries. The user only ever sees syntactically valid code.

Layer 2: Session-by-session. Every assertion result is stored in assertion_events with a timestamp. Specialists that consistently fail assertions accumulate lower mean U scores. Lower U scores mean they don't meet the blue-green promotion delta threshold, so a model that can't follow your policy doesn't advance to BLUE.

Layer 3: Calibration (on demand). Run aua calibrate --layer 3 to export sessions where all INFO assertions fired and no BLOCKING assertion exhausted retries. These are your gold-standard sessions, ready as DPO "chosen" examples for fine-tuning. After fine-tuning, the defined behaviours are baked into the model weights, and the assertions become less necessary over time.
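
The Layer 3 selection rule is a simple filter: every INFO assertion in the policy fired, and no BLOCKING assertion ran out of retries. The sketch below applies that rule to hand-written session records. The record shape (info_fired, blocking_exhausted) and the is_gold helper are assumptions for illustration; the real export reads assertion_events.

```python
def is_gold(session: dict, required_info: set) -> bool:
    """True when all required INFO assertions fired and no BLOCKING check exhausted retries."""
    fired = set(session["info_fired"])
    return required_info <= fired and not session["blocking_exhausted"]


# Hypothetical session summaries.
sessions = [
    {"id": "s1", "info_fired": ["AnalogyBonus", "PythonSyntaxBonus"], "blocking_exhausted": False},
    {"id": "s2", "info_fired": ["AnalogyBonus"], "blocking_exhausted": False},
    {"id": "s3", "info_fired": ["AnalogyBonus", "PythonSyntaxBonus"], "blocking_exhausted": True},
]

required = {"AnalogyBonus", "PythonSyntaxBonus"}   # INFO assertions in the active policy
gold_ids = [s["id"] for s in sessions if is_gold(s, required)]
print(gold_ids)
```

Session s2 misses one bonus and s3 exhausted its retries, so only s1 qualifies. Requiring all INFO assertions is strict by design: it keeps the "chosen" set small but clean.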

What you can build with this
  • Bad output blocked before users ever see it: your guardrails run on every response, automatically.
  • A system that rewards the behaviours you want: every session that meets your gold standard is automatically flagged as training data.
  • Domain-specific personalities: strict and cautious for legal queries, curious and expressive for creative ones, all from one YAML file.
  • The start of a feedback loop: every failure you define an assertion for is a failure that gets corrected, tracked, and eventually eliminated.

Part 11 shows how to close the loop: take those gold-standard sessions and turn them into the next version of your model.

Part 10 done. Part 11 covers triggering calibration cycles to export training data and analyse routing weight health.

Part 11 Calibration cycles ~15 min

The aua calibrate command surfaces the three feedback loops as explicit, triggerable operations. You choose when to run each one; the framework handles the analysis.

11.1 Layer 1: Measure current performance

bash
# Run the eval harness: same as `aua eval run` but surfaced as a calibration step
aua calibrate --layer 1 --dataset evals/coding_smoke.yaml

# Use the default dataset if it exists
aua calibrate --layer 1

11.2 Layer 2: Routing weight analysis

Layer 2 reads assertion event history and shows which domains are healthy vs. degrading, the signal that tells you which specialists need attention.

bash
aua calibrate --layer 2

# Example output:
# ┌──────────────────────────┬─────────┬───────────┬───────────┬──────────────┐
# │ Domain                   │ Queries │ Pass Rate │ Avg Bonus │ Signal       │
# ├──────────────────────────┼─────────┼───────────┼───────────┼──────────────┤
# │ software_engineering     │     312 │    91.3%  │  +0.087   │ ↑ Strong     │
# │ mathematics              │     148 │    83.1%  │  +0.041   │ → Stable     │
# │ general                  │      44 │    56.2%  │     -     │ ↓ Weak       │
# └──────────────────────────┴─────────┴───────────┴───────────┴──────────────┘
# Stagnation signal: same assertions failing week over week
# → Check: is the assertion too strict? Is the model too small?

aua calibrate --layer 2 --dry-run  # preview only
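To make the Pass Rate column concrete, here is a minimal sketch of the kind of aggregation Layer 2 implies. It assumes each assertion event is a dict with `domain` and `passed` fields (the same names the `aua logs assertions --filter` examples use); the function name and the sample events are hypothetical, and the real command may weight or window events differently.

```python
from collections import defaultdict


def pass_rate_by_domain(events: list) -> dict:
    """Group assertion events by domain and compute the share that passed."""
    totals = defaultdict(int)
    passed = defaultdict(int)
    for event in events:
        totals[event["domain"]] += 1
        if event["passed"]:
            passed[event["domain"]] += 1
    return {
        domain: {"events": count, "pass_rate": passed[domain] / count}
        for domain, count in totals.items()
    }


# Made-up events for illustration.
sample = [
    {"domain": "software_engineering", "passed": True},
    {"domain": "software_engineering", "passed": True},
    {"domain": "software_engineering", "passed": False},
    {"domain": "general", "passed": False},
]
summary = pass_rate_by_domain(sample)
print(summary)
```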

11.3 Layer 3: Export gold-standard DPO pairs

This is the calibration cycle that closes the loop. The framework identifies your best sessions, those where the model followed your policy perfectly, and exports them as DPO training pairs.

bash
# See what would be exported without writing files
aua calibrate --layer 3 --dry-run

# Example dry-run output:
# Gold-standard sessions:   47
# Failed sessions:          12
# Exportable pairs:         12
# --dry-run: would export 12 DPO pairs → dpo_pairs/calibration.jsonl

# Export when ready
aua calibrate --layer 3 --output dpo_pairs/may_calibration.jsonl

# Force export even if below min-pairs threshold
aua calibrate --layer 3 --force --output dpo_pairs/early_export.jsonl

# Fine-tune your specialist with the exported pairs:
# Axolotl:  axolotl train configs/dpo.yaml --data dpo_pairs/may_calibration.jsonl
# TRL:      trl dpo --dataset dpo_pairs/may_calibration.jsonl
# Then deploy as GREEN: curl -X POST http://localhost:8000/deploy/green

What Layer 3 does (and doesn't do). aua calibrate --layer 3 identifies gold-standard sessions and exports DPO pairs in the format your fine-tuning framework expects. It does not fine-tune models automatically: that step runs via Axolotl, TRL, or LLaMA-Factory using the exported JSONL. After fine-tuning, deploy the new model as a GREEN candidate and let blue-green handle promotion.
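For readers who have not worked with DPO data before, the sketch below shows what a JSONL file of preference pairs can look like. The exact schema AUA writes is not shown in this tutorial, so everything here is an assumption: the `prompt` / `chosen` / `rejected` keys follow the convention common DPO trainers accept, the session fields `query` and `response` are hypothetical, and pairing a failed session with a gold session for the same query is just one plausible pairing rule.

```python
import io
import json


def build_dpo_pairs(gold: list, failed: list) -> list:
    """Pair each failed session with a gold session that answered the same query."""
    chosen_by_query = {s["query"]: s["response"] for s in gold}
    pairs = []
    for session in failed:
        chosen = chosen_by_query.get(session["query"])
        if chosen is not None:
            pairs.append({
                "prompt": session["query"],
                "chosen": chosen,
                "rejected": session["response"],
            })
    return pairs


def write_jsonl(pairs: list, handle) -> None:
    """Write one JSON object per line."""
    for pair in pairs:
        handle.write(json.dumps(pair) + "\n")


# Made-up sessions for illustration.
gold = [
    {"query": "Reverse a list", "response": "xs[::-1]"},
    {"query": "Sum a list", "response": "sum(xs)"},
]
failed = [{"query": "Reverse a list", "response": "xs.reverse("}]

buffer = io.StringIO()  # stands in for an open .jsonl file
pairs = build_dpo_pairs(gold, failed)
write_jsonl(pairs, buffer)
print(buffer.getvalue())
```

Under this pairing rule the number of exportable pairs is bounded by the number of failed sessions, since a gold session with no failed counterpart has nothing to serve as the rejected side.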

What you can build with this
  • A model that gets measurably better over time, not by accident, but because you've defined what better means and built a pipeline that teaches it.
  • Training data you didn't have to label: the framework identified which sessions were gold-standard based on your policy.
  • A clear picture of which domains are healthy and which specialists need attention, before users notice.
  • The complete loop: define what good looks like → run queries → identify the best sessions → export training pairs → fine-tune → repeat.

Part 12 gives you the visibility layer, so you can actually see the improvement happening over time.

Part 11 done. Part 12 covers querying session and assertion logs and comparing metrics over time.

Part 12 Logs & metrics over time ~15 min

The assertion events store gives you a time-series view of how your policy is performing. These commands let you answer "is my AI actually getting better at following my policy?"

12.1 Viewing assertion events

bash
# All recent assertion events
aua logs assertions

# Filter to failures only: the assertions that need attention
aua logs assertions --filter passed=false

# Filter by assertion name
aua logs assertions --assertion PythonSyntaxCheck --tail 20

# Filter by domain
aua logs assertions --filter domain=software_engineering

# Export for offline analysis
aua logs assertions --json > my_assertions.json

12.2 Viewing session history

bash
# Recent sessions with U scores
aua logs sessions

# Export sessions to JSON
aua logs export --table audit_log --output sessions.json

12.3 Comparing metrics over time

This is the "is it working?" command. It compares the current window against the prior window of the same length and shows whether the key signals are moving in the right direction.

bash
# Compare last 30 days vs prior 30 days
aua metrics --compare 30d

# Example output (after a few weeks with an active policy):
# ┌─────────────────────────────┬──────────┬──────────┬──────────────────┐
# │ Metric                      │ Prior    │ Current  │ Trend            │
# ├─────────────────────────────┼──────────┼──────────┼──────────────────┤
# │ Mean U score                │  0.6213  │  0.6891  │ ↑ +0.0678        │
# │ Assertion fail rate         │  0.2341  │  0.1102  │ ↓ -0.1239        │ ← good
# │ Retry rate (BLOCKING)       │  0.1820  │  0.0890  │ ↓ -0.0930        │ ← good
# │ Avg E bonus (INFO)          │  0.0120  │  0.0654  │ ↑ +0.0534        │
# │ Total queries               │      312 │      481 │ ↑ +169           │
# └─────────────────────────────┴──────────┴──────────┴──────────────────┘
# Success signal: mean_u_score ↑, assertion_fail_rate ↓, retry_rate ↓
# Stagnation signal: same assertions failing week over week

# Focus on a single metric
aua metrics --compare 7d --metric assertion_fail_rate

# Date range
aua metrics --compare 2025-04-01:2025-05-01

# JSON output for charting
aua metrics --compare 30d --json

What success looks like. Mean U score trending up. Assertion fail rate trending down. Retry rate (BLOCKING) falling, meaning the model is learning to get it right on the first try. After a fine-tuning cycle, you may see a step-change drop in fail rate as the trained behaviour is baked into the weights.

What stagnation looks like. The same assertions failing week over week. This means either the assertion is too strict for the model's capability, or the model isn't receiving enough signal to learn. Check: is max_retries too low? Is the policy active on enough queries to accumulate data?
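The window comparison itself is simple arithmetic, and it can help to reproduce it offline from the `--json` output. The sketch below is a hypothetical helper, not part of AUA: it takes two dicts of metric values, using the metric names from the example output above, and reports the delta plus whether the move is in the desired direction.

```python
def compare_windows(prior: dict, current: dict, lower_is_better: set) -> dict:
    """Return the per-metric delta and whether it moved in the desired direction."""
    report = {}
    for name, before in prior.items():
        delta = round(current[name] - before, 4)
        improved = delta < 0 if name in lower_is_better else delta > 0
        report[name] = {"delta": delta, "improved": improved}
    return report


# Values copied from the example table above.
prior = {"mean_u_score": 0.6213, "assertion_fail_rate": 0.2341, "retry_rate": 0.1820}
current = {"mean_u_score": 0.6891, "assertion_fail_rate": 0.1102, "retry_rate": 0.0890}

report = compare_windows(
    prior, current, lower_is_better={"assertion_fail_rate", "retry_rate"}
)
print(report)
```

Stagnation would show up here as deltas close to zero for the same metrics across several consecutive windows.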

12.4 The full policy workflow in practice

Putting it all together, this is the cycle for designing and refining your AI over time:

bash: Monthly calibration cycle
# 1. Week 1-4: run queries with policy active, accumulate assertion events

# 2. Check Layer 2 health at any point
aua calibrate --layer 2

# 3. Review failures: add or refine assertions if the same things keep failing
aua logs assertions --filter passed=false --tail 50

# 4. End of month: export gold-standard sessions
aua calibrate --layer 3 --dry-run      # preview
aua calibrate --layer 3                # export to dpo_pairs/calibration.jsonl

# 5. Fine-tune your specialist on the exported pairs (external step)
# trl dpo --dataset dpo_pairs/calibration.jsonl

# 6. Deploy the fine-tuned model as GREEN
curl -X POST http://localhost:8000/deploy/green \
  -d '{"specialist": "swe", "new_model": "qwen2.5-coder:14b-finetuned"}'

# 7. Blue-green evaluates and promotes if U score delta passes threshold
aua status --once    # watch the promotion

# 8. Compare metrics to confirm improvement
aua metrics --compare 30d

# Repeat: each cycle, the model gets better at following your policy.
# After a few cycles, the assertions become less necessary because the
# defined behaviours are baked into the model weights.

What you can build with this
  • Full visibility into whether your AI is improving: assertion fail rate trending down, U score trending up, in whatever monitoring stack you use.
  • One trace_id that links a specific response to its log line, its Prometheus metrics, and its distributed trace, giving the full story of what happened.
  • Alerts before users notice: U score drops, assertion failure spikes, latency regressions, all triggerable from the same metrics.
  • A system you can hand to an ops team: ELK, Splunk, Grafana, Loki, or whatever they already use, with working configs and the right fields already in every log line.

The how-to guides cover everything you need for production: plugins, security, Docker, and full observability setup.

Part 12 done. How-to guides follow: plugins, hooks, security, observability, and Docker deployment.


How-to guides

Task-oriented guides for specific things you need to accomplish.

How-to 13 Plugin system: all interfaces ~90 min

AUA has 8 plugin interfaces and 1 middleware interface. Each one replaces a specific internal layer. No inheritance required: implement the methods, register in YAML, and the framework uses your version instead of the built-in.

The pattern is always the same: minimum viable (implement only the required method with the minimum signature), then full version (using all available parameters). Like Python's sorted(): one argument works, but key= and reverse= give you more control.

No base class, no inheritance. AUA uses Python Protocol for structural subtyping. Your class just needs the right method names and signatures. If it has them, it satisfies the interface, and the framework verifies this at load time with an isinstance() check.
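As a reminder of how structural subtyping behaves in plain Python, the sketch below defines a protocol and checks two unrelated classes against it. The protocol name `FieldClassifierProtocol` is made up for this example and is not AUA's own definition; only `typing.Protocol` and `typing.runtime_checkable` from the standard library are used.

```python
from typing import Dict, Protocol, runtime_checkable


@runtime_checkable
class FieldClassifierProtocol(Protocol):  # hypothetical name for illustration
    def classify(self, query: str) -> Dict[str, float]: ...


class KeywordClassifier:  # no base class: it only has the right method
    def classify(self, query: str) -> Dict[str, float]:
        return {"general": 1.0}


class NotAClassifier:  # has a method, but under the wrong name
    def route(self, query: str) -> str:
        return "general"


conforms = isinstance(KeywordClassifier(), FieldClassifierProtocol)
rejected = isinstance(NotAClassifier(), FieldClassifierProtocol)
print(conforms, rejected)
```

One caveat from the standard library's behaviour: an `isinstance()` check against a runtime-checkable protocol only confirms that the named methods exist. It does not compare parameter lists or return types, so a signature mismatch would need a separate check (for example with `inspect.signature`) or would surface at call time.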

The constructor contract. The config: mapping under your plugin entry is splatted as keyword arguments: the loader calls YourClass(**config). So config: {confidence_boost: 1.1} means KeywordClassifier(confidence_boost=1.1). Write def __init__(self, confidence_boost: float = 1.0), give every optional key a default, and omit defaults for keys you want to be required (a missing key then fails fast at startup). Never write def __init__(self, config: dict).
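The contract can be tried out without the framework. In the sketch below, `instantiate` is a hypothetical stand-in for the loader step described above (`YourClass(**config)`), and `NeedsApiKey` is an invented class used only to show what happens when a required key is missing.

```python
from typing import Any, Mapping, Optional


class KeywordClassifier:
    def __init__(self, confidence_boost: float = 1.0):
        self.boost = float(confidence_boost)


class NeedsApiKey:
    def __init__(self, api_key: str):  # no default, so the key is required
        self.api_key = api_key


def instantiate(cls: type, config: Optional[Mapping[str, Any]] = None) -> Any:
    """Hypothetical loader step: splat the YAML config mapping into the constructor."""
    return cls(**dict(config or {}))


boosted = instantiate(KeywordClassifier, {"confidence_boost": 1.1})
default = instantiate(KeywordClassifier)  # optional key omitted, default applies

try:
    instantiate(NeedsApiKey, {})  # required key missing
    missing_key_error = None
except TypeError as exc:
    missing_key_error = str(exc)

print(boosted.boost, default.boost, missing_key_error)
```

This is also why `def __init__(self, config: dict)` does not fit the contract: the keys arrive as separate keyword arguments, so a single `config` parameter would never be filled.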

Where the code lives, and how to know it loaded

Plugins are a normal Python package sitting next to your config. The router inserts the config file's directory onto sys.path at startup (the same trick manage.py does for a Django project), so plugins.mine:KeywordClassifier resolves with zero packaging work:

project layout
my-aua-project/
├── aua_config.yaml          # import paths resolve relative to this file's directory
└── plugins/
    ├── __init__.py          # required: plugins/ is a package
    └── mine.py              # your classes; any module name works
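The resolution step described above can be approximated with the standard library alone. The sketch below is an assumption about the mechanism, not AUA's loader: `resolve_import_path` is a hypothetical helper that puts a directory on `sys.path` and resolves a `module:Class` string with `importlib`. The demo builds a throwaway package in a temporary directory so it can run anywhere.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def resolve_import_path(import_path: str, config_dir: str) -> type:
    """Resolve 'package.module:ClassName' with `config_dir` on sys.path."""
    module_name, sep, attr = import_path.partition(":")
    if not (module_name and sep and attr):
        raise ValueError(f"expected 'module:Class', got {import_path!r}")
    if config_dir not in sys.path:
        sys.path.insert(0, config_dir)
    importlib.invalidate_caches()
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# Demo: a throwaway project directory with the same shape as the layout above.
# The package name is deliberately unusual to avoid clashing with real modules.
project = Path(tempfile.mkdtemp())
package = project / "demo_aua_plugins"
package.mkdir()
(package / "__init__.py").write_text("")
(package / "mine.py").write_text("class KeywordClassifier:\n    pass\n")

cls = resolve_import_path("demo_aua_plugins.mine:KeywordClassifier", str(project))
print(cls)
```

If the `__init__.py` file is missing or the module name is misspelled, `importlib.import_module` raises `ModuleNotFoundError`, which is the kind of failure the pre-flight commands are meant to catch early.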

The verification loop, in the order you'll actually use it:

bash: write → test → register → confirm
# 1. Pre-flight before touching config: imports + contract-validates in isolation
aua extensions test --kind field_classifier --import-path plugins.mine:KeywordClassifier

# 2. Register in aua_config.yaml (each interface below shows its block), then
aua config validate            # typo'd kind, bad hook point, malformed import path → caught here
aua serve                      # startup logs: "Plugin loaded from config: field_classifier ← plugins.mine:KeywordClassifier"

# 3. Ask the RUNNING server what it loaded (null = built-in):
curl -s localhost:8000/extensions | python3 -m json.tool

Which kinds wire from YAML: field_classifier, utility_scorer, and correction_store take effect the moment the server starts, as do all hooks: and middleware:. The remaining kinds (arbiter_policy, promotion_policy, model_backend, state_store) still load and contract-validate from config (a typo'd import path fails at startup, not in production) and attach programmatically at the points shown in their sections below.

Jump to: FieldClassifier · UtilityScorer · ArbiterPolicy · PromotionPolicy · CorrectionStore · ModelBackend · StateStore · Middleware


13.1 FieldClassifierPlugin: custom query routing logic

What it replaces: The built-in TF-IDF/embedding field classifier that decides which specialist handles each query.

When to write one: You have a proprietary taxonomy, an existing intent classifier, or domain-specific routing rules that the built-in classifier doesn't know about.

Required method: classify(query: str) → dict[str, float]

plugins/my_classifier.py: minimum viable
class KeywordClassifier:
    def classify(self, query: str) -> dict[str, float]:
        """Route based on keywords. Probabilities must sum to ≤ 1.0."""
        q = query.lower()
        if any(w in q for w in ["sort", "binary", "algorithm", "complexity"]):
            return {"software_engineering": 0.9, "mathematics": 0.1}
        if any(w in q for w in ["integral", "derivative", "proof"]):
            return {"mathematics": 0.95}
        return {"general": 1.0}    # fallback

plugins/my_classifier.py: full version (with config + confidence calibration)
class KeywordClassifier:
    def __init__(self, confidence_boost: float = 1.0):
        # YAML config keys arrive as KEYWORD ARGUMENTS: cls(**config).
        # plugins.field_classifier.config: {confidence_boost: 1.1} calls
        # KeywordClassifier(confidence_boost=1.1). Give every optional key a default.
        self.boost = float(confidence_boost)

    def classify(self, query: str) -> dict[str, float]:
        """
        query: str, the raw user query text (pre-processed, no conversation history)

        Returns: dict[field_name → probability]
          - field names must match specialist 'field' values in aua_config.yaml
          - probabilities should sum to ≤ 1.0 (remainder is implicitly 'unknown')
          - returning {} triggers arbiter fallback
        """
        q = query.lower()
        scores: dict[str, float] = {}
        if any(w in q for w in ["sort", "binary", "algorithm", "def ", "class "]):
            scores["software_engineering"] = 0.85 * self.boost
        if any(w in q for w in ["integral", "derivative", "theorem", "proof"]):
            scores["mathematics"] = 0.90 * self.boost
        # Normalise only when the boosted scores sum to more than 1.0;
        # totals at or below 1.0 are returned as-is.
        total = sum(scores.values())
        if total > 1.0:
            scores = {k: v / total for k, v in scores.items()}
        return scores or {"general": 1.0}

aua_config.yaml: register
plugins:
  field_classifier:
    import_path: plugins.my_classifier:KeywordClassifier
    config:
      confidence_boost: 1.1

13.2 UtilityScorerPlugin: custom U score

What it replaces: The built-in U = w_e·E + w_c·C + w_k·K scorer.

When to write one: You want domain-specific scoring logic, e.g. penalise hallucinations more in a medical domain, or reward shorter answers in a support context.

Required method: score(response, field, prior_u, confidence, metadata) → float

plugins/risk_scorer.py: minimum viable
class RiskWeightedScorer:
    def score(
        self,
        response: str,       # the specialist's text output
        field: str,          # field name e.g. "software_engineering"
        prior_u: float,      # running mean U for this specialist (0.0–1.0)
        confidence: float,   # Kalman-filtered confidence (0.0–1.0)
        metadata: dict,      # {"session_id", "query", "latency_ms", ...}
    ) -> float:              # return U score in [0.0, 1.0]
        return 0.5 * confidence + 0.5 * prior_u

plugins/risk_scorer.py: full version
class RiskWeightedScorer:
    def __init__(self, risk_threshold: float = 0.80, length_penalty: float = 0.0):
        self.risk_threshold = float(risk_threshold)
        self.length_penalty = float(length_penalty)

    def score(self, response: str, field: str, prior_u: float,
              confidence: float, metadata: dict) -> float:
        # metadata keys available: session_id, query, latency_ms, specialist
        # Lower efficacy when confidence is below threshold
        efficacy = prior_u
        if confidence < self.risk_threshold:
            efficacy *= 0.6

        # Penalise very long responses (optional)
        word_count = len(response.split())
        length_factor = max(0.7, 1.0 - self.length_penalty * (word_count / 1000))

        u = 0.5 * efficacy + 0.4 * confidence + 0.1 * (prior_u * length_factor)
        return round(min(1.0, max(0.0, u)), 4)

aua_config.yaml
plugins:
  utility_scorer:
    import_path: plugins.risk_scorer:RiskWeightedScorer
    config:
      risk_threshold: 0.85
      length_penalty: 0.1

13.3 ArbiterPolicyPlugin: custom contradiction arbitration

What it replaces: The built-in 4-check arbitration logic (logical, mathematical, cross-session, empirical).

When to write one: You want domain-specific arbitration, e.g. always prefer the response from the specialist with higher past U score, or use an external fact-checker API.

Required method: arbitrate(subject, domain, output_a, output_b, metadata) → dict

plugins/simple_arbiter.py: minimum viable
class LengthArbiter:
    def arbitrate(
        self,
        subject: str,          # short subject identifier e.g. "heapsort_complexity"
        domain: str,           # field name
        output_a: str,         # first specialist's full response text
        output_b: str,         # second specialist's full response text
        metadata: dict,        # {"session_id", "field_penalty_multiplier", ...}
    ) -> dict:
        # Required return keys:
        return {
            "case": "case_1",               # "case_1"|"case_2"|"case_3"|"case_4"
            "correct_a": False,             # True → store correction for specialist A
            "correct_b": False,             # True → store correction for specialist B
            "verified_claim": None,         # str if a fact was verified, else None
            "external_response": output_a,  # what the user sees
        }

plugins/simple_arbiter.py: full version with confidence-based decision
class ConfidenceArbiter:
    def __init__(self, prefer_longer: bool = False):
        self.prefer_longer = prefer_longer

    def arbitrate(self, subject: str, domain: str, output_a: str,
                  output_b: str, metadata: dict) -> dict:
        # metadata["specialist_a_confidence"] and ["specialist_b_confidence"]
        # are available when fanout routing produced both responses
        conf_a = metadata.get("specialist_a_confidence", 0.5)
        conf_b = metadata.get("specialist_b_confidence", 0.5)

        if abs(conf_a - conf_b) < 0.05:
            # Too close to call: both acceptable (case_1: no contradiction)
            winner = output_a if (not self.prefer_longer or len(output_a) >= len(output_b)) else output_b
            return {"case": "case_1", "correct_a": False, "correct_b": False,
                    "verified_claim": None, "external_response": winner}

        if conf_a > conf_b:
            # A wins, so B needs correction (case_2)
            return {"case": "case_2", "correct_a": False, "correct_b": True,
                    "verified_claim": output_a[:200], "external_response": output_a}
        else:
            # B wins, so A needs correction (case_3)
            return {"case": "case_3", "correct_a": True, "correct_b": False,
                    "verified_claim": output_b[:200], "external_response": output_b}
        # case_4 = both wrong: use "external_response" = your fallback text

aua_config.yaml
plugins:
  arbiter_policy:
    import_path: plugins.simple_arbiter:ConfidenceArbiter
    config:
      prefer_longer: true

13.4 PromotionPolicyPlugin: custom blue-green promotion logic

What it replaces: The built-in delta + T_min promotion threshold check.

When to write one: You want additional promotion criteria, e.g. minimum number of queries, assertion pass rate threshold, or a human approval step.

Required method: should_promote(specialist, blue_mean_u, green_mean_u, n_queries, metadata) → bool

plugins/strict_promoter.py: minimum viable
class StrictPromoter:
    def should_promote(
        self,
        specialist: str,       # specialist name e.g. "swe"
        blue_mean_u: float,    # BLUE model's mean U over evaluation period
        green_mean_u: float,   # GREEN candidate's mean U
        n_queries: int,        # number of evaluation queries run
        metadata: dict,        # {"delta", "T_min", "tau", "config", ...}
    ) -> bool:                 # True = promote, False = keep BLUE
        return green_mean_u > blue_mean_u + 0.05 and n_queries >= 50

plugins/strict_promoter.py: full version with assertion gate
class StrictPromoter:
    def __init__(self, min_delta: float = 0.05, min_queries: int = 50,
                 max_assertion_fail_rate: float = 0.10):
        self.min_delta = float(min_delta)
        self.min_queries = int(min_queries)
        self.max_fail_rate = float(max_assertion_fail_rate)

    def should_promote(self, specialist: str, blue_mean_u: float,
                       green_mean_u: float, n_queries: int, metadata: dict) -> bool:
        # Gate 1: minimum evaluation period
        if n_queries < self.min_queries:
            return False
        # Gate 2: meaningful U improvement
        if green_mean_u - blue_mean_u < self.min_delta:
            return False
        # Gate 3: assertion pass rate (if available in metadata)
        fail_rate = metadata.get("assertion_fail_rate", 0.0)
        if fail_rate > self.max_fail_rate:
            return False
        return True

aua_config.yaml
plugins:
  promotion_policy:
    import_path: plugins.strict_promoter:StrictPromoter
    config:
      min_delta: 0.08
      min_queries: 100
      max_assertion_fail_rate: 0.05
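Because a promotion policy is plain Python, its gates can be exercised locally before the plugin is registered. The sketch below uses a condensed copy of the StrictPromoter gates shown above with the thresholds from the YAML block; the scenario names and input numbers are made up for illustration.

```python
class StrictPromoter:
    """Condensed copy of the three gates above, for local experimentation."""

    def __init__(self, min_delta=0.05, min_queries=50, max_assertion_fail_rate=0.10):
        self.min_delta = float(min_delta)
        self.min_queries = int(min_queries)
        self.max_fail_rate = float(max_assertion_fail_rate)

    def should_promote(self, specialist, blue_mean_u, green_mean_u, n_queries, metadata):
        if n_queries < self.min_queries:                  # Gate 1: evaluation period
            return False
        if green_mean_u - blue_mean_u < self.min_delta:   # Gate 2: U improvement
            return False
        # Gate 3: assertion fail rate, treated as 0.0 when absent from metadata
        return metadata.get("assertion_fail_rate", 0.0) <= self.max_fail_rate


promoter = StrictPromoter(min_delta=0.08, min_queries=100, max_assertion_fail_rate=0.05)

# (specialist, blue_mean_u, green_mean_u, n_queries, metadata): invented inputs
scenarios = {
    "too_few_queries": ("swe", 0.62, 0.75, 60, {}),
    "delta_too_small": ("swe", 0.62, 0.66, 150, {}),
    "fail_rate_too_high": ("swe", 0.62, 0.75, 150, {"assertion_fail_rate": 0.09}),
    "passes_all_gates": ("swe", 0.62, 0.75, 150, {"assertion_fail_rate": 0.02}),
}
decisions = {name: promoter.should_promote(*args) for name, args in scenarios.items()}
print(decisions)
```

Note that a missing `assertion_fail_rate` key passes Gate 3 by default. If you would rather block promotion when that signal is unavailable, change the fallback value accordingly.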

13.5 CorrectionStorePlugin: custom correction backend

What it replaces: The built-in in-memory + SQLite AssertionsStore.

When to write one: You want corrections stored in Postgres, Redis, or a vector database, or you need multi-tenant isolation (pending v1.1 per-user scoping).

Required methods: store(), query(), export_dpo_pairs()

plugins/pg_store.py: minimum viable (in-memory for illustration)
class InMemoryCorrectionStore:
    def __init__(self):
        self._data: list[dict] = []

    def store(
        self,
        subject: str,      # what the assertion is about e.g. "heapsort_complexity"
        domain: str,       # field name
        claim: str,        # the verified fact string
        confidence: float, # 0.0–1.0 confidence at write time
    ) -> None:
        self._data.append({"subject": subject, "domain": domain,
                           "claim": claim, "confidence": confidence})

    def query(
        self,
        subject: str,           # filter by subject (partial match OK)
        domain: str | None,     # optional domain filter
    ) -> list[dict]:
        return [d for d in self._data
                if subject.lower() in d["subject"].lower()
                and (domain is None or d["domain"] == domain)]

    def export_dpo_pairs(
        self,
        domain: str | None,  # None = all domains
        limit: int,          # max pairs to return
    ) -> list[dict]:
        # Return list of {"chosen": str, "rejected": str} dicts
        pairs = [{"chosen": d["claim"], "rejected": ""} for d in self._data
                 if domain is None or d["domain"] == domain]
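        return pairs[:limit]

aua_config.yaml
plugins:
  correction_store:
    import_path: plugins.pg_store:InMemoryCorrectionStore
    # No config block here: InMemoryCorrectionStore.__init__ takes no arguments,
    # and config keys are passed to the constructor as keyword arguments.
    # A real Postgres implementation would accept something like:
    # config:
    #   connection_string_secret: POSTGRES_URL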

13.6 ModelBackendPlugin: connect any LLM serving infrastructure

Not yet wired for per-specialist dispatch (roadmap #74). The framework loads and validates your plugin class at startup (a bad import path fails fast), but the router's _call() does not yet dispatch to the plugin at query time. All traffic still goes through the built-in vLLM/Ollama HTTP path. The prebuilt plugins (OpenAI, Anthropic, etc.) are also affected.

Workaround: use a routing_strategy plugin (section 19.4) to redirect traffic to an external HTTP endpoint of your choice, or use a field_classifier plugin that maps queries to a specialist already backed by a frontier API endpoint.

What it will replace: The built-in vLLM and Ollama HTTP clients.

When to write one: You want to route to a commercial API (OpenAI, Anthropic, Cohere), an internal gateway, or any serving stack that isn't vLLM or Ollama.

Prebuilt plugins available. Seven production-ready frontier model backends are included in aua/plugins/prebuilt/, contributed from AUA-Veritas. Drop them into your config without writing any code:

Plugin            Models                        import_path
----------------  ----------------------------  --------------------------------------------------------
OpenAIBackend     GPT-4o, GPT-4o mini           aua.plugins.prebuilt.openai_backend:OpenAIBackend
AnthropicBackend  Claude Sonnet 4.5, Haiku 4.5  aua.plugins.prebuilt.anthropic_backend:AnthropicBackend
GoogleBackend     Gemini 1.5 Pro, 2.0 Flash     aua.plugins.prebuilt.google_backend:GoogleBackend
XAIBackend        Grok-2                        aua.plugins.prebuilt.xai_backend:XAIBackend
MistralBackend    Mistral Large                 aua.plugins.prebuilt.mistral_backend:MistralBackend
GroqBackend       Llama 3.3 70B                 aua.plugins.prebuilt.groq_backend:GroqBackend
DeepSeekBackend   DeepSeek-V3, DeepSeek-R1      aua.plugins.prebuilt.deepseek_backend:DeepSeekBackend

aua_config.yaml: use a prebuilt plugin (no code needed)
plugins:
  model_backend:
    import_path: aua.plugins.prebuilt.openai_backend:OpenAIBackend
    config:
      api_key_secret: OPENAI_API_KEY   # resolved from env or secrets provider
      model: gpt-4o

Required methods: complete(), stream(), health(), all async.

plugins/openai_backend.py: minimum viable
import json
import time
from collections.abc import AsyncIterator

import httpx

class OpenAIBackend:
    def __init__(self, api_key: str, model: str = "gpt-4o-mini",
                 base_url: str = "https://api.openai.com"):
        self.api_key = api_key   # required key: no default, fails fast if missing
        self.model = model
        self.base_url = base_url

    def _headers(self) -> dict:
        return {"Authorization": f"Bearer {self.api_key}"}

    async def complete(self, request: dict) -> dict:
        # request is OpenAI-compatible: {"model", "messages", "temperature", ...}
        # Return OpenAI-compatible response with choices[0].message.content
        async with httpx.AsyncClient() as client:
            r = await client.post(
                f"{self.base_url}/v1/chat/completions",
                headers=self._headers(),
                json={**request, "model": self.model},
                timeout=30.0,
            )
            r.raise_for_status()   # surface 4xx/5xx instead of returning an error body
            return r.json()

    async def stream(self, request: dict) -> AsyncIterator[str]:
        # Yield token strings as they arrive
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST", f"{self.base_url}/v1/chat/completions",
                headers=self._headers(),
                json={**request, "model": self.model, "stream": True},
                timeout=30.0,
            ) as r:
                r.raise_for_status()
                async for line in r.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    payload = line[len("data: "):]
                    if payload.strip() == "[DONE]":
                        break
                    chunk = json.loads(payload)
                    # Some chunks carry no choices or a null content field
                    choices = chunk.get("choices") or [{}]
                    token = choices[0].get("delta", {}).get("content") or ""
                    if token:
                        yield token

    async def health(self) -> dict:
        # Return {"status": "ok"|"error", "latency_ms": float}
        try:
            async with httpx.AsyncClient() as client:
                t0 = time.perf_counter()
                r = await client.get(f"{self.base_url}/v1/models",
                                     headers=self._headers(), timeout=5.0)
                r.raise_for_status()   # a 401 or 500 should not report "ok"
                return {"status": "ok", "latency_ms": (time.perf_counter() - t0) * 1000}
        except Exception as e:
            return {"status": "error", "error": str(e), "latency_ms": 0.0}

aua_config.yaml
plugins:
  model_backend:
    import_path: plugins.openai_backend:OpenAIBackend
    config:
      api_key_secret: OPENAI_API_KEY   # resolved from env at startup
      model: gpt-4o-mini
      base_url: https://api.openai.com

13.7 StateStorePlugin: custom session/state backend

Not yet wired (roadmap #75). The framework loads and validates your plugin class at startup, but all state writes (corrections, audit log, model_runs, sessions) still go to the built-in SQLite store. The init ordering constraint (BatchQueue, ShadowStore, DomainTree, and others all receive self._state_store at construction time) must be resolved before this plugin can be activated.

Workaround: for persistence across instances, configure Postgres or Redis at the OS level and point SQLite to a shared path via NFS, or use the correction_store plugin (which is wired) to redirect DPO pair storage specifically.

What it will replace: The built-in SQLite state store.

When to write one: You want sessions, corrections, and audit events stored in Postgres, Redis, or a shared datastore across multiple router instances.

Required methods: get(), set(), append(), query()

plugins/redis_store.py: minimum viable (dict for illustration)
import uuid

class DictStateStore:
    def __init__(self):
        self._store: dict[str, dict] = {}   # table:key → record
        self._lists: dict[str, list] = {}   # append-only tables

    def get(
        self,
        table: str,  # table name e.g. "sessions", "corrections", "assertion_events"
        key: str,    # record key (UUID string)
    ) -> dict | None:
        return self._store.get(f"{table}:{key}")

    def set(
        self,
        table: str,
        key: str,
        value: dict,  # arbitrary JSON-serialisable dict
    ) -> None:
        self._store[f"{table}:{key}"] = value

    def append(
        self,
        table: str,
        record: dict,  # record to append (no key supplied; one is generated below)
    ) -> str:          # return the generated record ID
        record_id = str(uuid.uuid4())
        self._lists.setdefault(table, []).append({**record, "id": record_id})
        return record_id

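    def query(
        self,
        table: str,
        filters: dict | None = None,  # {field_name: value} equality filters
        limit: int = 100,
    ) -> list[dict]:
        rows = self._lists.get(table, [])
        # A None default avoids sharing one mutable dict across calls
        for k, v in (filters or {}).items():
            rows = [r for r in rows if r.get(k) == v]
        return rows[:limit]

aua_config.yaml
plugins:
  state_store:
    import_path: plugins.redis_store:DictStateStore
    # No config block here: DictStateStore.__init__ takes no arguments,
    # and config keys are passed to the constructor as keyword arguments.
    # A real Redis implementation would accept something like:
    # config:
    #   url_secret: REDIS_URL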

13.8 AUAMiddleware: before/after every request

What it replaces: Nothing. Middleware is additive: it wraps every request/response pair.

When to write one: PII redaction, request logging, tenant routing, rate limiting, or any transformation that needs to run on every query.

Required methods: before_query(request: dict) → dict and after_response(response: dict) → dict, both async.

plugins/pii_middleware.py: minimum viable
import re

class PIIRedactionMiddleware:
    async def before_query(self, request: dict) -> dict:
        # request keys: "query", "session_id", "conversation_history", "force_domain"
        # Modify and return request dict. Raise to abort the request.
        request["query"] = re.sub(r"\b\d{3}-\d{2}-\d{4}\b", "[SSN]", request["query"])
        return request

    async def after_response(self, response: dict) -> dict:
        # response keys: "response", "u_score", "domain", "routing_mode",
        #                "confidence", "latency_ms", "contradictions_detected"
        # Return response unchanged if you don't need to modify it.
        return response
plugins/pii_middleware.py: full version with logging
import logging, re, time

log = logging.getLogger(__name__)

class PIIRedactionMiddleware:
    def __init__(self, patterns: list[str] | None = None):
        self.patterns = patterns or [r"\b\d{3}-\d{2}-\d{4}\b"]
        self._compiled = [re.compile(p) for p in self.patterns]

    async def before_query(self, request: dict) -> dict:
        original = request["query"]
        redacted = original
        for pattern in self._compiled:
            redacted = pattern.sub("[REDACTED]", redacted)
        if redacted != original:
            log.info("PII redacted in session %s", request.get("session_id"))
        request["query"] = redacted
        request["_redacted"] = redacted != original  # pass context forward
        return request

    async def after_response(self, response: dict) -> dict:
        # Optionally add middleware metadata to response
        if response.get("_redacted"):
            response["metadata"] = response.get("metadata", {})
            response["metadata"]["pii_redacted"] = True
        return response
aua_config.yaml: middleware runs in list order
middleware:                                   # TOP-LEVEL list, runs in order
  - import_path: plugins.pii_middleware:PIIRedactionMiddleware
    config:
      patterns:
        - '\b\d{3}-\d{2}-\d{4}\b'   # SSN
        - '\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'  # email (mixed case: patterns are compiled without re.IGNORECASE)
  - import_path: plugins.tenant_policy:TenantMiddleware
  - plugins.simple_logger:LogMiddleware       # bare string works when there's no config
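
Before registering redaction patterns, it can help to run them against a sample query locally. The sketch below is a trimmed, hypothetical stand-in (Redactor) for the middleware above, assuming the patterns list from YAML reaches the constructor as a keyword argument. The email pattern is written with mixed-case character classes because the patterns are compiled without re.IGNORECASE.

```python
import asyncio
import re

SSN = r"\b\d{3}-\d{2}-\d{4}\b"
EMAIL = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"


class Redactor:
    """Hypothetical stand-in: only the before_query half of the middleware."""

    def __init__(self, patterns: list[str]):
        self._compiled = [re.compile(p) for p in patterns]

    async def before_query(self, request: dict) -> dict:
        text = request["query"]
        for pattern in self._compiled:
            text = pattern.sub("[REDACTED]", text)
        # Return a copy so the caller's dict is left untouched
        return {**request, "query": text}


request = {"query": "SSN 123-45-6789, mail jo@example.com", "session_id": "s_demo"}
result = asyncio.run(Redactor([SSN, EMAIL]).before_query(request))
print(result["query"])  # SSN [REDACTED], mail [REDACTED]
```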

13.9 Testing plugins before registering

bash
# Pre-flight: import + contract-validate before touching config (run from the project dir)
aua extensions test --kind utility_scorer --import-path plugins.risk_scorer:RiskWeightedScorer
aua extensions inspect plugins.risk_scorer:RiskWeightedScorer
# ✓ Interface satisfied: UtilityScorerPlugin
# ✓ score() signature valid
# To pick up changes: restart aua serve

How-to 13 done. You now have the complete plugin reference: all 11 interfaces, non-linear utility functions, promotion policies, and multi-tenancy. How-to 14 covers hooks and middleware.


The sections below complete the plugin reference with the new interfaces added in v1.2.

Every major decision point in AUA (routing, scoring, arbitration, promotion, contradiction detection, assertion storage) has a formal Python Protocol interface you can replace by registering a class in aua_config.yaml. You never fork the source. You write a class, register it, and the router wires it at startup.

This How-to walks through all plugin types from scratch, with working examples. By the end you will have replaced the utility scorer, added a custom routing strategy, implemented a non-linear utility function, and configured multi-tenancy. Each section can be read independently if you only need one plugin type.

13.10 How plugins are loaded: the five-step lifecycle

When the router starts, it reads plugins: from aua_config.yaml, then for each entry:

  1. Parses import_path as module.path:ClassName
  2. Adds your project root to sys.path (so plugins.scoring:MyScorer resolves to {project}/plugins/scoring.py)
  3. Imports the module and instantiates the class, passing config: dict as kwargs
  4. Validates the instance against its Protocol using isinstance(), so a mismatch fails fast at startup, not silently at query time
  5. Wires the plugin into the relevant decision point (e.g. replaces self._custom_scorer)
Minimal plugin registration: aua_config.yaml
plugins:
  utility_scorer:                         # which slot
    import_path: plugins.scoring:MyScorer # module:ClassName
    config:                               # passed as **kwargs to __init__
      risk_weight: 0.7

Structural typing: no inheritance needed. AUA uses Python Protocols. Your class does not import or extend anything from AUA. It just needs the right method name and signature. This means you can wrap any existing class (an sklearn model, a FastAPI endpoint, a numpy function) as a plugin with a thin adapter.
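
The lifecycle above can be approximated in a few lines of standard-library Python. This is a sketch of the general technique, not AUA's actual loader: load_plugin, ScorerLike, and the demo_plugins module are hypothetical names, and the sys.path step is skipped by registering a throwaway module directly. One detail worth knowing is that isinstance() against a runtime_checkable Protocol only checks that the methods exist; it does not compare signatures.

```python
import importlib
import sys
import types
from typing import Protocol, runtime_checkable


@runtime_checkable
class ScorerLike(Protocol):
    """Hypothetical stand-in for a plugin Protocol."""

    def score(self, response, field, prior_u, confidence, metadata) -> float: ...


def load_plugin(import_path: str, config: dict, protocol: type):
    """Parse 'module.path:ClassName', import, instantiate, and validate."""
    module_path, sep, class_name = import_path.partition(":")
    if not (module_path and sep and class_name):
        raise ValueError(f"expected 'module.path:ClassName', got {import_path!r}")
    module = importlib.import_module(module_path)
    instance = getattr(module, class_name)(**config)
    # Presence check only: method signatures are not compared at runtime
    if not isinstance(instance, protocol):
        raise TypeError(f"{class_name} does not satisfy {protocol.__name__}")
    return instance


# Demo: register a throwaway module so the example runs without a project dir
class MyScorer:
    def __init__(self, risk_weight: float = 0.5):
        self.risk_weight = risk_weight

    def score(self, response, field, prior_u, confidence, metadata) -> float:
        return prior_u * (1.0 - self.risk_weight * (1.0 - confidence))


demo_module = types.ModuleType("demo_plugins")
demo_module.MyScorer = MyScorer
demo_module.NoScore = type("NoScore", (), {})  # lacks score(): should be rejected
sys.modules["demo_plugins"] = demo_module

scorer = load_plugin("demo_plugins:MyScorer", {"risk_weight": 0.7}, ScorerLike)
```

Loading demo_plugins:NoScore with the same Protocol raises TypeError at load time, which is the fail-fast behaviour step 4 describes.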

13.11 Plugin types at a glance

YAML key | Protocol class | What it replaces | Required method | Fallback on error
field_classifier | FieldClassifierPlugin | Domain classifier | classify(query) → dict[str,float] | Built-in classifier
utility_scorer | UtilityScorerPlugin | Final U score (adjustment) | score(response, field, prior_u, confidence, metadata) → float | Built-in U
full_utility_scorer | FullUtilityScorerPlugin | Entire U computation | score_full(field, efficacy, confidence, curiosity, weights, metadata) → float | Falls back to score() then built-in
contradiction_detector | ContradictionDetectorPlugin | Built-in code checker | check(problem, solution, claimed_complexity=None) → dict | Built-in detector
assertion_store | AssertionStorePlugin | In-memory AssertionsStore | add(), query(), query_contradictions() | none
routing_strategy | RoutingStrategyPlugin | Post-classifier distribution | route(query, distribution, metadata) → dict[str,float] | Classifier output
scoring_component | ScoringComponentPlugin | One sub-score (E, C, or K) | compute(component, value, field, metadata) → float | Built-in sub-score
arbiter_policy | ArbiterPolicyPlugin | LLM arbitration call | arbitrate(subject, domain, output_a, output_b, metadata) → dict | Built-in LLM arbiter
promotion_policy | PromotionPolicyPlugin | Promotion gate (simple) | should_promote(specialist, blue_mean_u, green_mean_u, n_queries, metadata) → bool | Built-in threshold
full_promotion_policy | FullPromotionPolicyPlugin | Promotion gate (full context) | should_promote_full(context: dict) → bool | Falls back to should_promote()
correction_store | CorrectionStorePlugin | DPO pair / correction storage | store(), query(), export_dpo_pairs() | none

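Plugins fail safe. If your plugin raises an unhandled exception at query time, AUA logs it at DEBUG level and falls back to the built-in implementation listed in the table above, so production traffic is not blocked by a plugin bug and you can fix and redeploy at your own pace. The table lists no fallback for the two store slots (assertion_store, correction_store), so handle errors inside those plugins yourself.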

13.12 Field classifier plugin: route queries differently

The built-in classifier uses a keyword/embedding model trained on the 11 built-in fields. Replace it when you have tenant-specific routing rules, a proprietary domain taxonomy, or an external classification service.

plugins/routing.py
class TenantAwareClassifier:
    """
    Route based on tenant context first, then keyword fallback.
    Reads the tenant ID set by TenantPolicyMiddleware.
    """
    def __init__(self, tenant_overrides: dict):
        self.overrides = tenant_overrides   # e.g. {"tenant-finance": "mathematics"}

    def classify(self, query: str) -> dict[str, float]:
        from aua.tenancy import get_tenant_id
        tenant = get_tenant_id()
        if tenant and tenant in self.overrides:
            return {self.overrides[tenant]: 1.0}
        if any(w in query.lower() for w in ["integral", "derivative", "proof"]):
            return {"mathematics": 0.92, "software_engineering": 0.05}
        return {"software_engineering": 0.85, "mathematics": 0.10}
aua_config.yaml
plugins:
  field_classifier:
    import_path: plugins.routing:TenantAwareClassifier
    config:
      tenant_overrides:
        tenant-finance: mathematics
        tenant-devtools: software_engineering

Your classify() receives the raw query string and must return a dict mapping field names to probabilities (0.0–1.0). Probabilities should sum to ≤ 1.0 (the remainder is treated as "unknown" and routes to the arbiter). Returning {"software_engineering": 1.0} forces single-specialist routing to the swe specialist.
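
A small check like the one below can catch a malformed distribution in your own unit tests before the plugin is registered. It is a hypothetical helper (validate_distribution is not an AUA function) that only encodes the contract stated above: each probability in 0.0–1.0, total at most 1.0, remainder counted as "unknown".

```python
import math


def validate_distribution(dist: dict[str, float]) -> float:
    """Return the 'unknown' remainder, or raise ValueError if the contract is broken."""
    for field, p in dist.items():
        if not isinstance(field, str):
            raise ValueError(f"field name must be a string, got {field!r}")
        if not 0.0 <= p <= 1.0:
            raise ValueError(f"probability for {field!r} out of range: {p}")
    total = math.fsum(dist.values())
    if total > 1.0 + 1e-9:  # small tolerance for float rounding
        raise ValueError(f"probabilities sum to {total}, expected at most 1.0")
    return max(0.0, 1.0 - total)


remainder = validate_distribution({"mathematics": 0.92, "software_engineering": 0.05})
print(round(remainder, 2))  # 0.03
```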

13.13 Routing strategy plugin: intercept after classification

A routing_strategy plugin sits between the classifier and the routing threshold decision. It receives the probability distribution and can reorder, cap, or override it. This runs after the classifier and before the single/fanout/arbiter threshold check, so the router's mode decision uses your adjusted distribution.

plugins/routing.py: deterministic domain lock for certain query prefixes
class PrefixRouter:
    """
    Queries starting with [math] always go to the mathematics specialist.
    Useful for power users who know which specialist they want.
    """
    def route(self, query: str, distribution: dict, metadata: dict) -> dict:
        q = query.strip()
        if q.startswith("[math]"):
            return {"mathematics": 1.0}
        if q.startswith("[code]"):
            return {"software_engineering": 1.0}
        return distribution   # pass through unchanged
aua_config.yaml
plugins:
  routing_strategy:
    import_path: plugins.routing:PrefixRouter

The metadata dict contains session_id. When force_domain is set on a request, the routing strategy is skipped: explicit overrides always win.

13.14 Contradiction detector plugin: custom validation

The built-in detector catches syntax errors, logical contradictions, and cross-session conflicts in code generation. Replace it when you need domain-specific validation, such as checking medical claims against a drug database, verifying proofs with SymPy, or using a fine-tuned classifier.

Return contract: what your check() must return
def check(self, problem: str, solution: str,
          claimed_complexity: str | None = None) -> dict:
    return {
        "contradictions": [
            {
                "type": "domain_specific",         # any string label
                "description": "Claim contradicts known literature",
                "severity": 0.8,                  # 0.0 to 1.0
            }
        ],
        "confidence_penalty": 0.3,   # total deducted from confidence score
        "is_clean": False,           # True when contradictions list is empty
    }

Tip: use the built-in empirical module inside your plugin. from aua.empirical import empirical_check gives you the same SymPy / arXiv / PubMed cross-check the Arbiter Stage 4 uses. Call it from your detector to get external ground-truth verification at the per-response level.

aua_config.yaml
plugins:
  contradiction_detector:
    import_path: plugins.validation:MyDomainDetector

13.15 Assertion store plugin: persistent claim memory

The built-in AssertionsStore is in-memory and resets on restart. Replace it when you want verified claims to survive restarts, be shared across instances, or be queryable from outside AUA, for example via a Postgres database that a reporting tool can also read.

plugins/stores.py: minimal Postgres assertion store
import psycopg2

class PostgresAssertionStore:
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)

    def add(self, subject, domain, claim, confidence,
            source="arbiter", evidence_summary=""):
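        # "with self.conn" commits on success and rolls back on error, so a
        # failed INSERT does not leave the connection in an aborted transaction.
        # ON CONFLICT requires a unique constraint on (subject, domain).
        with self.conn, self.conn.cursor() as cur:
            cur.execute(
                "INSERT INTO assertions (subject, domain, claim, confidence, source)"
                " VALUES (%s,%s,%s,%s,%s) ON CONFLICT (subject,domain) DO UPDATE"
                " SET claim=EXCLUDED.claim, confidence=EXCLUDED.confidence",
                (subject, domain, claim, confidence, source)
            )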

    def query(self, subject, domain=None, min_confidence=None):
        ...  # SELECT WHERE subject ILIKE %s AND confidence >= %s

    def query_contradictions(self, subject, new_claim, domain=None):
        ...  # compare new_claim against stored claims

assertion_store vs correction_store. These are two different stores. assertion_store holds knowledge-level verified claims (what the arbiter learned is true about "bubble sort complexity"). correction_store holds DPO training pairs (which response was better and why). They are independent plugin slots with different interfaces.

13.16 Scoring component plugin: adjust one sub-score

The utility function is U = w_e·E + w_c·C + w_k·K. A scoring_component plugin intercepts one component (E, C, or K) after the built-in pipeline computes it, applies your adjustment, then the weighted sum is recomputed with your adjusted value. This is right when you want to bias one dimension without touching the overall architecture.

plugins/scoring.py: penalise short responses via efficacy
class LengthAwareEfficacy:
    """
    Responses shorter than min_chars get an efficacy penalty.
    Encourages the specialist to give complete answers.
    """
    def __init__(self, min_chars: int = 200):
        self.min_chars = min_chars

    def compute(self, component: str, value: float,
                field: str, metadata: dict) -> float:
        if component != "efficacy":
            return value   # pass C and K through unchanged
        response = metadata.get("response", "")
        if len(response) < self.min_chars:
            shortfall = 1.0 - len(response) / self.min_chars
            return max(0.0, value * (1.0 - 0.4 * shortfall))
        return value
aua_config.yaml
plugins:
  scoring_component:
    import_path: plugins.scoring:LengthAwareEfficacy
    config:
      min_chars: 300

The metadata dict available inside compute() contains: query, response, pass_rate, and contradiction_penalty.
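
To make the effect concrete, the arithmetic below applies the same penalty rule as LengthAwareEfficacy to one example and recomputes the weighted sum. The weights and sub-scores are hypothetical numbers chosen for the illustration, not defaults from any AUA field config.

```python
def length_adjusted(value: float, response_len: int, min_chars: int) -> float:
    """Same penalty rule as LengthAwareEfficacy.compute() for the efficacy component."""
    if response_len >= min_chars:
        return value
    shortfall = 1.0 - response_len / min_chars
    return max(0.0, value * (1.0 - 0.4 * shortfall))


weights = {"w_e": 0.5, "w_c": 0.3, "w_k": 0.2}   # hypothetical weights
E, C, K = 0.8, 0.7, 0.4                          # hypothetical sub-scores


def linear_u(e: float, c: float, k: float) -> float:
    return weights["w_e"] * e + weights["w_c"] * c + weights["w_k"] * k


# 150 of 300 chars: shortfall 0.5, so efficacy is multiplied by 1 - 0.4 * 0.5 = 0.8
E_adj = length_adjusted(E, response_len=150, min_chars=300)

u_before = linear_u(E, C, K)
u_after = linear_u(E_adj, C, K)
print(round(E_adj, 2), round(u_before, 2), round(u_after, 2))  # 0.64 0.69 0.61
```

A response half the minimum length costs 0.16 of efficacy here, which moves U by 0.08 because w_e is 0.5.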

13.17 Utility scorer: adjustment mode vs full replacement

There are two ways to customise the utility score. Understanding the difference is important before choosing one.

Adjustment mode (utility_scorer): the built-in pipeline runs first, produces a U score, and passes it to your plugin as prior_u. Your plugin returns a scalar. Use this when you want to post-process the built-in score: multiplying by a risk factor, capping on confidence, or blending with an external signal.

Full replacement mode (full_utility_scorer): the built-in w_e·E + w_c·C + w_k·K step is skipped entirely. Your plugin receives the raw E, C, K components and can compute any function of them. Use this when the linear form is wrong for your domain.

Why the built-in is linear: Axiom A5

The linear form is not arbitrary. It follows necessarily from five axioms proved in Appendix B.1 of the whitepaper. The load-bearing axiom is A5 (linear scaling): if you scale all three inputs by the same factor λ, utility scales by the same factor. A5 forces the component functions to be linear. It is what makes the additive representation theorem (Theorem B.1) work.

score_full() deliberately bypasses A5. You lose the theoretical guarantee of Theorem B.1 but gain the freedom to express any utility model. Non-linear models can outperform the linear form empirically even without the formal proof; the Appendix B.1 Remark on A5 as the "load-bearing axiom" is exactly about this tradeoff.

What score_full() receives

Argument | Type | What it is
field | str | Domain name: "surgery", "mathematics", "software_engineering" …
efficacy | float 0–1 | E_ema: EMA-accumulated efficacy (α=0.2); tracks how well the specialist performs over time
confidence | float 0–1 | Kalman-filtered confidence after contradiction penalty has been applied
curiosity | float 0–1 | K_effective = K_base + gap_bonus from Arbiter Case 3
weights | dict | {"w_e": float, "w_c": float, "w_k": float} from this field's config
metadata | dict | query, response, pass_rate, task_score (the full built-in TaskScore object)

Non-linear utility functions you can implement

Quadratic: safety-critical domains where low confidence tanks the score
def score_full(self, field, efficacy, confidence, curiosity, weights, metadata):
    if field == "surgery":
        # C=0.9 → multiplier 0.81; C=0.5 → multiplier 0.25
        return min(1.0, efficacy * (confidence ** 2) * (1 + 0.1 * curiosity))
    # all other fields: standard linear
    return (weights["w_e"]*efficacy + weights["w_c"]*confidence + weights["w_k"]*curiosity)
Multiplicative: all three dimensions must be high; one weak component collapses U
def score_full(self, field, efficacy, confidence, curiosity, weights, metadata):
    return min(1.0, efficacy * confidence * (1 + weights["w_k"] * curiosity))
Cobb-Douglas: diminishing returns; common in welfare economics
def score_full(self, field, efficacy, confidence, curiosity, weights, metadata):
    e = max(0.001, efficacy)
    c = max(0.001, confidence)
    k = max(0.001, curiosity)
    # w_e + w_c + w_k = 1.0 so this is a proper weighted geometric mean
    return min(1.0, (e ** weights["w_e"]) * (c ** weights["w_c"]) * (k ** weights["w_k"]))
Rawlsian min: score equals the weakest component (pessimistic welfare)
def score_full(self, field, efficacy, confidence, curiosity, weights, metadata):
    return min(efficacy, confidence)   # curiosity is excluded from the minimum
Threshold gate: confidence must exceed c_min or U near-zeroes
from aua.config import FIELD_CONFIGS

def score_full(self, field, efficacy, confidence, curiosity, weights, metadata):
    c_min = FIELD_CONFIGS.get(field, FIELD_CONFIGS["general"]).c_min
    if confidence < c_min:
        return confidence * 0.1   # near-zero: this answer should not reach the user
    return (weights["w_e"]*efficacy + weights["w_c"]*confidence + weights["w_k"]*curiosity)
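
To compare how these forms behave, the script below evaluates four of them and the linear baseline on a single input with middling confidence. The inputs and weights are hypothetical; the threshold gate is left out because its c_min comes from aua.config, which this standalone sketch does not import.

```python
weights = {"w_e": 0.5, "w_c": 0.3, "w_k": 0.2}   # hypothetical, sums to 1.0
E, C, K = 0.8, 0.5, 0.3                          # hypothetical sub-scores


def linear(e, c, k, w):
    return w["w_e"] * e + w["w_c"] * c + w["w_k"] * k


def quadratic(e, c, k, w):
    return min(1.0, e * (c ** 2) * (1 + 0.1 * k))


def multiplicative(e, c, k, w):
    return min(1.0, e * c * (1 + w["w_k"] * k))


def cobb_douglas(e, c, k, w):
    e, c, k = max(0.001, e), max(0.001, c), max(0.001, k)
    return min(1.0, (e ** w["w_e"]) * (c ** w["w_c"]) * (k ** w["w_k"]))


def rawlsian(e, c, k, w):
    return min(e, c)  # curiosity excluded, as in the snippet above


results = {
    fn.__name__: fn(E, C, K, weights)
    for fn in (linear, quadratic, multiplicative, cobb_douglas, rawlsian)
}
for name, u in results.items():
    print(f"{name:>14}: {u:.3f}")
```

With C = 0.5 the quadratic form scores far below the linear baseline (about 0.21 against 0.61), which is the behaviour the safety-critical use case wants.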
Full class implementing both score() and score_full()
class SurgeryAwareScorer:
    def score(self, response, field, prior_u, confidence, metadata) -> float:
        return prior_u   # fallback: return built-in unchanged

    def score_full(self, field, efficacy, confidence, curiosity,
                   weights, metadata) -> float:
        if field == "surgery":
            return min(1.0, efficacy * (confidence ** 2))
        return (weights["w_e"]*efficacy
                + weights["w_c"]*confidence
                + weights["w_k"]*curiosity)
aua_config.yaml
plugins:
  full_utility_scorer:
    import_path: plugins.scoring:SurgeryAwareScorer

Fallback chain. If score_full() raises, AUA falls back to score(). If that raises, it uses the built-in linear U. Always implement score() returning prior_u unchanged as a safety net: it costs you nothing and guarantees traffic is never blocked by a scoring bug.
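
The fallback order described above can be pictured as two nested try blocks. This is a sketch of the pattern, not AUA's internal code: compute_u and the logger name are hypothetical, and it assumes the built-in linear U is what gets passed to score() as prior_u when score_full() has failed.

```python
import logging

log = logging.getLogger("aua.sketch.scoring")


def compute_u(plugin, *, response, field, efficacy, confidence,
              curiosity, weights, metadata) -> float:
    """Try score_full(), then score(), then the built-in linear form."""
    builtin_u = (weights["w_e"] * efficacy
                 + weights["w_c"] * confidence
                 + weights["w_k"] * curiosity)
    try:
        return plugin.score_full(field, efficacy, confidence,
                                 curiosity, weights, metadata)
    except Exception:
        log.debug("score_full() failed; trying score()", exc_info=True)
    try:
        return plugin.score(response, field, builtin_u, confidence, metadata)
    except Exception:
        log.debug("score() failed; using built-in U", exc_info=True)
    return builtin_u


class FlakyScorer:
    """score_full() always fails, score() passes prior_u through."""

    def score_full(self, field, efficacy, confidence, curiosity, weights, metadata):
        raise RuntimeError("simulated bug")

    def score(self, response, field, prior_u, confidence, metadata):
        return prior_u


u = compute_u(
    FlakyScorer(), response="...", field="general",
    efficacy=0.8, confidence=0.5, curiosity=0.3,
    weights={"w_e": 0.5, "w_c": 0.3, "w_k": 0.2}, metadata={},
)
```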

13.18 Arbiter policy plugin: replace the LLM arbitration call

When two specialists disagree (fanout routing), AUA calls the arbiter LLM to decide the winner. Replace this with a deterministic rule, a faster heuristic, or a domain-specific logic layer to cut latency or use stronger domain knowledge.

Return contract: what arbitrate() must return
def arbitrate(self, subject: str, domain: str,
              output_a: str, output_b: str, metadata: dict) -> dict:
    return {
        "winner":            "A",          # "A" | "B" | "BOTH_WRONG"
        "reason":            "A includes the time complexity proof",
        "external_response": "",           # shown to user only when BOTH_WRONG
        "case":              "case_1",     # optional metadata for DPO labelling
    }

metadata contains: domain_a, domain_b, specialist_a, specialist_b.

plugins/arbitration.py: response-length heuristic (no LLM, no latency)
class LengthHeuristicArbiter:
    """
    Pick the more detailed response. Fast, deterministic, zero-cost.
    Good baseline before you have enough DPO data to train a judge.
    """
    def arbitrate(self, subject, domain, output_a, output_b, metadata):
        winner = "A" if len(output_a) >= len(output_b) else "B"
        return {
            "winner":  winner,
            "reason":  f"{winner} is more complete ({len(output_a)} vs {len(output_b)} chars)",
            "external_response": "",
        }
aua_config.yaml
plugins:
  arbiter_policy:
    import_path: plugins.arbitration:LengthHeuristicArbiter

13.19 Promotion policy plugin: any function of any signal

The default promotion gate is green_u - blue_u >= threshold, a simple scalar comparison. Replace it when you need multi-factor gating, confidence intervals, minimum sample sizes, or non-linear combinations of the available signals.

Two modes, following the same pattern as the utility scorer:

Simple mode (promotion_policy): receives pre-computed scalars. Good for straightforward rules.

Full context mode (full_promotion_policy): receives the complete promotion context dict including raw shadow score rows, std dev of the delta distribution, regression results, and the full config objects. Use this for anything statistical.

Full context dict: every key explained

Key | Type | What it is
specialist | str | Specialist name
blue_u / green_u | float | Mean U for BLUE (current production) and GREEN (candidate)
u_delta | float | green_u - blue_u
mean_delta | float | Mean U_delta across all shadow queries (real traffic)
n_queries | int | Number of shadow/eval queries used
min_queries | int | shadow_min_queries from config
threshold | float | delta from blue_green config
shadow_scores | list[dict] | Raw rows from ShadowStore; each has blue_u, green_u, u_delta, domain
shadow_std_delta | float | Std dev of U_delta across shadow queries; 0.0 when n < 2
regression_result | dict or None | Regression gate output: regressed, delta_pass_rate, delta_u_score
dry | bool | True when no green_endpoint was available (dry-run scores only)
source | str | "shadow (N queries)", "synthetic eval", or "dry-run"
bg_config | BlueGreenFieldConfig | Full blue_green config for this specialist
plugins/promotion.py: three non-linear promotion functions
class CIGatePromoter:
    """Promote only when mean_delta exceeds 2 standard deviations of the per-query delta (a strict gate)."""
    def should_promote(self, specialist, blue_mean_u, green_mean_u, n_queries, metadata):
        return False  # fallback if should_promote_full raises: fail closed, do not promote

    def should_promote_full(self, context: dict) -> bool:
        std = context["shadow_std_delta"]
        mean = context["mean_delta"]
        if std == 0:
            return mean > 0
        return mean > 2 * std


class AdaptiveThresholdPromoter:
    """Require a larger delta when we have fewer queries: conservative early, liberal later."""
    def should_promote(self, *a, **kw): return False
    def should_promote_full(self, context: dict) -> bool:
        n = context["n_queries"]
        adaptive = context["threshold"] + 0.5 / max(n, 1)
        return context["mean_delta"] >= adaptive


class MultiFactorGate:
    """All three conditions must pass: no regression, enough queries, positive delta."""
    def should_promote(self, *a, **kw): return False
    def should_promote_full(self, context: dict) -> bool:
        if context.get("regression_result") and context["regression_result"].get("regressed"):
            return False
        if context["n_queries"] < context["min_queries"]:
            return False
        return context["mean_delta"] >= context["threshold"]
aua_config.yaml
plugins:
  full_promotion_policy:
    import_path: plugins.promotion:MultiFactorGate
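
For a gate closer to a conventional significance test, the standard error of the mean (std divided by the square root of n) can replace the raw standard deviation. The sketch below is an illustration under assumptions: StandardErrorGate and summarise are hypothetical names, and summarise uses the sample standard deviation, which may differ from how AUA computes shadow_std_delta.

```python
import math
import statistics


def summarise(shadow_scores: list[dict]) -> dict:
    """Build the three context keys the gate needs from raw shadow rows."""
    deltas = [row["u_delta"] for row in shadow_scores]
    n = len(deltas)
    return {
        "n_queries": n,
        "mean_delta": statistics.fmean(deltas) if n else 0.0,
        "shadow_std_delta": statistics.stdev(deltas) if n >= 2 else 0.0,
    }


class StandardErrorGate:
    """Hypothetical gate: promote when mean_delta exceeds 2 standard errors."""

    def should_promote(self, *args, **kwargs) -> bool:
        return False  # fail closed if should_promote_full raises

    def should_promote_full(self, context: dict) -> bool:
        n = context["n_queries"]
        if n < 2:
            return False  # no spread estimate from a single query
        std_err = context["shadow_std_delta"] / math.sqrt(n)
        return context["mean_delta"] > 2 * std_err


steady = summarise([{"u_delta": d} for d in (0.05, 0.07, 0.06, 0.04)])
noisy = summarise([{"u_delta": d} for d in (0.3, -0.3, 0.2, -0.2)])
gate = StandardErrorGate()
print(gate.should_promote_full(steady), gate.should_promote_full(noisy))  # True False
```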

13.20 Multi-tenancy: isolated namespaces per tenant

AUA supports full per-tenant isolation: separate rate limits, field allowlists, model bindings, and namespaced writes across all four persistent tables (corrections, promotions, audit_log, model_runs each carry a tenant_id column). Isolation is enforced at the database level: one tenant's queries cannot interfere with another's corrections or promotion history.

aua_config.yaml: full multi-tenant configuration
middleware:
  - import_path: aua.middleware:TenantPolicyMiddleware
    config:
      reject_unknown: true     # 403 for any X-Tenant-ID not listed below
      tenants:
        tenant-a:
          allowed_fields: [software_engineering, mathematics]
          rate_limit_rpm: 60       # requests per minute for this tenant
          model_binding: swe       # force ALL queries to the swe specialist
        tenant-b:
          allowed_fields: [law, software_engineering]
          rate_limit_rpm: 120
          model_binding: null      # normal routing for tenant-b

Clients pass their tenant ID in the X-Tenant-ID HTTP header. The middleware runs before any routing, so the tenant's allowlist and rate limit are enforced before the query reaches the field classifier.

Reading tenant context inside your own plugin
from aua.tenancy import get_tenant_id

class TenantScoringComponent:
    def compute(self, component: str, value: float, field: str, metadata: dict) -> float:
        tenant = get_tenant_id()   # None for anonymous / no middleware
        if tenant == "tenant-premium" and component == "curiosity":
            return min(1.0, value * 1.3)   # premium tenants get +30% curiosity
        return value
Querying tenant-scoped data
# Only tenant-a's corrections
rows = store.query("corrections", filters={"tenant_id": "tenant-a"})

# All anonymous queries (no tenant header)
rows = store.query("model_runs", filters={"tenant_id": None})

How-to 14 Hooks & middleware ~25 min

Hooks fire at 11 named points in the request pipeline. Each hook receives an event dict and returns a (possibly modified) event dict. Register one hook class per point, or one class for multiple points.

All 11 hook points are live in v1.1. Hooks are fail-open by default: if a hook errors or times out, the pipeline continues. Set fail_closed: true in YAML to abort the request on hook failure.
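
The fail-open and fail-closed behaviours can be sketched with asyncio.wait_for. This is an illustration of the pattern, not AUA's dispatcher: run_hook, the timeout_s parameter and its 2-second default are hypothetical.

```python
import asyncio
import logging

log = logging.getLogger("aua.sketch.hooks")


async def run_hook(hook, event: dict, *, fail_closed: bool = False,
                   timeout_s: float = 2.0) -> dict:
    """Call one hook; on error or timeout either re-raise or keep the original event."""
    try:
        # Pass a copy so a hook that fails midway cannot leave a half-edited event
        result = await asyncio.wait_for(hook(dict(event)), timeout=timeout_s)
        return result if isinstance(result, dict) else event
    except Exception:
        if fail_closed:
            raise
        log.exception("hook %r failed; continuing (fail-open)", hook)
        return event


class TagHook:
    async def __call__(self, event: dict) -> dict:
        event["seen"] = True
        return event


class BrokenHook:
    async def __call__(self, event: dict) -> dict:
        raise RuntimeError("simulated hook bug")


event = {"type": "pre_query", "session_id": "s_demo", "trace_id": "t_demo"}
tagged = asyncio.run(run_hook(TagHook(), event))
survived = asyncio.run(run_hook(BrokenHook(), event))  # fail-open: event unchanged
```

With fail_closed=True the same BrokenHook call raises RuntimeError to the caller instead of returning the event.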

14.1 The minimal hook

plugins/my_hook.py: minimum viable
class MyHook:
    async def __call__(self, event: dict) -> dict:
        # event always has: "type" (hook point name), "session_id", "trace_id"
        # plus hook-specific fields documented below
        print(f"Hook fired: {event['type']}")
        return event  # always return the event dict (modified or unchanged)
aua_config.yaml: register
hooks:                                      # a LIST, one entry per registration
  - hook_point: on_correction
    import_path: plugins.my_hook:MyHook
    fail_closed: false   # fail-open: log error, continue (default)
    config: {}           # optional; splatted as constructor kwargs

14.2 All 11 hook points: event fields

Each hook receives a specific event dict. Here are the fields available at each point, shown as the dict your __call__ method receives:

pre_query: fires before field classification

Use for: PII scrubbing, request logging, query transformation, rate limiting per session.

event dict
event = {
    "type":                 "pre_query",         # always present
    "session_id":           "s_abc123",          # current session
    "trace_id":             "01HX...",           # W3C trace ID
    "query":                "Write binary search.",
    "conversation_history": [...],               # list of prior messages
    "force_domain":         None,                # str if routing forced, else None
}
# Modify event["query"] to transform the query before routing
post_route: fires after routing decision, before specialist calls

Use for: logging routing decisions, overriding the routing mode, alerting on unexpected domains.

event dict
event = {
    "type":                "post_route",
    "session_id":          "s_abc123",
    "trace_id":            "01HX...",
    "query":               "Write binary search.",
    "domain_distribution": {"software_engineering": 0.9, "mathematics": 0.1},
    "top_domain":          "software_engineering",
    "routing_mode":        "single",             # "single" | "fanout" | "arbiter"
    "active_specialists":  ["swe"],              # specialists involved
}
pre_specialist_call: fires before each specialist API call

Use for: per-specialist logging, injecting prompt context, circuit breakers. Fires once per specialist in fanout mode.

event dict
event = {
    "type":        "pre_specialist_call",
    "session_id":  "s_abc123",
    "trace_id":    "01HX...",
    "query":       "Write binary search.",
    "domain":      "software_engineering",
    "specialist":  "swe",                 # specialist name from config
    "model":       "qwen2.5-coder:7b",   # model being called
    "endpoint":    "http://localhost:11434",
}
post_specialist_call: fires after each specialist returns

Use for: response logging, latency tracking per specialist, content filtering before scoring. Fires once per specialist in fanout mode.

event dict
event = {
    "type":             "post_specialist_call",
    "session_id":       "s_abc123",
    "trace_id":         "01HX...",
    "domain":           "software_engineering",
    "specialist":       "swe",
    "response_preview": "Here is a binary search implementation...",  # first 200 chars
    "confidence":       0.823,            # base confidence before Kalman
}
pre_arbiter: fires before arbiter receives fanout responses

Use for: logging both specialist responses before arbitration, injecting human review, recording disagreements.

event dict
event = {
    "type":          "pre_arbiter",
    "session_id":    "s_abc123",
    "trace_id":      "01HX...",
    "query":         "What is heapsort complexity?",
    "specialist_a":  "swe",
    "response_a":    "O(n log n) worst-case...",   # first 200 chars
    "specialist_b":  "math",
    "response_b":    "O(n^2) in the worst case...", # first 200 chars
}
post_arbiter: fires after arbiter verdict issued

Use for: alerting on case_4 (both wrong), logging verdicts, triggering human review on contradictions.

event dict
event = {
    "type":          "post_arbiter",
    "session_id":    "s_abc123",
    "trace_id":      "01HX...",
    "verdict":       "case_2: specialist A is correct...",  # first 200 chars
    "winner_field":  "software_engineering",  # or "both_wrong"
    "specialist_a":  "swe",
    "specialist_b":  "math",
}
# Alert on contradictions:
# if event["winner_field"] == "both_wrong": → trigger review
on_correction: fires when a correction is stored

Use for: Slack/webhook notifications when the system learns something new, audit logging, syncing to external knowledge bases. Fires in the background (non-blocking).

event dict
event = {
    "type":        "on_correction",
    "session_id":  "",              # may be empty for manual corrections
    "trace_id":    "",
    "subject":     "heapsort_complexity",
    "domain":      "software_engineering",
    "claim":       "Heapsort is O(n log n) worst-case, O(1) extra space.",
    "confidence":  0.99,
    "decay_class": "A",             # A=permanent, B=slow, C=moderate, D=fast
    "source":      "manual",        # "manual" | "arbiter" | "cross_session"
}
Example: Slack notification hook
import httpx

class SlackNotificationHook:
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    async def __call__(self, event: dict) -> dict:
        if event["type"] == "on_correction":
            msg = (f"*New correction stored*\n"
                   f"Domain: {event['domain']}\n"
                   f"Claim: {event['claim']}\n"
                   f"Confidence: {event['confidence']}")
            async with httpx.AsyncClient() as client:
                await client.post(self.webhook_url, json={"text": msg}, timeout=3.0)
        return event
pre_response: fires before response is sent to client

Use for: response transformation, content filtering, adding metadata, modifying the response text before it reaches the user. Modify event["response"] to change what the user sees.

event dict
event = {
    "type":          "pre_response",
    "session_id":    "s_abc123",
    "trace_id":      "01HX...",
    "domain":        "software_engineering",
    "routing_mode":  "single",
    "u_score":       0.731,
    "confidence":    0.823,
    "latency_ms":    312.4,
    "response":      "Here is the implementation...",  # modify this to change response
}
# Return modified event to change the response the user receives:
# event["response"] = event["response"] + "\n\n[Disclaimer: ...]"
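The commented lines above can be turned into a complete hook. This is a minimal sketch, assuming the same callable-class shape as the Slack example; the class name DisclaimerHook, the 0.5 threshold, and the disclaimer wording are illustrative assumptions, not framework defaults.

```python
import asyncio


class DisclaimerHook:
    """Hypothetical pre_response hook: append a note to low-utility answers."""

    def __init__(self, min_u_score: float = 0.5):  # threshold is an assumption
        self.min_u_score = min_u_score

    async def __call__(self, event: dict) -> dict:
        low_score = event.get("u_score", 1.0) < self.min_u_score
        if event.get("type") == "pre_response" and low_score:
            # Changing event["response"] changes what the user sees.
            event["response"] += "\n\n[Low-confidence answer: please verify.]"
        return event  # always return the (possibly modified) event


# Local dry run with a hand-built event; no router involved.
demo_event = {"type": "pre_response", "u_score": 0.31,
              "response": "Here is the implementation..."}
result = asyncio.run(DisclaimerHook()(demo_event))
```

Because pre_response runs before the client sees anything, a hook like this should stay fast; slow work belongs in post_response.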
post_response: fires after the response is sent (background, non-blocking)

Use for: async analytics, usage tracking, long-running post-processing. Never blocks the response. Return value is ignored.

event dict
event = {
    "type":          "post_response",
    "session_id":    "s_abc123",
    "trace_id":      "01HX...",
    "domain":        "software_engineering",
    "routing_mode":  "single",
    "u_score":       0.731,
    "latency_ms":    312.4,
    "gold_standard": True,   # True if all INFO assertions fired
}
on_promotion: fires when GREEN promotes to BLUE (background)

Use for: deployment notifications, updating dashboards, triggering downstream systems.

event dict
event = {
    "type":           "on_promotion",
    "session_id":     "",
    "trace_id":       "",
    "specialist":     "swe",
    "promoted_from":  "qwen2.5-coder:7b",
    "promoted_to":    "qwen2.5-coder:14b-finetuned",
    "project_dir":    "/home/user/my-aua-project",
}
on_rollback: fires when a rollback completes (background)

Use for: incident alerts, updating dashboards, triggering post-rollback diagnostics.

event dict
event = {
    "type":               "on_rollback",
    "session_id":         "",
    "trace_id":           "",
    "specialist":         "swe",
    "rolled_back_from":   "qwen2.5-coder:14b-finetuned",
    "rolled_back_to":     "qwen2.5-coder:7b",
    "project_dir":        "/home/user/my-aua-project",
}
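The two deployment events share most of their keys, so one hook can cover both. The sketch below only formats an alert line from the event dicts shown above; DeployAlertHook, format_deploy_event, and the sink parameter are hypothetical names chosen for illustration.

```python
import asyncio


def format_deploy_event(event: dict) -> str:
    """Build a one-line alert from an on_promotion / on_rollback event."""
    if event["type"] == "on_promotion":
        return (f"[PROMOTED] {event['specialist']}: "
                f"{event['promoted_from']} -> {event['promoted_to']}")
    if event["type"] == "on_rollback":
        return (f"[ROLLBACK] {event['specialist']}: "
                f"{event['rolled_back_from']} -> {event['rolled_back_to']}")
    return ""  # any other hook point is ignored


class DeployAlertHook:
    """Hypothetical hook; `sink` is any callable that accepts a string."""

    def __init__(self, sink=print):
        self.sink = sink

    async def __call__(self, event: dict) -> dict:
        line = format_deploy_event(event)
        if line:
            self.sink(line)
        return event


# Dry run: collect alerts in a list instead of sending them anywhere.
alerts = []
asyncio.run(DeployAlertHook(alerts.append)({
    "type": "on_rollback", "specialist": "swe",
    "rolled_back_from": "qwen2.5-coder:14b-finetuned",
    "rolled_back_to": "qwen2.5-coder:7b",
}))
```

Passing the sink in makes the hook easy to test locally before pointing it at a pager or chat webhook.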

14.3 One hook for multiple points

plugins/analytics_hook.py
class AnalyticsHook:
    """Single hook that handles multiple pipeline points."""

    async def __call__(self, event: dict) -> dict:
        hook_type = event["type"]

        if hook_type == "pre_query":
            self._log_query(event["session_id"], event["query"])

        elif hook_type == "post_response":
            self._record_latency(event["domain"], event["latency_ms"], event["u_score"])

        elif hook_type == "on_correction":
            self._notify_team(event["domain"], event["claim"])

        elif hook_type == "on_promotion":
            self._update_dashboard(event["specialist"], event["promoted_to"])

        return event  # always return event

    def _log_query(self, session_id, query): ...
    def _record_latency(self, domain, ms, u): ...
    def _notify_team(self, domain, claim): ...
    def _update_dashboard(self, specialist, model): ...
aua_config.yaml: register one class on multiple points
hooks:
  - hook_point: pre_query
    import_path: plugins.analytics_hook:AnalyticsHook
  - hook_point: post_response
    import_path: plugins.analytics_hook:AnalyticsHook
  - hook_point: on_correction
    import_path: plugins.analytics_hook:AnalyticsHook
    fail_closed: false
  - hook_point: on_promotion
    import_path: plugins.analytics_hook:AnalyticsHook

Part 14 done. Part 15 covers production security: tokens, mTLS, secrets, and the audit log.

How-to 15 Security ~25 min

15.1 Bearer tokens and scopes

AUA uses HMAC-SHA256 bearer tokens with 15 fine-grained scopes. Auth is off by default (for local development); enable it in production with one config line:

aua_config.yaml: enable auth
security:
  auth_enabled: true                   # false by default (open in dev)
  token_secret_env: AUA_TOKEN_SECRET   # env var holding your signing secret
  token_expiry_days: 30

# Generate a signing secret:
# python3 -c "import secrets; print(secrets.token_hex(32))"
# export AUA_TOKEN_SECRET=<output>

When auth_enabled: false (the default), AUA logs a WARNING on startup and all endpoints are open. Never run a public deployment with auth_enabled: false.
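For intuition about what an HMAC-SHA256 bearer token involves, here is a generic sketch using only the standard library. It is not AUA's token format: the payload layout, the helper names, and the rule that aua:admin implies every scope are assumptions made for illustration.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import List, Optional


def _b64(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _unb64(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_token(secret: bytes, scopes: List[str], expires_at: float) -> str:
    """Hypothetical layout: base64(payload) + "." + base64(HMAC-SHA256)."""
    payload = _b64(json.dumps({"scopes": scopes, "exp": expires_at}).encode())
    sig = _b64(hmac.new(secret, payload.encode(), hashlib.sha256).digest())
    return f"{payload}.{sig}"


def verify_token(secret: bytes, token: str, required_scope: str,
                 now: Optional[float] = None) -> bool:
    try:
        payload, sig = token.split(".")
    except ValueError:
        return False
    expected = _b64(hmac.new(secret, payload.encode(), hashlib.sha256).digest())
    # Constant-time comparison avoids leaking how many characters matched.
    if not hmac.compare_digest(sig.encode(), expected.encode()):
        return False
    claims = json.loads(_unb64(payload))
    if claims["exp"] < (time.time() if now is None else now):
        return False  # expired
    return required_scope in claims["scopes"] or "aua:admin" in claims["scopes"]
```

The signature is checked before the payload is parsed, so a forged token never reaches the JSON decoder.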

Create and manage tokens with the CLI:

bash
# Create a query-only token expiring in 30 days
aua token create --scope aua:query --expires 30d

# Create an admin token
aua token create --scope aua:admin --expires 7d

# List all tokens
aua token list

# Revoke a token
aua token revoke <token-id>

| Scope | Grants access to |
|---|---|
| aua:query | POST /query, POST /sessions/{id}/messages |
| aua:stream | POST /sessions/{id}/stream |
| aua:status | GET /status, GET /version, GET /health |
| aua:config:read | GET /config (secrets redacted) |
| aua:config:write | POST /config/reload |
| aua:corrections:write | POST /corrections |
| aua:deploy | POST /deploy/green |
| aua:rollback | POST /deploy/rollback |
| aua:extensions:write | POST /extensions, POST /extensions/reload |
| aua:admin | All scopes |

bash - use a token
export AUA_TOKEN="aua_tk_..."

curl -X POST http://localhost:8000/query \
  -H "Authorization: Bearer $AUA_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"query": "Explain quicksort", "session_id": "s1"}'

15.2 mTLS: encrypted communication

AUA supports TLS (server-side) and mutual TLS (both sides). Configure cert paths in security.mtls:

aua_config.yaml: TLS (server-only)
security:
  mtls:
    key_file: certs/server.key      # or keyfile:
    cert_file: certs/server.crt     # or certfile:
aua_config.yaml: mutual TLS (require client certs)
security:
  mtls:
    key_file: certs/server.key
    cert_file: certs/server.crt
    ca_file: certs/ca.crt           # presence triggers CERT_REQUIRED
bash - generate dev certs
# Generate self-signed dev certs
aua certs generate

# Inspect cert details
aua certs inspect

When key_file and cert_file are set, AUA passes them directly to uvicorn's SSL arguments. If ca_file is also set, client certificates are required on every connection (mutual TLS). AUA logs which certs are active at startup.
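The mapping described above can be pictured as a small function from the security.mtls block to uvicorn's SSL keyword arguments. The uvicorn parameter names (ssl_keyfile, ssl_certfile, ssl_ca_certs, ssl_cert_reqs) are real; the function itself is a hypothetical sketch and may differ from what AUA does internally.

```python
import ssl


def uvicorn_ssl_kwargs(mtls_cfg: dict) -> dict:
    """Translate a security.mtls config block into uvicorn.run() kwargs."""
    # The config accepts both spellings (key_file / keyfile).
    key = mtls_cfg.get("key_file") or mtls_cfg.get("keyfile")
    cert = mtls_cfg.get("cert_file") or mtls_cfg.get("certfile")
    if not (key and cert):
        return {}  # no TLS configured: serve plain HTTP

    kwargs = {"ssl_keyfile": key, "ssl_certfile": cert}
    ca = mtls_cfg.get("ca_file")
    if ca:
        # Presence of a CA bundle switches on client-certificate checks.
        kwargs["ssl_ca_certs"] = ca
        kwargs["ssl_cert_reqs"] = int(ssl.CERT_REQUIRED)
    return kwargs


server_only = uvicorn_ssl_kwargs(
    {"key_file": "certs/server.key", "cert_file": "certs/server.crt"})
mutual = uvicorn_ssl_kwargs(
    {"key_file": "certs/server.key", "cert_file": "certs/server.crt",
     "ca_file": "certs/ca.crt"})
```

The result would be splatted into a call such as uvicorn.run(app, **mutual); with an empty dict the server simply starts without TLS.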

15.3 Secrets management

AUA never stores plaintext secrets in config. Instead, config references a secret name, and the secrets provider resolves it at startup:

aua_config.yaml
secrets:
  provider: env            # "env" (default) | "vault" | "aws" | "gcp"

specialists:
  - name: swe
    api_key_secret: SWE_API_KEY    # reads env var SWE_API_KEY
aua_config.yaml: HashiCorp Vault (KV v2)
secrets:
  provider: vault
  url: https://vault.internal:8200
  token_env: VAULT_TOKEN     # env var holding the Vault token
aua_config.yaml: AWS Secrets Manager
secrets:
  provider: aws
  region: us-east-1

Resolution order is always environment variable first, then the configured provider; values are cached for the process lifetime and never appear in logs, repr(), or GET /config. Vault secrets are read from the KV v2 path matching the secret name, taking the value key (or a key named after the secret). Both providers are covered by live integration tests in CI: a wire-faithful Vault KV v2 server driven by the real hvac client, and moto intercepting the real boto3 client (tests/test_secrets_live.py).
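The resolution order reads naturally as code. This is a sketch under stated assumptions: SecretResolver and provider_lookup are hypothetical names, and the provider is modelled as a plain callable rather than a real Vault or AWS client.

```python
import os
from typing import Callable, Dict, Optional


class SecretResolver:
    """Env var first, then the configured provider; cache for process life."""

    def __init__(self,
                 provider_lookup: Optional[Callable[[str], Optional[str]]] = None):
        self._provider_lookup = provider_lookup  # stand-in for vault/aws/gcp
        self._cache: Dict[str, str] = {}

    def get(self, name: str) -> str:
        if name in self._cache:
            return self._cache[name]
        value = os.environ.get(name)              # 1. environment variable
        if value is None and self._provider_lookup is not None:
            value = self._provider_lookup(name)   # 2. configured provider
        if value is None:
            raise KeyError(f"secret {name!r} not found")
        self._cache[name] = value
        return value

    def __repr__(self) -> str:
        # Only secret names are shown, never values.
        return f"SecretResolver(cached={sorted(self._cache)})"
```

Raising on a missing secret at startup is a deliberate choice in this sketch: a misnamed secret fails fast instead of surfacing later as an authentication error.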

15.4 Encryption at rest

Correction payloads, assertions, DPO pairs, token metadata, and sensitive audit fields are encrypted at rest with AES-256-GCM:

aua_config.yaml
security:
  encryption:
    enabled: true
    key_secret: AUA_ENCRYPTION_KEY   # 64-char hex key; see §17.3 for generation

15.5 Audit log

The audit log is append-only with a tamper-evident SHA-256 hash chain. Every security-relevant event is recorded:

bash
# View recent audit events (written to .aua/audit.log)
tail -f .aua/audit.log

# Export corrections (machine-readable audit trail)
aua corrections export --format jsonl
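To see why a hash chain is tamper-evident, consider the sketch below. The record layout, the all-zero genesis value, and the function names are assumptions for illustration; the on-disk format of .aua/audit.log is not specified in this tutorial.

```python
import hashlib
import json

GENESIS = "0" * 64  # assumed starting value for the first record


def _digest(prev_hash: str, event: dict) -> str:
    # Canonical JSON so the same event always hashes the same way.
    body = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + body).encode()).hexdigest()


def append_event(chain: list, event: dict) -> dict:
    """Append an event whose hash covers the previous record's hash."""
    prev_hash = chain[-1]["hash"] if chain else GENESIS
    record = {"event": event, "prev_hash": prev_hash,
              "hash": _digest(prev_hash, event)}
    chain.append(record)
    return record


def verify_chain(chain: list) -> bool:
    """Recompute every hash; any edited or reordered record breaks the chain."""
    prev_hash = GENESIS
    for record in chain:
        if record["prev_hash"] != prev_hash:
            return False
        if record["hash"] != _digest(prev_hash, record["event"]):
            return False
        prev_hash = record["hash"]
    return True
```

Each record commits to everything before it, so changing one old event invalidates that record and every later one.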

15.6 Session, trace & request IDs (#15)

Every HTTP request gets three IDs: session_id (logical conversation, persistent across queries), trace_id (one per request, W3C-compatible 32-hex format, used by OTEL/audit/logs), and request_id (one per HTTP request, never reused). Client-supplied IDs are honored via the X-Session-ID, X-Trace-ID (or traceparent), and X-Request-ID headers; UUIDs are generated otherwise. All three come back as headers on every response, and /query additionally echoes them in the body.

bash
# No session_id supplied → a UUID is generated and echoed back.
# Adopt it for the rest of the conversation:
curl -si -X POST localhost:8000/query -H "Content-Type: application/json" \
  -d '{"query": "Write binary search in Python."}' | grep -iE "x-(session|trace|request)-id"

# Pin your own session across requests (body session_id wins over the header)
curl -s -X POST localhost:8000/query -H "Content-Type: application/json" \
  -d '{"query": "Now make it iterative.", "session_id": "user_42"}' \
  | jq '{session_id, trace_id, request_id}'

The context propagates everywhere: downstream specialist and arbiter calls carry the three headers, every hook payload includes them, the audit log records them per event, and structured logs attach them automatically. One trace_id therefore stitches a query across the router, the winning specialist, the arbiter verdict, and the audit chain.
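The ID rules in this section can be sketched as a single resolution function. The function name and the precedence of X-Trace-ID over traceparent are assumptions; the "body session_id wins over the header" rule comes from the curl example above, and the traceparent layout follows the W3C Trace Context format.

```python
import re
import uuid

# W3C traceparent: version - trace-id (32 hex) - parent-id (16 hex) - flags
_TRACEPARENT = re.compile(
    r"^[0-9a-f]{2}-([0-9a-f]{32})-[0-9a-f]{16}-[0-9a-f]{2}$")


def resolve_ids(headers: dict, body: dict) -> dict:
    """Pick client-supplied IDs where present, generate the rest."""
    h = {k.lower(): v for k, v in headers.items()}  # headers are case-insensitive

    # Body session_id wins over the header; otherwise generate a UUID.
    session_id = (body.get("session_id") or h.get("x-session-id")
                  or str(uuid.uuid4()))

    trace_id = h.get("x-trace-id")
    if not trace_id:
        match = _TRACEPARENT.match(h.get("traceparent", ""))
        trace_id = match.group(1) if match else uuid.uuid4().hex

    request_id = h.get("x-request-id") or str(uuid.uuid4())
    return {"session_id": session_id, "trace_id": trace_id,
            "request_id": request_id}
```

A client that wants stable conversations only needs to echo back the session_id it received on the first response.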

Production checklist: aua doctor --strict emits a loud warning when cors_origins is *, the host is 0.0.0.0, and auth is disabled, all at the same time. Use the Team Server or Enterprise deployment profile to enforce auth + mTLS requirements.

Part 15 done. Part 16 covers observability: Prometheus, Grafana, OTEL, and structured logging.

How-to 16 Observability ~25 min

AUA emits three observability streams out of the box: structured JSON logs (every query, assertion, and error), Prometheus metrics (18 gauges/counters/histograms), and optional OpenTelemetry distributed traces. All three are designed to ship directly to ELK, Splunk, Grafana, or any OTEL-compatible backend, with no code changes required.

16.1 Structured JSON logging

Every log line the framework emits is a single-line JSON object. Every line automatically includes the current request's session_id, trace_id, and request_id, so a Kibana or Splunk search on a session ID returns the complete picture of everything that happened in that session.

aua_config.yaml
logging:
  level: INFO          # DEBUG | INFO | WARNING | ERROR
  format: json         # "json" (default) | "text" (human-readable dev mode)
  output: stdout       # "stdout" | "stderr" | "/var/log/aua/router.log"
Example log output: one query
{"ts":1747000000.12,"level":"INFO","logger":"aua.router","msg":"single→software_engineering  U=0.731","session_id":"s_abc123","trace_id":"01HX...","field":"software_engineering","routing_mode":"single","latency_ms":312.4,"utility_score":0.731,"confidence":0.823}
{"ts":1747000000.43,"level":"INFO","logger":"aua.router","msg":"Query routed","session_id":"s_abc123","trace_id":"01HX...","domain":"software_engineering","u_score":0.731,"latency_ms":315.1}

Fields that appear in structured log lines (request-scoped fields such as error_code and verdict are present only when they apply):

| Field | Description |
|---|---|
| ts | Unix timestamp (float) |
| level | DEBUG / INFO / WARNING / ERROR |
| logger | Module name (aua.router, aua.arbiter, aua.auth, ...) |
| session_id | Chat session identifier, auto-injected from request context |
| trace_id | W3C-compatible trace ID; links to OTEL spans if enabled |
| request_id | Per-request unique ID |
| field | Routed domain (software_engineering, mathematics, ...) |
| specialist | Specialist name that handled the query |
| routing_mode | single / fanout / arbiter |
| utility_score | Final U score for this response |
| confidence | Kalman-filtered confidence estimate |
| latency_ms | End-to-end latency in milliseconds |
| error_code | HTTP status on errors |
| verdict | Arbiter verdict case (A/B/C/D) when arbiter fires |
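If you want your own plugin or startup code to log in the same shape, a formatter along these lines would do it. This is a sketch, not the framework's logger: JsonLineFormatter, request_ctx, and the "fields" key passed through extra= are hypothetical names.

```python
import contextvars
import io
import json
import logging

# Per-request IDs live in a context variable so every log call picks them up.
request_ctx = contextvars.ContextVar("request_ctx", default={})


class JsonLineFormatter(logging.Formatter):
    """Render each record as one JSON object with top-level keys."""

    def format(self, record: logging.LogRecord) -> str:
        line = {"ts": record.created, "level": record.levelname,
                "logger": record.name, "msg": record.getMessage()}
        line.update(request_ctx.get())               # session/trace/request IDs
        line.update(getattr(record, "fields", {}))   # per-call extras
        return json.dumps(line, separators=(",", ":"))


# Demo: log into an in-memory stream and parse the line back.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonLineFormatter())
log = logging.getLogger("aua.demo")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

request_ctx.set({"session_id": "s_abc123", "trace_id": "t1", "request_id": "r1"})
log.info("Query routed", extra={"fields": {"latency_ms": 315.1}})
parsed = json.loads(stream.getvalue())
```

Keeping every field at the top level is what lets Filebeat and Splunk index the lines without any parsing rules.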

16.2 Shipping logs to ELK (Elasticsearch / Kibana)

AUA's JSON output is Filebeat-native. No parsing config is needed: all fields are already top-level JSON keys that become indexed Elasticsearch fields automatically.

Step 1: write logs to file
logging:
  format: json
  output: /var/log/aua/router.log    # Filebeat monitors this path
Step 2: filebeat.yml
filebeat.inputs:
  - type: log
    paths: ["/var/log/aua/router.log"]
    json.keys_under_root: true     # promote JSON fields to top-level
    json.add_error_key: true

processors:
  - timestamp:
      field: ts
      layouts: ["UNIX"]
      target_field: "@timestamp"

output.elasticsearch:
  hosts: ["https://your-elastic:9200"]
  index: "aua-logs-%{+yyyy.MM.dd}"
  api_key: "your-api-key"
Step 3: useful Kibana queries
# All failed assertions in the last 24h
logger: "aua.router" AND level: "WARNING" AND msg: "assertion"

# Low U-score sessions (worth reviewing)
utility_score < 0.4

# All events for a specific session
session_id: "s_abc123"

# High latency queries
latency_ms > 5000

# Authentication failures
logger: "aua.auth" AND level: "WARNING"

# Arbiter fired
routing_mode: "arbiter" AND verdict: *

Logstash pipeline alternative. If you're using Logstash instead of Filebeat, pipe aua serve stdout directly: aua serve 2>&1 | logstash -f aua.conf. In the pipeline filter: json { source => "message" } then date { match => ["ts", "UNIX"] }. The JSON structure needs no grok patterns.

16.3 Shipping logs to Splunk

Two options depending on your Splunk setup:

Option A: Universal Forwarder (file-based)

inputs.conf
[monitor:///var/log/aua/router.log]
index = aua
sourcetype = aua_json
props.conf
[aua_json]
KV_MODE = json
TIME_FORMAT = %s.%3N
TIME_PREFIX = "ts":
MAX_TIMESTAMP_LOOKAHEAD = 20

Option B: HTTP Event Collector (HEC, no file needed)

bash - install Splunk handler
pip install splunk-handler
Python - add to your startup script or hook
from splunk_handler import SplunkHandler
import logging

logging.getLogger("aua").addHandler(
    SplunkHandler(
        host="splunk.yourcompany.com",
        port=8088,
        token="your-hec-token",
        index="aua",
        sourcetype="aua_json",
    )
)
Useful Splunk searches
# Failed assertions over time
index=aua sourcetype=aua_json logger="aua.router" "assertion"
| timechart count by assertion_name

# U-score trend per domain
index=aua sourcetype=aua_json utility_score=*
| timechart avg(utility_score) by field

# P95 latency by routing mode
index=aua sourcetype=aua_json latency_ms=*
| stats perc95(latency_ms) by routing_mode

16.4 Prometheus metrics

bash
curl http://localhost:8000/metrics | head -30

| Metric | Type | What it measures |
|---|---|---|
| aua_queries_total | Counter | Total queries by field, routing mode, status |
| aua_query_latency_seconds | Histogram | Latency (p50/p95/p99) |
| aua_utility_score | Gauge | Last U score per domain |
| aua_contradiction_rate | Gauge | Contradiction rate per domain |
| aua_routing_field_distribution | Counter | Query distribution across fields |
| aua_specialist_confidence | Gauge | Confidence per specialist |
| aua_correction_count | Counter | Corrections accumulated |
| aua_arbiter_verdict_distribution | Counter | Case 1/2/3/4 breakdown |
| aua_dpo_pairs_accumulated | Gauge | Total DPO pairs in store |
| aua_token_requests_total | Counter | Token-gated requests by scope |
| aua_hook_failures_total | Counter | Hook execution failures |
| aua_plugin_execution_seconds | Histogram | Plugin latency |
| aua_specialist_vram_utilization | Gauge | GPU VRAM % per specialist |
| aua_cost_gpu_hours_total | Counter | Cumulative GPU hours per specialist |
| aua_cost_usd_total | Counter | Cumulative USD cost per specialist |
| aua_assertion_results_total | Counter | Assertion pass/fail by name, level, domain |
| aua_assertion_retries_total | Counter | BLOCKING assertion retry count |
| aua_assertion_bonus_applied | Histogram | E-score bonus applied by INFO assertions |

16.5 Cost tracking

bash
curl http://localhost:8000/metrics/cost | python3 -m json.tool
Response
{
  "swe":  {"queries": 42, "gpu_hours": 0.012, "cost_usd": 0.0083},
  "math": {"queries": 18, "gpu_hours": 0.005, "cost_usd": 0.0034},
  "total_cost_usd": 0.0117
}

16.6 Grafana dashboard

bash - start with observability profile
docker compose --profile obs up

# Grafana at http://localhost:3000 (admin / aua-admin)
# Dashboard pre-loaded: 20 panels covering query volume, latency p50/p95/p99,
# routing distribution, U score trends, contradiction rate, arbiter verdicts,
# specialist health, VRAM usage, blue-green split, assertion fail rate,
# DPO pairs accumulated, auth failures, cost per specialist

16.7 OpenTelemetry: distributed traces

Optional. Sends full request traces to Jaeger, Tempo, Elastic APM, Splunk Observability, or any OTLP-compatible backend. Each trace covers the complete request path: router → classifier → routing decision → specialist calls → utility scoring → arbiter → hooks → policy assertions → response.

bash
pip install "adaptive-utility-agent[otel]"
aua_config.yaml: OTEL to Jaeger/Tempo
observability:
  otel:
    enabled: true
    endpoint: http://localhost:4317   # OTLP gRPC collector
    service_name: aua-router
aua_config.yaml: OTEL to Splunk Observability Cloud
observability:
  otel:
    enabled: true
    endpoint: https://ingest.us1.signalfx.com:443
    service_name: aua-router
    headers:
      X-SF-Token: "your-splunk-o11y-token"

Log + trace correlation. The trace_id in every JSON log line is W3C-compatible. When OTEL is enabled, clicking a log line in Kibana or Splunk and following its trace_id jumps directly to the corresponding distributed trace in Jaeger or Elastic APM, showing the exact specialist calls, latencies, and assertion checks for that request.

16.8 Structured logging in Docker / Kubernetes

docker-compose.yml: ship stdout to Loki via Grafana Alloy
services:
  aua-router:
    logging:
      driver: json-file    # Docker captures stdout as JSON
    labels:
      logging: "aua"

  alloy:
    image: grafana/alloy
    volumes:
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
    # Alloy → Loki → Grafana: full log + metric correlation
Kubernetes: fluent-bit to Elasticsearch
# fluent-bit ConfigMap snippet
[INPUT]
    Name              tail
    Path              /var/log/containers/aua-*.log
    Parser            json
    Tag               aua.*

[OUTPUT]
    Name              es
    Match             aua.*
    Host              elasticsearch.logging.svc
    Index             aua-logs
    Type              _doc

Part 16 done. Part 17 covers Docker deployment profiles.

How-to 17 Docker deployment ~20 min

17.1 Docker Compose profiles

All examples use the modern docker compose (V2) command. If your system only has the legacy binary, replace with docker-compose.

bash
# CPU / Ollama
docker compose up

# GPU / vLLM (requires NVIDIA runtime)
docker compose --profile gpu up

# + Prometheus and Grafana
docker compose --profile obs up

# Full local stack (Ollama + observability)
docker compose --profile ollama --profile obs up

# GPU (Linux + NVIDIA) - uses a separate compose file
docker compose -f docker-compose.gpu.yml up

17.2 Deployment profiles

| Profile | Auth | mTLS | State | Use for |
|---|---|---|---|---|
| Local Developer | Optional | No | SQLite | localhost only |
| Single GPU Workstation | Recommended | No | SQLite | One-machine GPU server |
| Team Server | Required | Required | Postgres | Shared team deployment |
| Enterprise | IAM + scopes | Required | Postgres | Custom backends, strict audit |

17.3 Environment configuration

Generate your encryption key before deploying. It must be a 32-byte value encoded as 64 hex characters. Run either command once and store the output:

bash - generate AUA_ENCRYPTION_KEY
# Option 1 - Python (no extra dependencies)
python3 -c "import os; print(os.urandom(32).hex())"

# Option 2 - OpenSSL
openssl rand -hex 32

# Either prints a 64-character hex string, e.g.:
# a3f2c1e8b7d4509261af3e2c84b19d07f6a5c3e1b8294d6072f1e3a5c8b2d490

Keep this value secret and never commit it to version control. Rotate it by generating a new key, re-encrypting state, and restarting. Encryption uses AES-256-GCM; the key is loaded at startup from the named environment variable.
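A quick sanity check before deploying can catch a truncated or mistyped key. The helper below is a hypothetical pre-flight check, not part of the aua CLI; it only verifies the "64 hex characters, 32 bytes" requirement stated above.

```python
import os


def load_encryption_key(hex_value: str) -> bytes:
    """Validate a 64-char hex string and return the 32 raw key bytes."""
    value = hex_value.strip()
    if len(value) != 64:
        raise ValueError(f"expected 64 hex characters, got {len(value)}")
    try:
        key = bytes.fromhex(value)
    except ValueError as exc:
        raise ValueError("key contains non-hex characters") from exc
    if len(key) != 32:
        # bytes.fromhex() skips embedded whitespace, so re-check the length.
        raise ValueError("key does not decode to exactly 32 bytes")
    return key


# Same generation method as Option 1 above.
demo_key = load_encryption_key(os.urandom(32).hex())
```

In practice you would pass os.environ["AUA_ENCRYPTION_KEY"] to such a check; a 32-byte key is what AES-256 requires.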

.env
AUA_ENCRYPTION_KEY=<64-char hex string from above>
AUA_ADMIN_TOKEN=aua_tk_...
SWE_API_KEY=...
POSTGRES_URL=postgresql://aua:password@db:5432/aua_state
aua_config.yaml: Team Server profile
security:
  mtls: {enabled: true, cert_dir: /certs, auto_generate: false}
  encryption: {enabled: true, key_secret: AUA_ENCRYPTION_KEY}
  cors_origins: ["https://your-domain.com"]

state:
  backend: postgres
  url_secret: POSTGRES_URL

audit:
  enabled: true
  hash_chain: true

How-to 18 Persistence, search & operations toolkit ~70 min

Everything in this part was battle-tested in AUA-Veritas, a macOS desktop assistant built on this framework, and backported in v1.1. It turns the router from a stateless query engine into a product backend: durable conversations, full-text search, automatic context handoffs, a correction lifecycle, analytics, and self-maintenance jobs. All of it ships in the default aua serve, with nothing extra to enable.

18.1 Conversations, messages & projects

The state store (SQLite by default, .aua/state/aua.db) persists conversations and messages independently of the chat-session API. Projects group conversations; project_id=NULL means "All chats".

bash
# Create a project, then a conversation inside it
PROJECT=$(curl -s -X POST localhost:8000/projects -H "Content-Type: application/json" \
  -d '{"name": "Q3 Research"}' | jq -r .project_id)

CONV=$(curl -s -X POST localhost:8000/conversations -H "Content-Type: application/json" \
  -d "{\"title\": \"Vector DB eval\", \"project_id\": \"$PROJECT\"}" | jq -r .conversation_id)

# Append messages (role: user | assistant)
curl -s -X POST localhost:8000/conversations/$CONV/messages \
  -H "Content-Type: application/json" \
  -d '{"role": "user", "content": "Compare pgvector and Qdrant for 10M embeddings"}'

# Paginated reads: newest page by default, timestamp cursors for history
curl -s "localhost:8000/conversations/$CONV/messages?limit=50"
curl -s "localhost:8000/conversations/$CONV/messages?before=1765432100.5&limit=50"

# Filter the sidebar to one project; omit project_id for all chats
curl -s "localhost:8000/conversations?project_id=$PROJECT"

Cache rule (learned in production): first-page reads at the default limit=50 are served from a true-LRU MessageCache. Any non-default limit bypasses the cache and hits the DB with the actual limit; otherwise ?limit=1 would silently return the whole cached page and break pagination.
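The cache rule is easier to see in code. This sketch assumes a default page size of 50 and treats any request with a before cursor as uncacheable; MessageCacheSketch, read_messages, and db_fetch are hypothetical stand-ins for the framework's MessageCache and state store.

```python
from collections import OrderedDict

DEFAULT_LIMIT = 50  # assumed default page size


class MessageCacheSketch:
    """Tiny LRU: most recently used conversation pages stay, oldest go."""

    def __init__(self, capacity: int = 128):
        self.capacity = capacity
        self._pages = OrderedDict()

    def get(self, conv_id):
        if conv_id not in self._pages:
            return None
        self._pages.move_to_end(conv_id)  # a read counts as a use
        return self._pages[conv_id]

    def put(self, conv_id, page):
        self._pages[conv_id] = page
        self._pages.move_to_end(conv_id)
        while len(self._pages) > self.capacity:
            self._pages.popitem(last=False)  # evict least recently used


def read_messages(cache, db_fetch, conv_id, limit=DEFAULT_LIMIT, before=None):
    """Serve only default first-page reads from cache; everything else hits the DB."""
    cacheable = before is None and limit == DEFAULT_LIMIT
    if cacheable:
        page = cache.get(conv_id)
        if page is not None:
            return page
    page = db_fetch(conv_id, limit, before)  # uses the caller's actual limit
    if cacheable:
        cache.put(conv_id, page)
    return page
```

The key design point is that the cache key carries no limit, so the only safe thing to cache is the one page shape every default request agrees on.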

18.2 Full-text keyword search

Every message write is keyword-indexed by a background worker (50 ms batches, never on the response path) into an in-memory inverted index with O(log n) prefix matching. Search is message-level (the Cmd+F model): one result per matching message, with AND semantics for multi-word queries. On restart, a startup backfill re-indexes anything the worker didn't flush.

bash
curl -s "localhost:8000/search?q=pgvector+embeddings" | jq
Response: one entry per matching message, newest first
[
  {
    "conversation_id":  "7f3a…",
    "title":            "Vector DB eval",
    "message_id":       "c91d…",
    "match_message_id": "c91d…",
    "match_message_ts": 1765432101.7
  }
]

Extraction is pure Python (~8 µs/message, no spaCy): stopword filtering, CamelCase/snake_case identifiers, years and multi-digit numbers, so fire_and_forget, MessageCache, and 2026 are all searchable. Prefix search means kuber finds kubernetes.
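The extraction and prefix-matching ideas can be sketched in a few dozen lines. Everything here is an illustration under assumptions: the stopword list, the regular expressions, and the PrefixIndex class are hypothetical, and the real indexer may tokenize differently.

```python
import bisect
import re

STOPWORDS = {"the", "a", "an", "and", "or", "for", "of", "to", "in", "is"}
_TOKEN = re.compile(r"[A-Za-z_][A-Za-z0-9_]*|\d{2,}")
_PART = re.compile(r"[A-Z]?[a-z]+|[A-Z]+(?![a-z])|\d+")


def extract_keywords(text: str) -> set:
    """Whole identifiers plus their snake_case / CamelCase parts."""
    keywords = set()
    for token in _TOKEN.findall(text):
        if token.lower() in STOPWORDS:
            continue
        keywords.add(token.lower())
        for part in _PART.findall(token.replace("_", " ")):
            if len(part) > 1 and part.lower() not in STOPWORDS:
                keywords.add(part.lower())
    return keywords


class PrefixIndex:
    def __init__(self):
        self._postings = {}  # keyword -> set of message ids
        self._sorted = []    # sorted keywords, searched with bisect

    def add(self, message_id: str, text: str) -> None:
        for kw in extract_keywords(text):
            if kw not in self._postings:
                bisect.insort(self._sorted, kw)
                self._postings[kw] = set()
            self._postings[kw].add(message_id)

    def _match_prefix(self, prefix: str) -> set:
        ids = set()
        i = bisect.bisect_left(self._sorted, prefix)  # O(log n) to first match
        while i < len(self._sorted) and self._sorted[i].startswith(prefix):
            ids |= self._postings[self._sorted[i]]
            i += 1
        return ids

    def search(self, query: str) -> set:
        terms = [t.lower() for t in query.split()]
        if not terms:
            return set()
        result = self._match_prefix(terms[0])
        for term in terms[1:]:
            result &= self._match_prefix(term)  # AND semantics
        return result
```

Keeping the keywords in a sorted list is what makes prefix lookup a binary search followed by a short scan over the matching run.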

18.3 Context backups: automatic session handoffs

Long conversations exhaust context windows. AUA keeps a per-(specialist, conversation) token counter and, when a trigger fires, asks the specialist to write a structured handoff note that a fresh window can resume from. Triggers: token_threshold (70% of the context window; bumps the thread number), message_count (every 30 messages), and time_gap (returning after 24h+ with ≥5 messages).

The backup prompt forces six sections (GOAL · DECISIONS MADE · CURRENT STATUS · ACTIVE FILE / CODE CONTEXT · USER PREFERENCES LEARNED · RESUME INSTRUCTION), capped at 900 tokens. Two production rules are baked in: the generator always reads the last 60 messages from the database, never the slice the client happened to send; and a 6-hour coverage job sweeps for any conversation whose newest backup is older than its newest message (backup valid ⇔ MAX(backup.created_at) > MAX(messages.created_at)), regenerating stale ones at 1/s.
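The triggers and the validity rule translate into two small predicates. This is a sketch with assumed names and an assumed priority order (token threshold first, then message count, then time gap); the thresholds are the ones quoted in this section.

```python
from typing import Optional

TOKEN_FRACTION = 0.70        # of the context window
MESSAGE_INTERVAL = 30        # every 30 messages
GAP_SECONDS = 24 * 3600      # 24h+
GAP_MIN_MESSAGES = 5


def backup_trigger(tokens_used: int, context_window: int, message_count: int,
                   seconds_since_last_message: float) -> Optional[str]:
    """Return the name of the trigger that fires, or None."""
    if tokens_used >= TOKEN_FRACTION * context_window:
        return "token_threshold"
    if message_count > 0 and message_count % MESSAGE_INTERVAL == 0:
        return "message_count"
    if (seconds_since_last_message >= GAP_SECONDS
            and message_count >= GAP_MIN_MESSAGES):
        return "time_gap"
    return None


def backup_is_valid(newest_backup_ts: Optional[float],
                    newest_message_ts: float) -> bool:
    """Valid only if the newest backup is newer than the newest message."""
    return newest_backup_ts is not None and newest_backup_ts > newest_message_ts
```

The coverage job described above is then just a scan for conversations where backup_is_valid(...) is false.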

bash
# Which conversations need a backup for this specialist?
curl -s "localhost:8000/context/backup/coverage?specialist=swe" | jq

# Don't wait 6 hours: sweep now (runs in the background)
curl -s -X POST "localhost:8000/context/backup/run-coverage-job?specialist=swe"

18.4 The correction lifecycle

Part 4 covered arbiter-driven corrections. v1.1 adds the full user-facing lifecycle.

Explicit prefix. A message starting with correction: is a preference statement and is stored immediately, even as the first message of a conversation, with no prior AI turn. (In Veritas, an early-return guard silently discarded these for months.)

bash
SESSION=$(curl -s -X POST localhost:8000/sessions -H "Content-Type: application/json" -d '{}' | jq -r .id)
curl -s -X POST localhost:8000/sessions/$SESSION/messages \
  -H "Content-Type: application/json" \
  -d '{"content": "correction: always use ISO-8601 dates"}' | jq .correction_stored
# → a correction_id, stored at confidence 0.85, source=explicit_prefix

Implicit detection. A two-layer trigger detector (regex layer ships with the framework; plug a classifier into TriggerDetector(layer2=…) for semantic catches) flags replies like "no, that's wrong β€” use UTC" after an AI turn. Instead of asking the user to re-type their intent, the response carries implicit_correction_pending: true and your UI shows Accept / Reject:

bash
curl -s -X POST localhost:8000/corrections/confirm-implicit \
  -H "Content-Type: application/json" \
  -d "{\"conversation_id\": \"$SESSION\", \"action\": \"accept\"}"
# accept → stored (confidence 0.75, source=implicit_confirmed); reject → discarded

CRUD + evidence. Every stored correction has a persistent ID and an append-only event history (created → applied → edited → superseded):

bash
CID=$(curl -s -X POST localhost:8000/corrections -H "Content-Type: application/json" \
  -d '{"subject":"timezones","domain":"general","claim":"Store timestamps in UTC","confidence":0.9}' \
  | jq -r .correction_id)

curl -s -X PATCH localhost:8000/corrections/$CID \
  -H "Content-Type: application/json" -d '{"claim": "Store and display in UTC"}'

curl -s "localhost:8000/corrections/evidence?correction_id=$CID" | jq

# Soft delete: sets scope='superseded'; the row stays for audit,
# but is excluded from retrieval and prompt injection
curl -s -X DELETE localhost:8000/corrections/$CID

Reviewer findings surfaced. When fanout routing runs the arbiter, its REASON:/CORRECTION: sections are no longer discarded after the verdict. They come back on the response as review_notes so the client can show why an answer won.

18.5 Analytics, reliability, usage & pricing

Four read-only endpoints power a "look under the hood" dashboard, all computed from model_runs:

| Endpoint | Returns |
|---|---|
| GET /analytics | Per-specialist run/win stats, confidence distribution (high ≥ 0.75 / medium ≥ 0.50 / uncertain), active-correction stats by domain, domain distribution, VCG welfare summary. |
| GET /reliability | Per-specialist win rate plus the last-20-run welfare trajectory and trend (up/down/flat), sparkline-ready. |
| GET /usage | Query counts and estimated cost per specialist. |
| GET /pricing | Per-specialist token pricing from the live model registry (self-hosted models cost 0). |

18.6 Self-maintenance: updates, crashes, bug reports, remote config

Update management. GET /version/check compares the running version against the latest GitHub release (graceful when offline). POST /update/skip {"version": "1.2.0"} persists a skipped version so the banner stays hidden; show_banner in the check response already accounts for it.

Crash reporting. Startup writes a running sentinel; clean shutdown marks it clean. A sentinel still running at the next startup means the previous session crashed; it's reported automatically (and queued runtime errors from pending_error_reports are flushed). Detection runs before the new sentinel is written, so a session never reports itself.
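The sentinel pattern is small enough to show in full. This sketch uses a plain text file with the values running and clean; the file name and function names are assumptions, and the real implementation may keep the sentinel elsewhere (for example in the state store).

```python
import tempfile
from pathlib import Path


def start_session(sentinel: Path) -> bool:
    """Return True if the previous session crashed, then mark this one running."""
    # Detection happens BEFORE the new sentinel is written,
    # so a session can never report itself.
    crashed = sentinel.exists() and sentinel.read_text().strip() == "running"
    sentinel.write_text("running")
    return crashed


def clean_shutdown(sentinel: Path) -> None:
    sentinel.write_text("clean")


# Demo in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    sentinel_path = Path(tmp) / "session.sentinel"  # hypothetical file name
    first_start = start_session(sentinel_path)      # no sentinel yet
    clean_shutdown(sentinel_path)
    after_clean = start_session(sentinel_path)      # previous run exited cleanly
    after_crash = start_session(sentinel_path)      # previous run never shut down
```

The third start sees a sentinel still marked running, which is exactly the crash signal described above.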

Bug reports. POST /bug-report assembles a structured report (platform fingerprint, log tails, opt-in last messages) and pushes it to a GitHub repo via the Contents API. Configure with AUA_BUGS_REPO (owner/repo) and a write-only AUA_BUGS_PAT. Without a PAT it returns 200 {"ok": false}, so bug reporting is never itself a source of 500s.

Remote model config. Model aliases, pricing, and context windows refresh from a remote models.json (default: this site; override with AUA_REMOTE_MODELS_URL) at startup and every 24h, with a three-level fallback: remote → DB cache (kept 7 days) → built-in registry. Remote may update display names, costs, windows, and add aliases for known providers, but never backend or provider, which map to code. A model_id_renames map lets retired upstream IDs hot-swap without a release.

18.7 Local model management

Register Ollama-class local models, tag them as specialists for domain nodes, and persist UI settings:

bash
curl -s -X POST localhost:8000/local/models -H "Content-Type: application/json" \
  -d '{"local_model_id": "qwen3:8b", "nickname": "Qwen 8B", "base_url": "http://localhost:11434"}'

curl -s -X PATCH localhost:8000/local/specialist/qwen3:8b \
  -H "Content-Type: application/json" \
  -d '{"specialist_domain": "software_engineering", "specialist_depth": 1}'

curl -s localhost:8000/local/models | jq
curl -s -X POST localhost:8000/local/settings -H "Content-Type: application/json" \
  -d '{"auto_discover": true}'

18.8 The dynamic domain ontology

Field classification starts from 10 fixed L0 roots (software_engineering, mathematics, research, law, medicine, finance, writing, analysis, history, general) and grows from specialist self-reports. A raw domain string resolves in two stages: an O(1) alias map, then Levenshtein similarity. Above 0.80 similarity it becomes an alias; below, it enters a candidate queue.

An hourly ontology job promotes a candidate to a real node only when all four gates pass: volume (≥5 distinct queries), diversity (≥2 distinct specialists reported it), coverage (still unresolved by the alias map), and divergence (mean per-specialist win-rate difference vs. the nearest node exceeds δ(d) = 0.10 + 0.05·depth). Low-evidence candidates are pruned after 30 days.
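The four gates can be written as one predicate. This is a sketch, assuming the candidate's statistics have already been aggregated; the Candidate dataclass and its field names are hypothetical, and which depth feeds the threshold (the candidate's or the nearest node's) is an assumption here.

```python
from dataclasses import dataclass


@dataclass
class Candidate:
    distinct_queries: int        # volume gate
    distinct_specialists: int    # diversity gate
    resolved_by_alias: bool      # coverage gate (must still be unresolved)
    win_rate_divergence: float   # mean per-specialist difference vs. nearest node
    depth: int                   # tree depth used for the threshold (assumed)


def divergence_threshold(depth: int) -> float:
    """Threshold from the text: 0.10 + 0.05 * depth."""
    return 0.10 + 0.05 * depth


def should_promote(c: Candidate) -> bool:
    return (
        c.distinct_queries >= 5
        and c.distinct_specialists >= 2
        and not c.resolved_by_alias
        and c.win_rate_divergence > divergence_threshold(c.depth)
    )


shallow = Candidate(8, 3, False, 0.18, depth=1)  # threshold 0.15
deep = Candidate(8, 3, False, 0.18, depth=2)     # threshold 0.20
```

Because the threshold grows with depth, the same measured divergence that justifies a new node near the root may not justify one deeper in the tree.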

bash
curl -s localhost:8000/domain-tree | jq '{nodes: .nodes | length, candidates: .candidates[:3]}'

Why this matters for routing: each promoted node gets its own effective-utility cell, so "compiler design" queries can route to a different winner than generic "software engineering" once the evidence says the models genuinely diverge there.


The remaining sections cover the operational features added in v1.2: aua test, aua loadtest, batch inference, model registry and version pinning, shadow mode, regression gating, experiment tracking, the empirical arbiter check, and hardware deployment tiers. These features are independent; read the sections that apply to your current task.

18.9 aua test: built-in integration test suite

aua test runs fixture datasets against a live router and reports pass/fail per case. It is the fastest way to verify routing, utility scoring, and specialist responses are working after a deployment, config change, or model swap.

The three built-in suites
aua test                        # smoke: 6 cases, under 60 seconds (default)
aua test --suite full           # full: 15 cases, 3-10 minutes
aua test --suite routing        # routing: 9 cases, all with expected_domain checks

smoke: Run after every deployment. Checks basic routing and non-empty responses for software_engineering, mathematics, and general. Designed to finish in under 60 seconds on any hardware.

full: Run before promoting a GREEN candidate. Covers edge cases, routing boundary queries (queries that could belong to two domains), and refusal detection (confirms the router does not say "I cannot help").

routing: Run after changing field weights, swapping the classifier, or registering a routing_strategy plugin. Every case in this suite has an expected_domain property; a failure means the classifier changed behavior on that query type.

Key flags
aua test --url http://prod:8000       # target a non-local router
aua test --case swe_binary_search     # run one specific case (repeatable)
aua test --dataset my_cases.yaml      # custom fixture file (same format as built-ins)
aua test --suite full --json --output report.json
aua test --no-liveness                # skip the pre-flight GET /health/live check
Custom fixture file format: my_cases.yaml
name: my_coding_suite
description: "Regression suite for the swe specialist after model update"
cases:
  - id: sort_complexity
    prompt: "Write bubble sort in Python and state its O complexity."
    expected_properties:
      - contains: "def "
      - contains_any: ["O(n^2)", "n squared", "n*n", "quadratic"]
      - min_length: 100
      - expected_domain: software_engineering
      - not_contains: "I cannot"
      - not_contains: "I'm unable"

  - id: math_domain_check
    prompt: "Solve the quadratic equation x^2 - 5x + 6 = 0."
    expected_properties:
      - contains_any: ["x = 2", "x = 3", "roots are"]
      - min_length: 20
      - expected_domain: mathematics

Property types you can use: contains (substring, case-insensitive), contains_any (list of substrings; at least one must match), not_contains, min_length (character count), expected_domain (exact match), expected_domain_any (list; any match).
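
To make the property semantics concrete, here is a minimal sketch of how one expected_properties list could be evaluated against a response. It is not the framework's implementation: the function names are hypothetical, and treating contains_any and not_contains as case-insensitive is an assumption (the text above only states that for contains).

```python
from typing import Any


def check_property(prop: dict[str, Any], response: str, domain: str) -> bool:
    """Evaluate one expected_properties entry (a single-key mapping)."""
    (kind, value), = prop.items()
    text = response.lower()
    if kind == "contains":
        return value.lower() in text
    if kind == "contains_any":          # assumption: case-insensitive
        return any(v.lower() in text for v in value)
    if kind == "not_contains":          # assumption: case-insensitive
        return value.lower() not in text
    if kind == "min_length":
        return len(response) >= value   # character count
    if kind == "expected_domain":
        return domain == value          # exact match
    if kind == "expected_domain_any":
        return domain in value
    raise ValueError(f"unknown property type: {kind}")


def failed_properties(props, response, domain):
    """Return the entries that did not hold; an empty list means the case passes."""
    return [p for p in props if not check_property(p, response, domain)]
```

A case passes only when failed_properties returns an empty list, which mirrors the all-or-nothing pass/fail reporting described above.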

Exit code 0 on all pass, exit code 1 on any failure. Use --json to get machine-readable output for CI pipelines.

18.10 aua loadtest: latency and throughput benchmarking

aua loadtest fires concurrent POST /query requests and reports the full latency distribution. Run it before going to production, after scaling hardware, or whenever you suspect a latency regression after a model or config change.

Common invocations
aua loadtest                              # 10 workers, 30 s, smoke query mix
aua loadtest -c 20 -d 60                 # 20 concurrent workers, 60 seconds
aua loadtest --suite full -c 5           # full fixture suite as query mix
aua loadtest --dataset my_queries.yaml   # custom query mix from a fixture file
aua loadtest --ramp 10 -c 20 -d 120     # ramp from 0 → 20 workers over 10 s, run 120 s
aua loadtest --think-ms 200 -c 10       # 200 ms pause between requests per worker
aua loadtest --json --output bench.json  # machine-readable output for CI
aua loadtest --url http://prod:8000      # target production router
Example output
✓ Router live at http://localhost:8000
Starting load test - workers=10 duration=30s queries=smoke

aua loadtest  150 requests in 30.1s   148 ok / 2 errors (1.3% error rate)

  p50     p95     p99     mean    min    max     RPS    mean U
  312ms   890ms   1.2s    425ms   98ms   1.4s    4.9    0.741

Routing: single: 148

✓ Load test passed (error rate 1.3% < 5%)

The test exits 0 when the error rate is below 5% and exits 1 otherwise, which makes it suitable for CI gates. Watch the mean U score: a drop in mean U without a matching change in latency usually means routing quality degraded (e.g. more queries hitting the arbiter fallback instead of a confident single specialist).
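
As an illustration of how the summary row and the exit code relate, the sketch below computes percentiles and the 5% error-rate gate from raw samples. The nearest-rank percentile method and the function names are assumptions; the CLI may compute its percentiles differently.

```python
import math
from statistics import fmean


def percentile(sorted_values, q):
    """Nearest-rank percentile of an ascending list (q is an integer 0..100)."""
    if not sorted_values:
        raise ValueError("no samples")
    rank = max(1, math.ceil(q * len(sorted_values) / 100))
    return sorted_values[rank - 1]


def summarize(latencies_ms, n_errors, max_error_rate=0.05):
    """Summarise successful-request latencies and apply the error-rate gate."""
    ok = sorted(latencies_ms)
    total = len(ok) + n_errors
    error_rate = n_errors / total if total else 0.0
    return {
        "p50": percentile(ok, 50),
        "p95": percentile(ok, 95),
        "p99": percentile(ok, 99),
        "mean": fmean(ok),
        "error_rate": error_rate,
        # Same rule as the text: pass only when the error rate is below 5%.
        "exit_code": 0 if error_rate < max_error_rate else 1,
    }
```

With 148 successes and 2 errors, as in the example output, the error rate is 2/150, about 1.3%, so the gate passes.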

Flags reference
-c / --concurrency  INT    simultaneous in-flight requests (default: 10)
-d / --duration     FLOAT  wall-clock test duration in seconds (default: 30)
--ramp              FLOAT  seconds to linearly ramp up workers (default: 0)
--think-ms          FLOAT  pause between requests per worker (default: 0, continuous)
--timeout           FLOAT  per-request timeout in seconds (default: 60)
--suite             STR    smoke | full | routing (default: smoke)
--dataset           PATH   custom YAML fixture file (overrides --suite)
--json                     emit full JSON report to stdout
--output / -o       PATH   save JSON report to file
--no-liveness              skip GET /health/live pre-flight check
--url               URL    router URL (default: http://localhost:8000)

18.11 Persistent batch queue: /batch/jobs

The batch queue lets you submit hundreds of queries asynchronously and poll for results as they complete. Jobs survive server restarts (results live in SQLite), support three priority lanes, and expose partial results before a batch finishes.

Submit a batch and poll for results
# POST /batch/jobs returns immediately
curl -X POST http://localhost:8000/batch/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "queries": ["Write binary search.", "Implement merge sort.", "Explain quicksort."],
    "priority": "high",
    "max_parallel": 4
  }'
# → {"job_id": "b3c1a2...", "status": "pending", "n_queries": 3}

# GET /batch/jobs/{id} returns partial results as soon as individual queries complete
curl http://localhost:8000/batch/jobs/b3c1a2...
# → {"status": "running", "n_done": 2, "n_pending": 1,
#    "results": [{"response": "def binary_search...", "u_score": 0.78}, ...]}

# GET /batch/jobs lists recent jobs
curl "http://localhost:8000/batch/jobs?status=done&limit=10"

Priority lanes: "high" dispatches before "normal" before "low". Within a lane, FIFO by submission time.

Restart recovery: When the server restarts, interrupted "running" jobs are automatically reset to "pending" and will be reprocessed. Completed results are never lost.

Partial results: You can poll at any time. Completed items are returned even while the batch is still processing. A status of "running" with n_pending: 0 means all items are done but the job record has not yet been marked "done", so poll once more.
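
The polling behaviour described above can be wrapped in a small client loop. This is a sketch under stated assumptions: poll_batch and http_fetch are hypothetical helper names, the loop treats "done" as the only terminal status because that is the only one the text names, and the fetch function is injectable so the loop can be exercised without a running router.

```python
import json
import time
import urllib.request
from typing import Callable


def http_fetch(job_id: str, base_url: str = "http://localhost:8000") -> dict:
    """GET /batch/jobs/{id} and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/batch/jobs/{job_id}") as resp:
        return json.load(resp)


def poll_batch(fetch: Callable[[str], dict], job_id: str,
               interval_s: float = 1.0, max_polls: int = 600) -> dict:
    """Poll until the job record itself says "done".

    A "running" status with n_pending == 0 is not treated as finished;
    the loop simply polls once more, as the text recommends.
    """
    for _ in range(max_polls):
        job = fetch(job_id)
        if job["status"] == "done":
            return job
        time.sleep(interval_s)
    raise TimeoutError(f"job {job_id} not done after {max_polls} polls")
```

Typical use would be poll_batch(http_fetch, job_id), with job_id taken from the POST /batch/jobs response.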

18.12 Model registry and version pinning

AUA downloads models automatically when you run aua serve. For production deployments, you almost certainly want to pin to a specific revision so a model update on HuggingFace Hub doesn't silently change your system's behavior.

Version pinning syntax in aua_config.yaml
specialists:
  - name: swe
    # Set exactly one model: line; the alternatives are shown commented out.
    model: Qwen/Qwen2.5-7B-Instruct              # latest (dev/prototyping only)
    # model: Qwen/Qwen2.5-7B-Instruct@v0.3       # pin to a branch or tag
    # model: Qwen/Qwen2.5-7B-Instruct@sha256:abc # pin to exact commit (production)
    # model: models:/my-specialist/Production    # MLflow model registry stage
    # model: models:/my-specialist/3             # MLflow specific version number

    # For MLflow URIs, also add:
    mlflow_tracking_uri: http://mlflow:5000

For gated models (Llama, Gemma), set HF_TOKEN in your environment and accept the license on HuggingFace. AUA checks disk space before downloading and warns if less than 10 GB is free.

aua models pin: discover and pin revisions
# List all branches and tags for a HuggingFace repo
aua models pin Qwen/Qwen2.5-7B-Instruct

# Get the exact config snippet to paste into aua_config.yaml
aua models pin Qwen/Qwen2.5-7B-Instruct --revision v0.3
# Output: model: Qwen/Qwen2.5-7B-Instruct@v0.3

# List MLflow registered versions
aua models pin swe --mlflow-uri models:/my-specialist --mlflow-tracking-uri http://mlflow:5000

# Skip downloads entirely (air-gapped or pre-cached setup)
aua serve --no-download

Production pinning recommendation. Use @sha256:<commit> in production because it is immutable. Branch names like @main can be silently updated by the model author. aua models pin Qwen/Qwen2.5-7B --revision main will show you the current commit hash under "main" so you can hardcode it.
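
The pinning forms above can be told apart mechanically, which is handy for a pre-deploy check that rejects mutable references. The sketch below is illustrative only: ModelRef and parse_model_ref are hypothetical names, not part of AUA, and treating a numeric MLflow version as fixed while a stage name is movable follows MLflow's general registry semantics.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ModelRef:
    source: str              # "huggingface" or "mlflow"
    name: str
    revision: Optional[str]  # tag, branch, commit, stage, or version number
    immutable: bool          # True only for references that cannot move


def parse_model_ref(spec: str) -> ModelRef:
    """Split a model: value into its source, name, and revision."""
    if spec.startswith("models:/"):
        name, _, version = spec[len("models:/"):].partition("/")
        # A numeric version is fixed; a stage such as Production can be reassigned.
        return ModelRef("mlflow", name, version or None, version.isdigit())
    name, _, revision = spec.partition("@")
    # Only an exact commit pin is immutable; branches and bare names can move.
    return ModelRef("huggingface", name, revision or None,
                    revision.startswith("sha256:"))
```

A CI step could call parse_model_ref on every specialist's model value and fail the build when immutable is False for a production config.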

18.13 Experiment tracking: MLflow and W&B

AUA logs per-query metrics automatically after every response, with no code changes needed. Add the experiment_tracking: block to your config and the router starts writing to MLflow or W&B.

aua_config.yaml: full experiment tracking configuration
experiment_tracking:
  enabled: true

  mlflow:
    enabled: true
    tracking_uri: http://localhost:5000   # or file:///absolute/path/to/mlruns
    experiment_name: aua-production       # created automatically if it doesn't exist
    run_name: router-v1.2                 # optional; auto-named if omitted
    log_artifacts: false                  # set true to also log response text

  wandb:
    enabled: true
    project: aua-framework
    entity: my-team                       # optional; uses your W&B default entity
    run_name: production-run              # optional
    tags: [production, v1.2]

Metrics logged per query: u_score, confidence, latency_ms, contradictions_detected, corrections_injected, dpo_pairs_generated.

Tags logged per query (for filtering in the UI): routing_mode, primary_domain, specialist, session_id, trace_id.

Both backends are lazy-loaded: if mlflow or wandb is not installed, a warning is logged and that backend is disabled. pip install mlflow or pip install wandb is all you need. Neither is a required dependency of AUA.
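
The lazy-loading pattern described here is a standard optional-import idiom. The following is a generic sketch of that idiom, not AUA's actual loader; the function names and the logger name are made up for illustration.

```python
import importlib
import logging

log = logging.getLogger("tracking_sketch")


def load_backend(module_name: str):
    """Import an optional dependency, or return None with a warning."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        log.warning("%s is not installed; tracking backend disabled", module_name)
        return None


def active_backends(names=("mlflow", "wandb")):
    """Map each installed backend name to its imported module."""
    return {n: m for n in names if (m := load_backend(n)) is not None}
```

Because the import happens only when a backend is requested, a deployment without mlflow or wandb installed still starts normally.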

18.14 Shadow mode: real-traffic evaluation before promotion

Shadow mode lets a GREEN (candidate) model receive real production traffic silently. The user always gets BLUE's (production) response. GREEN's response is evaluated and the (blue_u, green_u) pair is written to the shadow_scores table. Once enough queries accumulate, you have real-traffic evidence for the promotion decision, not just a synthetic eval.

Option A: static config (shadow starts on serve)
blue_green:
  swe:
    delta: 0.025
    shadow_endpoint: http://localhost:9011/v1/chat/completions
    shadow_min_queries: 50    # minimum before promotion is considered ready
Option B: activate at runtime, no restart needed
# Activate shadow mode
curl -X POST http://localhost:8000/deploy/shadow/swe \
  -H "Content-Type: application/json" \
  -d '{"green_endpoint": "http://localhost:9011/v1/chat/completions",
       "min_queries": 50, "threshold": 0.025}'

# Check progress: poll as frequently as you like, no side effects
curl http://localhost:8000/deploy/shadow/swe
# → {"active": true, "n_queries": 23, "min_queries": 50,
#    "progress": "23/50 shadow queries", "mean_delta": 0.031,
#    "blue_mean_u": 0.741, "green_mean_u": 0.772, "ready_to_promote": false}

# Deactivate (optionally clear accumulated scores)
curl -X DELETE "http://localhost:8000/deploy/shadow/swe?clear_scores=true"

Once ready_to_promote: true, call POST /deploy/green. The router automatically uses accumulated shadow scores (real traffic data) instead of running a synthetic eval from scratch.
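
To show how the status fields relate to the stored (blue_u, green_u) pairs, here is a small sketch that derives them. The readiness rule used (at least min_queries pairs and a mean delta of at least threshold) is an assumption inferred from the field names; the router's actual criterion may differ, and shadow_status is a hypothetical function name.

```python
from statistics import fmean


def shadow_status(pairs, min_queries=50, threshold=0.025):
    """Summarise accumulated (blue_u, green_u) pairs for one specialist."""
    n = len(pairs)
    blue = fmean(b for b, _ in pairs) if n else 0.0
    green = fmean(g for _, g in pairs) if n else 0.0
    delta = green - blue
    return {
        "n_queries": n,
        "min_queries": min_queries,
        "progress": f"{n}/{min_queries} shadow queries",
        "mean_delta": round(delta, 3),
        "blue_mean_u": round(blue, 3),
        "green_mean_u": round(green, 3),
        # Assumed rule: enough evidence AND GREEN ahead by the threshold.
        "ready_to_promote": n >= min_queries and delta >= threshold,
    }
```

With 23 pairs averaging 0.741 and 0.772, this yields a mean_delta of 0.031 and ready_to_promote false, consistent with the sample response above where only 23 of 50 queries have accumulated.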

18.15 Regression gate: block promotion on quality drops

The regression gate runs an eval dataset against both BLUE and GREEN before allowing promotion. If GREEN's pass rate or U score drops relative to BLUE, the promotion is blocked.
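
The gate's decision can be sketched as a comparison of two eval summaries. This is a minimal illustration, assuming that any drop in pass rate or mean U counts as a regression (the real gate may apply a tolerance). The function name and the input dictionary keys pass_rate and mean_u are hypothetical, as is the delta_u output field.

```python
def regression_gate(blue: dict, green: dict, regression_block: bool = True) -> dict:
    """Compare BLUE and GREEN eval summaries and decide whether to block.

    Each summary is assumed to carry "pass_rate" and "mean_u" keys.
    """
    delta_pass = green["pass_rate"] - blue["pass_rate"]
    delta_u = green["mean_u"] - blue["mean_u"]
    regressed = delta_pass < 0 or delta_u < 0
    return {
        "regressed": regressed,
        "delta_pass_rate": round(delta_pass, 4),
        "delta_u": round(delta_u, 4),
        # regression_block false downgrades a regression to a warning.
        "blocked": regressed and regression_block,
    }
```

For example, a GREEN pass rate of 0.80 against a BLUE pass rate of 0.95 gives a delta_pass_rate of -0.15 and, with blocking enabled, a blocked promotion.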

aua_config.yaml
blue_green:
  swe:
    delta: 0.025
    regression_dataset: evals/coding_smoke.yaml   # eval YAML in aua test fixture format
    regression_block: true     # true = block promotion; false = log warning only
    shadow_endpoint: http://localhost:9011/v1/chat/completions
    shadow_min_queries: 50
Triggering promotion: the response tells you what happened
curl -X POST http://localhost:8000/deploy/green \
  -H "Content-Type: application/json" \
  -d '{"specialist": "swe", "green_model": "./models/swe_v2",
       "green_endpoint": "http://localhost:9011/v1/chat/completions"}'

# Blocked by regression:
# → {"promoted": false,
#    "message": "PROMOTION BLOCKED - regression detected on evals/coding_smoke.yaml.",
#    "regression": {"regressed": true, "delta_pass_rate": -0.15, "blocked": true}}

# Override the dataset per request (useful for testing):
curl -X POST http://localhost:8000/deploy/green \
  -H "Content-Type: application/json" \
  -d '{"specialist": "swe", "green_model": "...",
       "regression_dataset": "evals/quick_check.yaml"}'

18.16 Arbiter empirical check: external ground truth

Arbiter Stage 4 cross-checks claims against external sources before issuing a verdict. No configuration is required; it activates automatically for the relevant domains.

Domain | Source | What it checks
mathematics, structural_engineering | SymPy | Extracts complexity claims (O(n²), O(log n), …); tests algebraic equivalence and asymptotic ordering
software_engineering, stem_research | arXiv Atom API | Keyword search returns top-3 abstracts; scores by keyword overlap with the response
surgery, aviation, medicine | PubMed E-utilities | esearch → efetch pipeline; same keyword-overlap scoring
law, art, creative_writing, general | None | Returns "not converged" (no external source for this domain)

No API keys are required for basic arXiv and PubMed use. Set NCBI_API_KEY in your environment to increase PubMed rate limits above 3 requests/second. The empirical check never blocks: a flaky external API returns "not converged", not an error that breaks the query.

Using the empirical module directly from a plugin
from aua.empirical import empirical_check

class EmpiricalContradictionDetector:
    """Cross-check every response against external literature."""

    def check(self, problem: str, solution: str, claimed_complexity=None) -> dict:
        result = empirical_check(problem, "software_engineering", solution, "")
        if result.converged and result.winner not in ("neither", None, "both"):
            return {
                "contradictions": [{
                    "type": "empirical",
                    "description": result.explanation,
                    "severity": 0.6,
                }],
                "confidence_penalty": 0.15,
                "is_clean": False,
            }
        return {"contradictions": [], "confidence_penalty": 0.0, "is_clean": True}

18.17 Hardware deployment tiers

AUA ships six hardware tier templates. Pass --tier to aua serve to use a preconfigured specialist layout for your hardware, so you do not need to write a full aua_config.yaml from scratch.

Tier flag | Hardware | Backend | Specialists | Best for
gaming-pc | RTX 3080/4080 (10–16 GB VRAM) | Ollama | qwen2.5-coder:7b + qwen2.5:7b | Local dev, offline, Windows/Linux
macbook | Apple Silicon M-series (16–64 GB) | Ollama | qwen2.5-coder:7b + qwen2.5:7b | Local dev, macOS
single-4090 | RTX 4090 (24 GB VRAM) | vLLM + AWQ | Qwen2.5-7B-AWQ + Qwen2.5-Math-7B-AWQ | High-quality single-GPU serving
quad-4090 | 4× RTX 4090 (96 GB total) | vLLM + AWQ | 14B specialists, TP=2 | Multi-specialist parallel serving
a100-cluster | 8× A100 80 GB | vLLM bf16 | Llama-3-70B + Qwen-72B, TP=4 | Production, 70B class models
h100-cluster | 8× H100 SXM5 NVLink | vLLM bf16 | Llama-3.1-70B + Qwen2.5-72B, TP=4 | Frontier models, highest throughput
Using a tier
aua serve --tier gaming-pc      # Ollama, laptop-friendly 7B models
aua serve --tier h100-cluster   # 8×H100 NVLink, 70B+ bf16 specialists

# Tier aliases (shorter to type):
aua serve --tier gaming         # → gaming-pc
aua serve --tier h100           # → h100-cluster
aua serve --tier a100           # → a100-cluster

Tensor parallelism for models that don't fit on one GPU

When a model requires more VRAM than a single GPU has, vLLM can split it across multiple GPUs using tensor parallelism: each GPU holds a subset of the weight matrices and they synchronise via NCCL all-reduce over NVLink (fast) or PCIe (slower). Configure it per specialist in your aua_config.yaml:

aua_config.yaml: 4-GPU tensor parallel specialist
specialists:
  - name: math
    model: Qwen/Qwen2.5-72B-Instruct   # 72B bf16 ≈ 144 GB; needs 4× A100 or 2× H100
    port: 9002
    field: mathematics
    gpu_ids: [0, 1, 2, 3]             # expose these 4 GPUs to vLLM
    tensor_parallel_size: 4            # must be a power of 2: 1, 2, 4, or 8
    pipeline_parallel_size: 1          # for single-node, always 1
    gpu_memory_utilization: 0.90
    max_model_len: 8192

Constraints: tensor_parallel_size must be a power of 2. len(gpu_ids) must equal tensor_parallel_size. On startup, the banner prints GPU 0,1,2,3 TP×4 to confirm the layout.
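
Both constraints, and the 144 GB figure in the config comment, are easy to check before starting the server. The sketch below is a standalone pre-flight helper with hypothetical function names; it is not the validation AUA itself runs.

```python
def validate_tensor_parallel(gpu_ids, tensor_parallel_size):
    """Return a list of constraint violations (empty when the layout is valid)."""
    errors = []
    tp = tensor_parallel_size
    # A positive integer is a power of two when exactly one bit is set.
    if tp < 1 or (tp & (tp - 1)) != 0:
        errors.append(f"tensor_parallel_size={tp} is not a power of 2")
    if len(gpu_ids) != tp:
        errors.append(f"len(gpu_ids)={len(gpu_ids)} != tensor_parallel_size={tp}")
    return errors


def bf16_weight_gb(params_billion: float) -> float:
    """Weights only: bf16 stores 2 bytes per parameter, so 1B params is 2 GB."""
    return params_billion * 2


print(validate_tensor_parallel([0, 1, 2, 3], 4))
print(bf16_weight_gb(72))
```

The weight estimate excludes KV cache and activation memory, which is why the config leaves headroom through gpu_memory_utilization rather than sizing GPUs to the weights alone.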

NVLink is fast enough that tensor parallelism works well: 600 GB/s per GPU on A100 SXM and 900 GB/s on H100 SXM5. PCIe 4.0 x16, which consumer GPUs use, peaks at about 64 GB/s bidirectional, so on consumer hardware use tensor parallelism only when necessary and prefer pipeline parallelism or smaller models.

18.18 Retry with exponential backoff (#39)

When a specialist endpoint returns a transient error (network hiccup, 503, 429 rate-limit), AUA automatically retries the HTTP call before raising an error to the user. Retry is configured per router in aua_config.yaml:

aua_config.yaml: retry configuration
router:
  retry:
    max_retries: 3          # 0 = disabled; default 3
    base_delay_ms: 200      # first retry delay in milliseconds
    max_delay_ms: 5000      # cap on computed delay
    jitter: true            # ±25% random jitter (prevents thundering-herd)
    retryable_status_codes: [429, 502, 503, 504]  # default

Delay schedule: attempt 1 is immediate. Attempt 2 waits base_delay_ms. Each subsequent attempt doubles the delay, capped at max_delay_ms. Jitter adds ±25% randomness so that multiple concurrent requests don't all retry at the same instant after a hiccup.
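
The schedule can be written as a single function of the attempt number. This is a sketch of the rule as described, not the framework's code: retry_delay_ms is a hypothetical name, and applying jitter after the cap (so a jittered delay can exceed max_delay_ms by up to 25%) is an assumption.

```python
import random


def retry_delay_ms(attempt: int, base_delay_ms: float = 200,
                   max_delay_ms: float = 5000, jitter: bool = True,
                   rng=random) -> float:
    """Delay to wait before a 1-based attempt; attempt 1 is the initial call."""
    if attempt <= 1:
        return 0.0
    # Attempt 2 waits base, attempt 3 waits 2x base, attempt 4 waits 4x base, ...
    delay = min(base_delay_ms * 2 ** (attempt - 2), max_delay_ms)
    if jitter:
        delay *= rng.uniform(0.75, 1.25)  # ±25%, applied after the cap (assumed)
    return float(delay)


# Defaults with max_retries: 3 give four attempts in total.
print([retry_delay_ms(a, jitter=False) for a in range(1, 5)])
```

With the default values and jitter disabled, the four attempts wait 0, 200, 400, and 800 ms, so a fully failed call adds about 1.4 s of backoff on top of the request time itself.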

What gets retried: ConnectError, ConnectTimeout, ReadTimeout, and HTTP 429/502/503/504, which are all transient availability failures.

What is NOT retried: HTTP 400, 401, 403, 404, 422, 500. These indicate bugs in the request or the specialist, so retrying won't help and would hide the problem.

Retry is transport-level: it wraps the HTTP call to the specialist endpoint. It is separate from the assertion-level retry in aua/policy.py, which re-calls a specialist when a policy assertion fails. Both can be active simultaneously.

18.19 Circuit breaker and degraded-mode failover (#37, #38)

Even with retry, a specialist that is completely down will still cause every query routed to it to time out. The circuit breaker solves this: after failure_threshold availability failures within failure_window_s seconds, the circuit opens, and subsequent queries skip that specialist immediately and route to the arbiter or a remaining healthy specialist. The user still gets a response; it is just produced with reduced specialist availability.

Configuration

aua_config.yaml: circuit breaker
router:
  circuit_breaker:
    enabled: true                # false = disable entirely (default: true)
    failure_threshold: 5         # failures within window before opening
    failure_window_s: 60.0       # sliding window for counting failures
    recovery_timeout_s: 30.0     # seconds in OPEN before probing
    success_threshold: 2         # consecutive successes in probe → CLOSED

State machine

State | Behaviour | Transition
CLOSED | Normal: all calls pass through | → OPEN when failure_threshold failures occur within failure_window_s
OPEN | Circuit tripped: calls rejected immediately with 503 | → HALF_OPEN after recovery_timeout_s seconds
HALF_OPEN | Probe mode: one call allowed through | Success → CLOSED (after success_threshold successes). Failure → OPEN

What trips the circuit: ConnectError, ConnectTimeout, ReadTimeout, HTTP 429/502/503/504.

What does NOT trip it: HTTP 400/401/403/404/422/500. These are caller or specialist bugs, not endpoint availability issues.
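
The state machine above fits in a short class. The sketch below follows the table's transitions with a sliding failure window and an injectable clock. It is a simplified illustration rather than AUA's implementation: the class and method names are hypothetical, and it lets every call through in HALF_OPEN instead of limiting probes to one in-flight call.

```python
import time
from collections import deque


class CircuitBreaker:
    def __init__(self, failure_threshold=5, failure_window_s=60.0,
                 recovery_timeout_s=30.0, success_threshold=2,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.failure_window_s = failure_window_s
        self.recovery_timeout_s = recovery_timeout_s
        self.success_threshold = success_threshold
        self._clock = clock
        self.state = "closed"
        self._failures = deque()   # timestamps of recent availability failures
        self._opened_at = 0.0
        self._successes = 0

    def allow(self) -> bool:
        """Return True if a call may go to the specialist right now."""
        if self.state == "open":
            if self._clock() - self._opened_at < self.recovery_timeout_s:
                return False       # still open: caller should skip or degrade
            self.state = "half_open"
            self._successes = 0
        return True

    def record_success(self) -> None:
        if self.state == "half_open":
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = "closed"
                self._failures.clear()

    def record_failure(self) -> None:
        """Call only for availability failures (timeouts, 429/502/503/504)."""
        now = self._clock()
        if self.state == "half_open":
            self._trip(now)        # a failed probe reopens immediately
            return
        self._failures.append(now)
        while self._failures and now - self._failures[0] > self.failure_window_s:
            self._failures.popleft()
        if len(self._failures) >= self.failure_threshold:
            self._trip(now)

    def _trip(self, now: float) -> None:
        self.state = "open"
        self._opened_at = now
        self._failures.clear()
```

A caller would check allow() before each specialist request, then call record_success() or record_failure() depending on the outcome, leaving non-availability errors such as HTTP 400 or 500 unrecorded.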

Degraded-mode response (#38)

When one or more specialists have open circuits during a query, the router routes to the arbiter or remaining healthy specialists and stamps the response with two fields:

Response when a circuit is open
{
  "response":             "...",
  "u_score":              0.71,
  "routing_mode":         "arbiter",
  "degraded_mode":        true,          # one or more circuits were open
  "degraded_specialists": ["mathematics"] # which domains were bypassed
}

Your client can check degraded_mode to decide whether to display a "partial availability" notice, log an alert, or fall back to a cached response. When degraded_mode: false (the default), degraded_specialists is null.
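
On the client side, the two fields can drive a simple user-facing notice. This is an illustrative helper only; the function name and the notice wording are made up, and only the degraded_mode and degraded_specialists fields come from the response shape above.

```python
from typing import Optional


def degraded_notice(body: dict) -> Optional[str]:
    """Return a short notice when the router answered in degraded mode."""
    if not body.get("degraded_mode"):
        return None
    # degraded_specialists is null when degraded_mode is false, so guard for None.
    bypassed = body.get("degraded_specialists") or []
    return "Partial availability: " + ", ".join(bypassed) + " temporarily bypassed"
```

The same check is a natural place to emit an alert or to decide whether a cached answer is preferable to the degraded one.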

Inspecting circuit breaker state

bash
# GET /health/ready includes a circuit_breakers array
curl -s localhost:8000/health/ready | python3 -m json.tool

{
  "specialists": {"swe": "ok", "math": "unreachable"},
  "circuit_breakers": [
    {"specialist": "swe",  "state": "closed",   "failure_count": 0},
    {"specialist": "math", "state": "open",     "failure_count": 5,
     "open_since": 1718900000.0, "recovery_timeout_s": 30.0}
  ]
}

Typical failure arc. Specialist goes down → first 3 requests trigger retry (3× specialist_timeout delay each) → 5th failure trips the circuit → all subsequent requests skip instantly to arbiter with degraded_mode: true → after recovery_timeout_s, one probe fires → specialist back up → circuit closes → normal routing resumes.

How-to 18 done. That covers persistence, search, the full v1.2 operations toolkit, retry, and circuit breaker failover. The reference sections below list every CLI command and REST endpoint.


Reference

Reference The complete aua_config.yaml

Every key the loader accepts, with defaults. Validation is strict at every level: an unknown key anywhere fails at startup, naming the bad key and listing the valid ones, so this reference and aua config validate can never drift apart. (aua config expand prints your resolved config with all defaults filled in.)

aua_config.yaml: every accepted key, annotated
aua:
  version: "0.5"                    # written by aua init; don't hand-edit
  mode: local                       # local | kubernetes | cluster
  backend: ollama                   # default backend: ollama | vllm (per-specialist override below)
  project_name: my-aua-project      # optional

specialists:                        # one entry per model you route to
  - name: swe                       # unique; used in logs, metrics, blue_green keys
    model: qwen2.5-coder:7b         # registry alias, raw Ollama tag, or HF ID (vLLM)
    field: software_engineering     # one of the 11 built-in fields (§2.5)
    port: 11434                     # Ollama: shared server port; vLLM: per-model port
    host: 127.0.0.1                 # where the model server lives
    scheme: http                    # http | https
    backend: ollama                 # override the global backend per specialist
    gpu: 0                          # GPU index (vLLM placement)
    gpu_memory_utilization: 0.34    # vLLM only
    max_model_len: 2048             # vLLM only
    quantization: awq               # vLLM only
    enforce_eager: true             # vLLM only
    endpoint_override: null         # full URL; bypasses host/port/scheme assembly
    models_url_override: null       # override the /v1/models discovery URL

arbiter:                            # same keys as a specialist, minus name/field/gpu... :
  model: qwen2.5:3b                 # model, port, host, scheme, backend, endpoint_override,
  port: 11434                       # models_url_override, gpu, gpu_memory_utilization,
                                    # max_model_len, quantization, enforce_eager

router:
  port: 8000
  host: 127.0.0.1
  single_domain_threshold: 0.75     # top prob ≥ this → single routing
  fanout_threshold: 0.30            # 2+ domains ≥ this → fanout (race + arbiter)
  specialist_timeout: 60.0          # seconds per specialist call
  tau: 1.0                          # routing softmax temperature (1.0 = off)
  arbitration_mode: pairwise        # pairwise | vcg | llm
  retry:
    max_retries: 3                  # 0 to disable; default 3
    base_delay_ms: 200
    max_delay_ms: 5000
    jitter: true
    retryable_status_codes: [429, 502, 503, 504]
  circuit_breaker:
    enabled: true
    failure_threshold: 5
    failure_window_s: 60.0
    recovery_timeout_s: 30.0
    success_threshold: 2
  arbitration_mode: verdict         # verdict | vcg (welfare-maximizing winner selection)
  cors_origins: ["http://localhost:3001"]

blue_green:                         # promotion criteria, keyed by specialist name
  swe:
    delta: 0.025                    # GREEN must beat BLUE's mean U by this much
    T_min: 10                       # over at least this many queries
    tau: 0.20                       # exploration fraction routed to GREEN during eval

logging:
  level: INFO                       # DEBUG | INFO | WARNING | ERROR
  format: "%(asctime)s [%(levelname)s] %(name)s: %(message)s"

secrets:                            # §12.3: config holds NAMES, providers resolve values
  provider: env                     # env | vault | aws | gcp
  url: http://127.0.0.1:8200        # vault only
  token_env: VAULT_TOKEN            # vault only: env var holding the token
  region: us-east-1                 # aws only

state:
  backend: sqlite                   # sqlite | files
  path: .aua/state/aua.db

security:
  cors_origins: null                # overrides router.cors_origins when set
  mtls: {}                          # §12.2: {enabled, cert_dir, auto_generate}
  encryption: {}                    # §12.4: {enabled, key_secret}

plugins:                            # How-to 13: kind → spec; config splats as kwargs
  field_classifier:                 # kinds: field_classifier, utility_scorer,
    import_path: plugins.mine:KeywordClassifier   # arbiter_policy, promotion_policy,
    config: {confidence_boost: 1.1} # correction_store, model_backend, state_store

hooks:                              # How-to 14: a LIST of registrations
  - hook_point: on_correction       # 11 valid points; see How-to 14
    import_path: plugins.mine:SlackNotificationHook
    config: {webhook_url: "https://hooks.slack.com/..."}
    fail_closed: false              # true → hook failure aborts the request

middleware:                         # How-to 14: ordered pipeline
  - import_path: plugins.mine:PIIRedactionMiddleware
    config: {}
  - plugins.mine:LogMiddleware      # bare string OK when there's no config

Wired from YAML at startup: field_classifier, utility_scorer, correction_store, all hooks, all middleware. The other plugin kinds (arbiter_policy, promotion_policy, model_backend, state_store) load and contract-validate from config, so typos still fail fast, and attach at the programmatic points shown in How-to 13.
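
The import_path and config keys in the plugins, hooks, and middleware blocks follow a module:Class plus keyword-arguments pattern. The sketch below shows that pattern in isolation. load_plugin is a hypothetical helper rather than AUA's loader, and the example uses a standard-library class as a stand-in for a real plugin.

```python
import importlib
from typing import Any, Optional


def load_plugin(import_path: str, config: Optional[dict] = None) -> Any:
    """Resolve "package.module:ClassName" and instantiate it with cls(**config)."""
    module_name, sep, attr = import_path.partition(":")
    if not sep or not module_name or not attr:
        raise ValueError(f"expected 'module:Class', got {import_path!r}")
    cls = getattr(importlib.import_module(module_name), attr)
    # Every config key must be a constructor parameter, otherwise Python
    # raises TypeError ("unexpected keyword argument").
    return cls(**(config or {}))


# Stand-in example: fractions.Fraction plays the role of a plugin class.
half = load_plugin("fractions:Fraction", {"numerator": 1, "denominator": 2})
print(half)
```

This is why a config key that the constructor does not accept fails at instantiation time instead of being silently ignored.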

Reference Troubleshooting

The errors every newcomer hits, in the order they usually hit them.

Symptom | Cause & fix
aua: command not found right after pip install | The install went to a Python whose bin/ isn't on PATH, a classic problem with pyenv. Run pyenv local 3.11.x && pip install adaptive-utility-agent, or use python3 -m aua.cli --help to bypass PATH entirely.
Unknown key(s) in 'top-level': ['specialist'] (or any key) | Strict validation caught a typo. The error names the bad key and lists every valid one at that level; compare against the config reference above. This fires at aua config validate / startup, never mid-request.
Specialist 'x' references unknown field 'devops' | field: must be one of the 11 built-ins (§2.5). Map your domain to the closest risk profile and, if you want custom routing language, add a FieldClassifierPlugin (How-to 13).
aua doctor: Ollama not reachable | Run ollama serve & first, then re-run. If it's running on a non-default port, set port: on each specialist (or endpoint_override).
Query returns 503 from the router | The specialist's model server is reachable but the model isn't available, usually because the tag isn't pulled. ollama list must show the exact string in model:. ollama pull <tag> fixes it.
Queries return 503 instantly with "Circuit open for X" | The circuit breaker has opened for specialist X after repeated failures. Check GET /health/ready → circuit_breakers for state and open_since. The circuit will probe automatically after recovery_timeout_s (default 30s). To tune sensitivity: raise circuit_breaker.failure_threshold or lower failure_window_s. To disable: circuit_breaker.enabled: false.
Response has degraded_mode: true | One or more specialists had open circuits when the query arrived; degraded_specialists lists which ones. The router served the request via the arbiter or remaining healthy specialists. Check GET /health/ready → circuit_breakers and specialist logs. Normal routing resumes once the circuit closes.
Query returns 504 / times out | The model is loading (first call after a pull can take a minute on CPU) or genuinely too slow. Raise router.specialist_timeout; warm the model with one direct ollama run first.
Address already in use on aua serve | Something owns :8000 (often a previous aua serve). lsof -i :8000 then kill it, or change router.port.
PluginLoadError: Cannot import module 'plugins.mine' | The router adds the config file's directory to sys.path (Django-style). Your plugins/ package must sit next to aua_config.yaml and contain an __init__.py. Pre-flight with aua extensions test --kind ... --import-path ... from the project dir.
Failed to instantiate MyPlugin: unexpected keyword argument | The YAML config: mapping is splatted as constructor kwargs, i.e. cls(**config). Every key in your config block must be a parameter of __init__ (How-to 13's constructor contract).
Plugin registered but nothing changed | Ask the server, not the CLI: curl -s localhost:8000/extensions shows what the running process loaded (the CLI's extensions list runs in a fresh process and can't see it). Also check startup logs for "Plugin loaded from config".
Every query routes to general / the arbiter | The classifier isn't confident. Check domain_distribution in the response; lower router.single_domain_threshold, use force_domain to verify the specialist itself works, or write a KeywordClassifier for your traffic (How-to 13).
Hooks registered but never fire | Hook config is a list of {hook_point, import_path} entries (How-to 14); the older nested-mapping shape is rejected by validation. Verify with GET /extensions → hooks.
sqlite3.OperationalError: database is locked | Two router processes share one state.path. Run one router per state DB, or point the second at its own path.

The 30-second triage, in order: aua config validate (config truth) → aua doctor (environment truth) → curl localhost:8000/health/ready (specialist reachability) → curl localhost:8000/extensions (extension truth) → logs with logging.level: DEBUG.

Reference CLI command groups

aua init

bash
aua init <name> [--preset <name>] [--tier <name>] [--force]

aua serve

bash
aua serve [--dry-run] [--tier <name>] [--reuse-running] [--with-ui] [--config <path>]

aua doctor

bash
aua doctor [--strict] [--json] [--check-certs]

aua config

bash
aua config validate [--config <path>]
aua config expand [--json]
aua config reload

aua models / fields / presets / defaults

bash
aua models list | inspect <name>
aua fields list | inspect <name>
aua presets list | inspect <name>
aua defaults show [<category>]

Pin a model revision:

aua models pin Qwen/Qwen2.5-7B-Instruct                 # list available revisions
aua models pin Qwen/Qwen2.5-7B-Instruct --revision v0.3 # show config snippet to paste
aua models pin swe --mlflow-uri models:/swe              # list MLflow registered versions
aua models pin swe --mlflow-uri models:/swe --revision 3 # get MLflow URI snippet

aua token

bash
aua token create --scope <scope> [--expires <duration>] [--name <label>]
aua token list [--json]
aua token revoke <token-id>
aua token inspect <token-id>

aua certs

bash
aua certs generate [--ca-cert <path>] [--ca-key <path>]
aua certs inspect

aua test

Run the built-in integration test suite against a live router.

aua test                              # smoke suite (default) against localhost:8000
aua test --suite full                 # full regression suite
aua test --suite routing              # domain classification correctness
aua test --url http://prod:8000       # target a different router
aua test --dataset my_cases.yaml      # custom fixture file
aua test --case swe_binary_search     # run one specific case (repeatable)
aua test --json --output report.json  # machine-readable JSON report
aua test --no-liveness                # skip GET /health/live pre-flight

aua loadtest

Fire concurrent requests and report p50/p95/p99 latency, throughput, and U score.

aua loadtest                              # 10 workers, 30 s, smoke query mix
aua loadtest -c 20 -d 60                 # 20 workers, 60 s
aua loadtest --suite full -c 5           # full fixture suite as query mix
aua loadtest --dataset my_queries.yaml   # custom query mix
aua loadtest --ramp 10 -c 20 -d 120     # ramp from 0→20 workers over 10 s
aua loadtest --think-ms 200 -c 10       # 200 ms pause between each request per worker
aua loadtest --json --output bench.json
aua loadtest --url http://prod:8000

aua eval

bash
aua eval run --dataset <path> [--config <path>] [--json]
aua eval report <results.json>
aua eval compare --baseline <blue.json> --candidate <green.json>

aua corrections / dpo

bash
aua corrections export --format jsonl [--redact]
aua dpo export --format preference-pairs [--redact]

aua extensions

bash
aua extensions list
aua extensions inspect <name>
aua extensions test --kind <type> --import-path <path>
# Validate: aua extensions test --kind <type> --import-path <path>

aua status / rollback

bash
aua status [--once] [--json] [--url <url>] [--refresh <seconds>]
aua rollback [--specialist <name>] [--all] [--yes] [--no-restart]

aua guard

bash
aua guard list [--json]
aua guard test --import-path <module:function> [--output <text>] [--domain <name>]

aua policy

bash
aua policy list
aua policy validate <path.yaml>
aua policy apply <path.yaml> [--dry-run]

aua calibrate

bash
aua calibrate --layer <1|2|3> [--force] [--dry-run] [--config <path>]
              [--dataset <path>]          # layer 1 only
              [--output <path.jsonl>]     # layer 3 only
              [--min-pairs <N>]           # layer 3 only (default: 10)

aua logs

bash
aua logs sessions [--limit <N>] [--domain <name>] [--json]
aua logs assertions [--filter <key=value>] [--assertion <name>] [--tail <N>] [--json]
aua logs export [--output <path>] [--table <table>] [--limit <N>]

aua metrics

bash
aua metrics --compare <window>   # 7d, 30d, or YYYY-MM-DD:YYYY-MM-DD
            [--metric <name>]   # u_score | assertion_fail_rate | retry_rate
            [--json]

Reference REST API endpoints

All endpoints are on http://localhost:8000. Auth is disabled by default; include Authorization: Bearer $AUA_TOKEN only when auth is enabled in config. Interactive docs: http://localhost:8000/docs (Swagger UI).

Method | Endpoint | Scope | Description
POST | /query | aua:query | Route a query. Returns response + full metadata.
POST | /query/stream | aua:stream | Streaming SSE query. Emits start → chunks → done.
POST | /query/batch | aua:batch | Route multiple queries concurrently.
GET | /health/live | - | Liveness probe. Returns 200 if process is alive.
GET | /health/ready | - | Readiness probe. Returns 200 when all specialists reachable.
GET | /health/startup | - | Startup probe. Returns 200 after first successful readiness check.
GET | /status | aua:status | Full telemetry: U scores, latency percentiles, routing stats, memory.
GET | /version | - | Framework version and build info.
GET | /config | aua:config:read | Running config (secrets redacted).
POST | /config/reload | aua:config:write | Hot-reload config without restart.
GET | /corrections | aua:corrections:read | List all stored corrections.
POST | /corrections | aua:corrections:write | Inject a verified fact into the corrections store.
POST | /deploy/green | aua:deploy | Register a GREEN candidate and evaluate against BLUE.
POST | /deploy/rollback | aua:rollback | Roll back specialist to previous BLUE model.
GET | /metrics | - | Prometheus scrape endpoint (18 metrics).
GET | /metrics/cost | aua:status | GPU hours and USD cost per specialist.
POST | /sessions | aua:query | Create a new chat session.
GET | /sessions | aua:query | List all sessions (most recent first).
GET | /sessions/{id} | aua:query | Get session metadata.
DELETE | /sessions/{id} | aua:query | Delete a session and its messages.
POST | /sessions/{id}/messages | aua:query | Send a message in a session context.
GET | /extensions | - | List registered plugins and hooks.
v1.1: persistence, search & production ops (How-to 18)
Method | Endpoint | Scope | Description
POST | /conversations | aua:query | Create a conversation (optional project_id).
GET | /conversations | aua:query | List conversations; ?project_id= filters.
PATCH | /conversations/{id}/title | aua:query | Rename a conversation.
GET | /conversations/{id}/messages | aua:query | Paginated messages (before/after cursors).
POST | /conversations/{id}/messages | aua:query | Append a message; keyword-indexed in the background.
POST | /projects | aua:query | Create a project for grouping conversations.
GET | /projects | aua:query | List projects.
GET | /search | aua:query | Message-level keyword search (AND semantics, prefix match).
GET | /context/backup/coverage | aua:status | Stale/missing context backups per specialist.
POST | /context/backup/run-coverage-job | aua:query | Trigger an immediate backup coverage sweep.
POST | /corrections/confirm-implicit | aua:corrections:write | Accept/Reject a detected implicit correction.
PATCH | /corrections/{id} | aua:corrections:write | Edit a stored correction (logs an edited event).
DELETE | /corrections/{id} | aua:corrections:write | Soft-delete (scope='superseded'; row kept for audit).
GET | /corrections/evidence | aua:corrections:read | Per-correction event history and application counts.
GET | /analytics | aua:status | Specialist stats, confidence distribution, domain + welfare summary.
GET | /reliability | aua:status | Per-specialist win rate + last-20 welfare trajectory.
GET | /usage | aua:status | Query counts and estimated cost per specialist.
GET | /pricing | aua:status | Per-specialist token pricing from the live model registry.
GET | /version/check | - | Compare running version against the latest GitHub release.
POST | /update/skip | aua:config:write | Persist a skipped update version (banner stays hidden).
GET | /update/skipped | - | Return the skipped version, if any.
POST | /bug-report | - | Submit a structured bug report (200 even without a PAT).
GET | /local/models | aua:status | List registered local (Ollama-class) models.
POST | /local/models | aua:config:write | Register/upsert a local model.
PATCH | /local/specialist/{id} | aua:config:write | Tag a local model as a domain specialist.
GET | /local/settings | aua:status | Read local model settings.
POST | /local/settings | aua:config:write | Write local model settings.
GET | /domain-tree | aua:status | Full domain ontology with node stats and candidate queue.

#15: every response also carries X-Session-ID, X-Trace-ID, and X-Request-ID headers. Client-supplied values are honored; UUIDs are generated otherwise.

All POST bodies are JSON. All responses are JSON unless noted. Auth header: Authorization: Bearer $AUA_TOKEN.
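
The header rules above can be wrapped in two small helpers. This is a sketch under one assumption: client-supplied IDs are sent as request headers with the same names the responses carry. The function names build_headers and correlation_ids are hypothetical.

```python
from typing import Dict, Mapping, Optional

ID_HEADERS = ("X-Session-ID", "X-Trace-ID", "X-Request-ID")


def build_headers(
    token: Optional[str] = None,
    session_id: Optional[str] = None,
    trace_id: Optional[str] = None,
    request_id: Optional[str] = None,
) -> Dict[str, str]:
    """Build request headers; anything left as None is omitted."""
    headers = {"Content-Type": "application/json"}
    if token:  # auth is disabled by default, so the token is optional
        headers["Authorization"] = f"Bearer {token}"
    # Assumption: IDs are supplied under the same header names the server echoes.
    supplied = {
        "X-Session-ID": session_id,
        "X-Trace-ID": trace_id,
        "X-Request-ID": request_id,
    }
    headers.update({name: value for name, value in supplied.items() if value})
    return headers


def correlation_ids(response_headers: Mapping[str, str]) -> Dict[str, Optional[str]]:
    """Pick the three correlation IDs out of a response's headers."""
    return {name: response_headers.get(name) for name in ID_HEADERS}
```

With requests, this would be used as requests.post(url, json=body, headers=build_headers(session_id="user_42")), then correlation_ids(r.headers) to log the IDs next to the result.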

1. Core query

POST /query

The primary endpoint. Routes the query through the specialist graph, scores the response, runs assertions (if a policy is active), and returns everything.

Request body
{
  "query":                "string",   // REQUIRED: the query text
  "session_id":           "string",   // optional: scopes cross-session corrections; a UUID is generated and echoed back if omitted (#15)
  "conversation_history": [           // optional: prior turns for context
    {"role": "user",      "content": "previous question"},
    {"role": "assistant", "content": "previous answer"}
  ],
  "force_domain":         "string"    // optional: bypass classifier, e.g. "software_engineering"
}
bash: minimal
curl -s -X POST http://localhost:8000/query \
  -H "Content-Type: application/json" \
  -d '{"query": "Write binary search in Python. State time complexity."}'
bash: with session and history
curl -s -X POST http://localhost:8000/query \
  -H "Content-Type: application/json" \
  -d '{
    "query": "Now implement it iteratively instead.",
    "session_id": "user_42",
    "conversation_history": [
      {"role": "user",      "content": "Write binary search in Python."},
      {"role": "assistant", "content": "def binary_search(arr, target):..."}
    ]
  }'
Response
{
  "query":                  "Write binary search in Python.",
  "response":               "def binary_search(arr, target):\n    ...",  // the answer
  "routing_mode":           "single",         // "single" | "fanout" | "arbiter"
  "primary_domain":         "software_engineering",
  "domain_distribution":    {"software_engineering": 0.91, "mathematics": 0.09},
  "u_score":                0.731,            // U = w_e·E + w_c·C + w_k·K  (higher = better)
  "confidence":             0.823,            // Kalman-filtered consistency score
  "contradictions_detected": 0,              // > 0 means a correction was stored
  "dpo_pairs_generated":    0,               // DPO training pairs added this query
  "latency_ms":             312.4,
  "specialist_responses":   null             // populated only in fanout mode
}
Python
import requests

r = requests.post("http://localhost:8000/query", json={
    "query": "Write binary search in Python.",
    "session_id": "my-session",
})
data = r.json()

print(data["response"])              # the answer β€” use this in your app
print(data["primary_domain"])        # which specialist answered
print(data["u_score"])               # quality score β€” log this over time
print(data["routing_mode"])          # "single" | "fanout" | "arbiter"

# In fanout mode, inspect per-specialist responses:
if data["routing_mode"] == "fanout":
    for sr in data["specialist_responses"]:
        print(sr["domain"], sr["response"][:100])
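
The u_score comment in the response above gives the formula U = w_e·E + w_c·C + w_k·K. A worked instance makes the weighting concrete. The component scores and weights below are hypothetical values chosen only for illustration; the framework's actual defaults are not shown in this section.

```python
def utility(e: float, c: float, k: float,
            w_e: float, w_c: float, w_k: float) -> float:
    """Weighted sum U = w_e*E + w_c*C + w_k*K."""
    return w_e * e + w_c * c + w_k * k


# Hypothetical component scores and weights (not the framework's defaults).
u = utility(e=0.8, c=0.7, k=0.6, w_e=0.5, w_c=0.3, w_k=0.2)
print(round(u, 3))  # 0.5*0.8 + 0.3*0.7 + 0.2*0.6 = 0.73
```

Because U is a weighted sum, raising one weight shifts the score toward that component. That is the lever the utility weights in the config expose.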

POST /query/stream

Server-Sent Events (SSE). Same request body as /query. The connection stays open and emits three event types: start (routing decision), chunk (one token), done (final metadata). Useful for showing a live typing effect in a UI.

bash: full SSE example
curl -s -X POST http://localhost:8000/query/stream \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  --no-buffer \
  -d '{"query": "Write quicksort in Python.", "session_id": "s1"}'

# Output: one event per line pair (data: {json}\n\n):

data: {"type":"start","routing_mode":"single","primary_domain":"software_engineering","domain_distribution":{"software_engineering":0.93}}

data: {"type":"chunk","text":"def","index":0}

data: {"type":"chunk","text":" quicksort","index":1}

data: {"type":"chunk","text":"(arr):","index":2}

# ... more chunks ...

data: {"type":"done","full_response":"def quicksort(arr):\n    if len(arr) <= 1:\n        return arr\n    pivot = arr[len(arr) // 2]\n    left = [x for x in arr if x < pivot]\n    middle = [x for x in arr if x == pivot]\n    right = [x for x in arr if x > pivot]\n    return quicksort(left) + middle + quicksort(right)","routing_mode":"single","primary_domain":"software_engineering","domain_distribution":{"software_engineering":0.93},"u_score":0.748,"confidence":0.831,"contradictions_detected":0,"dpo_pairs_generated":0,"latency_ms":1821.3}
Python: consuming SSE with sseclient
import requests
import json

# pip install sseclient-py
from sseclient import SSEClient

response = requests.post(
    "http://localhost:8000/query/stream",
    json={"query": "Write quicksort in Python.", "session_id": "s1"},
    stream=True,
    headers={"Accept": "text/event-stream"},
)

full_text = ""
for event in SSEClient(response).events():
    data = json.loads(event.data)

    if data["type"] == "start":
        print(f"Routing to: {data['primary_domain']}")

    elif data["type"] == "chunk":
        print(data["text"], end="", flush=True)  # live typing effect
        full_text += data["text"]

    elif data["type"] == "done":
        print()  # newline after last token
        print(f"\nU score: {data['u_score']}")
        print(f"Latency: {data['latency_ms']:.0f}ms")
        # data["full_response"] == full_text (same content)
        break
Python: consuming SSE without a library
import requests, json

r = requests.post(
    "http://localhost:8000/query/stream",
    json={"query": "Write quicksort in Python."},
    stream=True,
)
for line in r.iter_lines():
    if line and line.startswith(b"data: "):
        data = json.loads(line[6:])
        if data["type"] == "chunk":
            print(data["text"], end="", flush=True)
        elif data["type"] == "done":
            print()
            break
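
Both consumers above rely on the chunks adding up to full_response. A sketch of a helper that checks this offline, using event lines shaped like the sample output; collect_stream is a hypothetical name, and joining by index is an assumption based on the index field each chunk carries.

```python
import json
from typing import Dict, Iterable, Tuple


def collect_stream(lines: Iterable[bytes]) -> Tuple[str, Dict]:
    """Join chunk events in index order and return (text, done_event)."""
    chunks: Dict[int, str] = {}
    done: Dict = {}
    for line in lines:
        if not line.startswith(b"data: "):
            continue  # blank separator lines and any non-data fields
        event = json.loads(line[len(b"data: "):])
        if event["type"] == "chunk":
            chunks[event["index"]] = event["text"]
        elif event["type"] == "done":
            done = event
            break
    text = "".join(chunks[i] for i in sorted(chunks))
    return text, done


# Shortened sample lines in the shape shown in the curl output above.
sample = [
    b'data: {"type":"start","routing_mode":"single"}',
    b"",
    b'data: {"type":"chunk","text":"def","index":0}',
    b"",
    b'data: {"type":"chunk","text":" quicksort","index":1}',
    b"",
    b'data: {"type":"done","full_response":"def quicksort","u_score":0.748}',
]
text, done = collect_stream(sample)
print(text == done["full_response"])
```

The same function accepts r.iter_lines() from a streaming requests response, since that also yields bytes lines.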

POST /query/batch

Route multiple queries concurrently. Each query is routed independently. Results are returned in the same order as the input.

Request body
{
  "queries":      ["string", ...],  // REQUIRED: 1 to 100 queries
  "session_id":   "string",         // optional: shared session for all queries
  "max_parallel": 4                 // optional: concurrent specialist calls (default 4, max 32)
}
bash
curl -s -X POST http://localhost:8000/query/batch \
  -H "Content-Type: application/json" \
  -d '{
    "queries": [
      "Write binary search in Python.",
      "What is the derivative of x squared?"
    ],
    "max_parallel": 4
  }'
Response
{
  "results":         [...],   // list of RouterResponse objects β€” same shape as /query
  "total_latency_ms": 891.2,  // wall-clock for the whole batch
  "n_queries":        2,
  "n_errors":         0       // failed queries are excluded from results
}
Python
import requests

r = requests.post("http://localhost:8000/query/batch", json={
    "queries": ["Write binary search.", "What is O(n log n)?"],
})
data = r.json()

for i, result in enumerate(data["results"]):
    print(f"Q{i}: domain={result['primary_domain']}  U={result['u_score']:.3f}")
    print(result["response"][:100])
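
The request body caps a batch at 100 queries and max_parallel at 32. A minimal sketch of a helper that splits a longer list into valid request bodies; batch_payloads is a hypothetical name, not part of the framework.

```python
from typing import Dict, Iterator, List

MAX_QUERIES = 100   # "1 to 100 queries" per request, from the body above
MAX_PARALLEL = 32   # documented ceiling for max_parallel


def batch_payloads(queries: List[str], max_parallel: int = 4) -> Iterator[Dict]:
    """Yield one /query/batch body per slice of at most MAX_QUERIES queries."""
    if not 1 <= max_parallel <= MAX_PARALLEL:
        raise ValueError(f"max_parallel must be between 1 and {MAX_PARALLEL}")
    for start in range(0, len(queries), MAX_QUERIES):
        yield {
            "queries": queries[start:start + MAX_QUERIES],
            "max_parallel": max_parallel,
        }
```

Each yielded dict can be passed as json= to requests.post against /query/batch. Input order is preserved across slices, so concatenating the results lists keeps them aligned with the original queries as long as n_errors is 0.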

2. Health & status

GET /health/live   GET /health/ready   GET /health/startup

Kubernetes-style probes. Use /health/live for liveness (process alive), /health/ready for readiness (all specialists reachable), /health/startup for startup (ready at least once).

bash
curl http://localhost:8000/health/live    # → 200 {"status":"alive","uptime_s":142.3}
curl http://localhost:8000/health/ready   # → 200 or 503
curl http://localhost:8000/health/startup # → 200 or 503
GET /health/ready response
{
  "status":      "ready",            // "ready" or "not_ready"
  "specialists": {
    "swe":     "ok",                 // "ok" | "unreachable" | "http_503"
    "math":    "ok",
    "arbiter": "ok"
  }
}
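Python: wait for ready before sending queries
import time, requests

def wait_for_ready(timeout=60):
    """Poll /health/ready about once per second until ready or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            r = requests.get("http://localhost:8000/health/ready", timeout=2)
            if r.status_code == 200 and r.json()["status"] == "ready":
                return True
        except requests.RequestException:
            pass  # router not up yet, or the probe itself timed out; retry
        time.sleep(1)
    raise TimeoutError(f"Router not ready after {timeout}s")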

wait_for_ready()
# now safe to send queries
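
When the readiness probe returns not_ready, the specialists map in the body says which backend is the problem. A small sketch that extracts the failing names; failing_specialists is a hypothetical helper, and it treats any value other than "ok" as a failure.

```python
from typing import Dict, List


def failing_specialists(ready_body: Dict) -> List[str]:
    """Return the names of specialists whose state is anything other than "ok"."""
    specialists = ready_body.get("specialists", {})
    return sorted(name for name, state in specialists.items() if state != "ok")


# Example body in the shape of the GET /health/ready response above.
body = {
    "status": "not_ready",
    "specialists": {"swe": "ok", "math": "unreachable", "arbiter": "http_503"},
}
print(failing_specialists(body))
```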

GET /status

Full telemetry snapshot. This is the richest endpoint: use it to monitor U score trends, latency percentiles, specialist health, and routing distribution over time.

bash
curl http://localhost:8000/status | python3 -m json.tool
Response
{
  "version":   "1.0.1",
  "backend":   "ollama",
  "uptime_s":  3621.4,
  "health": {
    "swe":   "ok",
    "math":  "ok",
    "arbiter": "ok"
  },
  "utility": {                            // per-domain U score history
    "software_engineering": {
      "mean_u":    0.7312,               // track this over time; it should trend up
      "last_u":    0.7480,
      "queries":   142,
      "confidence": 0.8231
    },
    "mathematics": {
      "mean_u":    0.6891,
      "last_u":    0.7012,
      "queries":   47,
      "confidence": 0.7910
    }
  },
  "latency": {                           // latency percentiles per component
    "router": {"p50_ms": 312.4, "p95_ms": 891.2, "last_ms": 298.1, "samples": 189},
    "swe":    {"p50_ms": 280.1, "p95_ms": 820.4, "last_ms": 271.3, "samples": 142}
  },
  "routing": {
    "total_queries":      189,
    "by_mode": {"single": 154, "fanout": 28, "arbiter": 7},
    "contradictions":     12,
    "dpo_pairs":          8
  },
  "store": {
    "corrections_total":  23,
    "sessions_total":     41
  },
  "memory": {
    "system": "CPU / Ollama"             // or "gpu0: 8192 / 24576 MiB" on GPU
  }
}
Python: monitor U score and alert on degradation
import requests

status = requests.get("http://localhost:8000/status").json()

for domain, stats in status["utility"].items():
    mean_u = stats["mean_u"]
    if mean_u is None:
        # no score yet for this domain; formatting None with :.3f would raise
        print(f"{domain}: no U score recorded yet")
    elif mean_u < 0.50:
        print(f"⚠ Low U score on {domain}: {mean_u:.3f}, check specialist")
    else:
        print(f"✓ {domain}: mean_u={mean_u:.3f}  queries={stats['queries']}")

# Check latency
p95 = status["latency"]["router"]["p95_ms"]
if p95 and p95 > 5000:
    print(f"⚠ High p95 latency: {p95:.0f}ms")

# Routing distribution
print(status["routing"]["by_mode"])  # {"single": 154, "fanout": 28, "arbiter": 7}
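
Raw counts in the routing block are easier to compare over time as ratios. The sketch below derives the share of each routing mode and the contradictions per query from the sample /status values; mode_shares and contradiction_rate are hypothetical helpers, not framework functions.

```python
from typing import Dict


def mode_shares(by_mode: Dict[str, int]) -> Dict[str, float]:
    """Fraction of queries handled by each routing mode."""
    total = sum(by_mode.values())
    if total == 0:
        return {mode: 0.0 for mode in by_mode}
    return {mode: count / total for mode, count in by_mode.items()}


def contradiction_rate(routing: Dict) -> float:
    """Contradictions detected per query, or 0.0 before any traffic."""
    total = routing["total_queries"]
    return routing["contradictions"] / total if total else 0.0


# Values copied from the sample /status response above.
routing = {
    "total_queries": 189,
    "by_mode": {"single": 154, "fanout": 28, "arbiter": 7},
    "contradictions": 12,
    "dpo_pairs": 8,
}
shares = mode_shares(routing["by_mode"])
for mode, share in shares.items():
    print(f"{mode}: {share:.1%}")
print(f"contradictions per query: {contradiction_rate(routing):.3f}")
```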

GET /version

bash + response
curl http://localhost:8000/version
# {"version": "1.0.1", "framework": "AUA Framework", "python": "3.11.10"}

3. Configuration

GET /config

Returns the running configuration. Secrets are redacted: you will never see API keys or token values here.

bash + response shape
curl http://localhost:8000/config

{
  "version":     "0.5",
  "mode":        "local",
  "backend":     "ollama",
  "specialists": [
    {"name": "swe",  "model": "qwen2.5-coder:7b",  "port": 11434, "field": "software_engineering", "endpoint": "http://localhost:11434"},
    {"name": "math", "model": "qwen2.5:7b",         "port": 11434, "field": "mathematics",          "endpoint": "http://localhost:11434"}
  ],
  "arbiter":  {"model": "qwen2.5:3b", "port": 11434, "endpoint": "http://localhost:11434"},
  "router":   {"port": 8000, "single_domain_threshold": 0.75, "fanout_threshold": 0.30, "specialist_timeout": 30.0}
}
Python: check which model is active
import requests

config = requests.get("http://localhost:8000/config").json()

for spec in config["specialists"]:
    print(f"{spec['name']}: {spec['model']} ({spec['field']})")

print(f"Single threshold: {config['router']['single_domain_threshold']}")

POST /config/reload

Hot-reload the config file without restarting. Only hot-reloadable settings apply (routing thresholds, utility weights, CORS). Model/port changes require restart.

bash + response
curl -X POST http://localhost:8000/config/reload
# {"reloaded": true, "message": "Config reloaded from aua_config.yaml"}

4. Corrections

POST /corrections

Inject a verified fact into the corrections store. It will be included in every future query on that subject in that domain, permanently (Class A) by default.

Request body
{
  "subject":    "string",   // REQUIRED: short label e.g. "bubble_sort_complexity"
  "domain":     "string",   // REQUIRED: must match a field name e.g. "software_engineering"
  "claim":      "string",   // REQUIRED: the verified fact, max 2000 chars
  "confidence": 0.9,        // optional: 0.0–1.0 (default 0.9)
  "source":     "manual"    // optional: label for the correction source
}
bash
curl -X POST http://localhost:8000/corrections \
  -H "Content-Type: application/json" \
  -d '{
    "subject":    "bubble_sort_complexity",
    "domain":     "software_engineering",
    "claim":      "Bubble sort is O(n²) average and worst-case. O(1) extra space.",
    "confidence": 0.99,
    "source":     "manual"
  }'
Response
{
  "stored":      true,
  "subject":     "bubble_sort_complexity",
  "domain":      "software_engineering",
  "claim":       "Bubble sort is O(n²) average and worst-case. O(1) extra space.",
  "confidence":  0.99,
  "decay_class": "A"   // A = never decays | B = 10yr | C = 3yr | D = 6mo
}
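
The request body above states three constraints: subject, domain, and claim are required, the claim is at most 2000 characters, and confidence lies between 0.0 and 1.0. A sketch of a client-side check that builds the body only when those hold; build_correction is a hypothetical helper, and the server may validate more than this.

```python
from typing import Dict

MAX_CLAIM_CHARS = 2000  # limit stated in the request body above


def build_correction(subject: str, domain: str, claim: str,
                     confidence: float = 0.9, source: str = "manual") -> Dict:
    """Check the documented constraints and return a POST /corrections body."""
    for name, value in (("subject", subject), ("domain", domain), ("claim", claim)):
        if not value or not value.strip():
            raise ValueError(f"{name} is required")
    if len(claim) > MAX_CLAIM_CHARS:
        raise ValueError(f"claim exceeds {MAX_CLAIM_CHARS} characters")
    if not 0.0 <= confidence <= 1.0:
        raise ValueError("confidence must be between 0.0 and 1.0")
    return {
        "subject": subject,
        "domain": domain,
        "claim": claim,
        "confidence": confidence,
        "source": source,
    }
```

The returned dict can be sent with requests.post("http://localhost:8000/corrections", json=body). Failing early on the client avoids a round trip for a claim that is too long.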

GET /corrections

bash + response shape
curl http://localhost:8000/corrections

{
  "total":    23,
  "returned": 23,
  "corrections": [
    {
      "subject":              "bubble_sort_complexity",
      "domain":               "software_engineering",
      "claim":                "Bubble sort is O(n²)...",
      "effective_confidence": 0.99,
      "decay_class":          "A",
      "source":               "manual"
    },
    ...
  ]
}
Python: audit what the system currently knows
import requests

data = requests.get("http://localhost:8000/corrections").json()
print(f"Total corrections: {data['total']}")

for c in data["corrections"]:
    print(f"[{c['decay_class']}] {c['domain']} / {c['subject']}: {c['claim'][:60]}")

5. Deployment

POST /deploy/green

Register a GREEN model candidate and evaluate it against the current BLUE. Promotes if the U score delta exceeds the configured threshold.

Request body
{
  "specialist":     "string",  // REQUIRED: specialist name e.g. "swe"
  "green_model":    "string",  // REQUIRED: model path or HuggingFace ID
  "n_eval_queries": 10         // optional: evaluation queries to run (default 10, max 100)
}
bash
curl -X POST http://localhost:8000/deploy/green \
  -H "Content-Type: application/json" \
  -d '{
    "specialist":  "swe",
    "green_model": "qwen2.5-coder:14b",
    "n_eval_queries": 20
  }'
Response
{
  "specialist":  "swe",
  "promoted":    true,          // false = GREEN didn't beat BLUE by enough
  "u_delta":     0.062,         // green_u - blue_u  (must exceed threshold)
  "blue_u":      0.712,
  "green_u":     0.774,
  "threshold":   0.050,
  "message":     "GREEN promoted: U delta 0.062 exceeds threshold 0.050",
  "dry_run_only": false
}
Python
import requests

r = requests.post("http://localhost:8000/deploy/green", json={
    "specialist": "swe",
    "green_model": "qwen2.5-coder:14b",
})
result = r.json()

if result["promoted"]:
    print(f"✓ Promoted. U delta: {result['u_delta']:+.3f}")
else:
    print(f"✗ Not promoted. Delta {result['u_delta']:+.3f} below threshold {result['threshold']}")
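
The promotion rule described above reduces to one comparison: GREEN is promoted when green_u minus blue_u exceeds the threshold. A sketch of that arithmetic using the numbers from the sample response; the strict greater-than is an assumption read from the word "exceeds", and promotion_decision is a hypothetical name.

```python
from typing import Tuple


def promotion_decision(blue_u: float, green_u: float,
                       threshold: float) -> Tuple[bool, float]:
    """Return (promoted, u_delta) for a GREEN candidate against BLUE."""
    delta = green_u - blue_u
    return delta > threshold, delta  # assumption: strictly greater than


# Values from the sample POST /deploy/green response above.
promoted, delta = promotion_decision(blue_u=0.712, green_u=0.774, threshold=0.050)
print(promoted, round(delta, 3))  # True 0.062
```

Reproducing the check locally is useful when comparing saved evaluation results before submitting a candidate.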

5b. Shadow mode

Method | Path | Description
POST | /deploy/shadow/{specialist} | Activate shadow mode: GREEN receives production traffic silently. Body: green_endpoint (required), min_queries, threshold
GET | /deploy/shadow/{specialist} | Report accumulated shadow scores: n_queries, mean_delta, blue/green mean U, ready_to_promote, progress string
DELETE | /deploy/shadow/{specialist} | Deactivate shadow mode. Query param: clear_scores=true to also purge accumulated scores

5c. Batch queue

Method | Path | Description
POST | /batch/jobs | Submit a batch job. Body: queries (list), priority (high|normal|low), max_parallel. Returns job_id immediately.
GET | /batch/jobs/{job_id} | Poll job status and partial results. Returns status, n_done, n_pending, n_errors, results (list of completed responses)
GET | /batch/jobs | List recent batch jobs. Query params: status (pending|running|done|error), limit
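
Since a submitted job returns its job_id immediately, the client has to poll for completion. A minimal polling sketch under two assumptions: the job body's status field uses the same values as the list filter (pending, running, done, error), and done and error are terminal. poll_job and its fetch parameter are hypothetical; fetch is injected so the loop can be exercised without a live router.

```python
import time
from typing import Callable, Dict

TERMINAL = {"done", "error"}  # assumption: these two statuses end a job


def poll_job(fetch: Callable[[str], Dict], job_id: str,
             interval_s: float = 1.0, max_polls: int = 60) -> Dict:
    """Call fetch(job_id) until the job reaches a terminal status."""
    for _ in range(max_polls):
        job = fetch(job_id)
        if job["status"] in TERMINAL:
            return job
        time.sleep(interval_s)
    raise TimeoutError(f"job {job_id} still running after {max_polls} polls")
```

Against a live router, fetch could be lambda job_id: requests.get(f"http://localhost:8000/batch/jobs/{job_id}").json(). Partial results are available on every poll, so a caller that wants them early can read job["results"] inside its own fetch wrapper.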

6. Observability

GET /metrics

Prometheus scrape endpoint. Returns plain text in Prometheus exposition format, not JSON. Point your Prometheus scrape_configs here.

bash: sample output
curl http://localhost:8000/metrics | grep "^aua_"

aua_queries_total{domain="software_engineering",routing_mode="single",status="ok"} 142.0
aua_query_latency_seconds_bucket{domain="software_engineering",routing_mode="single",le="0.5"} 89.0
aua_utility_score{domain="software_engineering"} 0.748
aua_contradiction_rate{domain="software_engineering"} 0.0
aua_assertion_results_total{assertion_name="PythonSyntaxCheck",level="blocking",passed="true",domain="software_engineering"} 38.0
# aua_assertion_fail_rate is not exported as a gauge; derive it from aua_assertion_results_total
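
Because the fail rate is not exported directly, it has to be computed from the aua_assertion_results_total counters. The sketch below does this from the scraped text, assuming failed runs appear as the same series with passed="false" (only passed="true" is shown in the sample). assertion_fail_rates is a hypothetical helper.

```python
import re
from collections import defaultdict
from typing import Dict

SERIES = re.compile(
    r'^aua_assertion_results_total\{(?P<labels>[^}]*)\}\s+(?P<value>\S+)$'
)
LABEL = re.compile(r'(\w+)="([^"]*)"')


def assertion_fail_rates(metrics_text: str) -> Dict[str, float]:
    """Map assertion_name to failed / (passed + failed), summed over all labels."""
    passed: Dict[str, float] = defaultdict(float)
    failed: Dict[str, float] = defaultdict(float)
    for raw in metrics_text.splitlines():
        match = SERIES.match(raw.strip())
        if not match:
            continue  # other metrics, comments, blank lines
        labels = dict(LABEL.findall(match.group("labels")))
        # Assumption: failures are reported with passed="false".
        bucket = passed if labels.get("passed") == "true" else failed
        bucket[labels.get("assertion_name", "unknown")] += float(match.group("value"))
    names = set(passed) | set(failed)
    return {
        name: failed[name] / (passed[name] + failed[name])
        for name in names
        if passed[name] + failed[name] > 0
    }
```

In Prometheus itself the same ratio would normally be computed with a query over rate() of the counter; the Python version is handy for one-off checks against curl output.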

GET /metrics/cost

bash + response
curl http://localhost:8000/metrics/cost | python3 -m json.tool

{
  "swe":  {"queries": 142, "gpu_hours": 0.014, "cost_usd": 0.0097},
  "math": {"queries":  47, "gpu_hours": 0.005, "cost_usd": 0.0034},
  "total_cost_usd": 0.0131
}
Python: cost tracking in a loop
import requests

cost = requests.get("http://localhost:8000/metrics/cost").json()
print(f"Total cost so far: ${cost['total_cost_usd']:.4f}")
for name, stats in cost.items():
    if isinstance(stats, dict):
        print(f"  {name}: {stats['queries']} queries  ${stats['cost_usd']:.4f}")

7. Sessions & chat

Sessions maintain conversation state across multiple queries. The Chat UI uses these endpoints. You can also call them directly to build your own chat interface on top of AUA.

bash: full session flow
# 1. Create a session
SESSION=$(curl -s -X POST http://localhost:8000/sessions \
  -H "Content-Type: application/json" \
  -d '{"title": "Python help"}' | python3 -c "import sys,json; print(json.load(sys.stdin)['id'])")

echo "Session: $SESSION"

# 2. Send a message
curl -s -X POST http://localhost:8000/sessions/$SESSION/messages \
  -H "Content-Type: application/json" \
  -d '{"content": "Write binary search in Python."}'

# 3. List sessions
curl http://localhost:8000/sessions

# 4. Get session metadata
curl http://localhost:8000/sessions/$SESSION

# 5. Delete session
curl -X DELETE http://localhost:8000/sessions/$SESSION
POST /sessions: response
{
  "id":            "3f2a1b9c-...",   // session UUID; use in subsequent requests
  "title":         "Python help",
  "created_at":    1747000000.123,   // Unix timestamp
  "updated_at":    1747000000.123,
  "message_count": 0
}
GET /sessions: response
{
  "sessions": [
    {"id": "3f2a1b9c-...", "title": "Python help", "message_count": 4, "updated_at": 1747003200.0},
    {"id": "8a1c4e2f-...", "title": "New Chat",    "message_count": 1, "updated_at": 1747002000.0}
  ]
}
Python: build a simple chat loop
import requests

BASE = "http://localhost:8000"

# Create session
session = requests.post(f"{BASE}/sessions", json={"title": "My chat"}).json()
session_id = session["id"]

while True:
    user_input = input("You: ")
    if user_input.lower() == "quit":
        break

    r = requests.post(f"{BASE}/sessions/{session_id}/messages",
                      json={"content": user_input}).json()
    print(f"AUA [{r['primary_domain']}]: {r['response']}\n")

# Clean up
requests.delete(f"{BASE}/sessions/{session_id}")

8. Extensions

GET /extensions

bash + response shape
curl http://localhost:8000/extensions

{
  "plugins": {
    "arbiter_policy":    null,      // null = using the built-in
    "correction_store":  null,
    "field_classifier":  "plugins.mine:KeywordClassifier",
    "model_backend":     null,
    "promotion_policy":  null,
    "state_store":       null,
    "utility_scorer":    null
  },
  "hooks": {
    "on_correction": ["SlackNotificationHook"]
    // keyed by hook point; class names of registered hooks
  },
  "middleware": ["ShoutMiddleware"]   // in pipeline order
}

Standard error response

All errors return this shape regardless of endpoint:

JSON
{
  "error":       "AUA_SPECIALIST_TIMEOUT",   // stable error code; use for programmatic handling
  "message":     "Specialist swe timed out after 30s",
  "status_code": 503,
  "request_id":  "user_42"                  // echoes session_id from request
}
Python: error handling pattern
import requests

r = requests.post("http://localhost:8000/query", json={"query": "hello"})

if not r.ok:
    err = r.json()
    print(f"Error {err['status_code']}: {err['error']}")
    print(err["message"])
else:
    data = r.json()
    print(data["response"])
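
Because the error field is a stable code, a client can decide programmatically whether to retry. The sketch below retries with exponential backoff, assuming a 503 such as AUA_SPECIALIST_TIMEOUT is transient and everything else is not. query_with_retry and its send parameter are hypothetical; send is any callable returning (status_code, json_body).

```python
import time
from typing import Callable, Dict, Tuple

RETRYABLE_STATUS = {503}  # assumption: 503 (e.g. AUA_SPECIALIST_TIMEOUT) is transient


def query_with_retry(send: Callable[[], Tuple[int, Dict]],
                     attempts: int = 3, base_delay_s: float = 0.5) -> Dict:
    """Call send() until it succeeds, retrying only retryable errors."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        status, body = send()
        if status < 400:
            return body
        last_try = attempt == attempts - 1
        if status not in RETRYABLE_STATUS or last_try:
            raise RuntimeError(f"{body.get('error')}: {body.get('message')}")
        time.sleep(base_delay_s * 2 ** attempt)  # 0.5 s, 1 s, 2 s, ...
    raise AssertionError("unreachable")
```

With requests, send could wrap the call as: r = requests.post(url, json=payload); return r.status_code, r.json().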
Praneeth Tota · Ph.D. Computer Science (Algorithmic Game Theory) · Illinois Institute of Technology
linkedin.com/in/praneethtota · Code: GPL-3.0 · Docs: CC BY 4.0
AUA Framework v1.0.2