Audit Stores¶
Every arbitration emits a sequence of structured audit events. The audit store persists them so you can replay, query, and prove what happened.
Audit events¶
| Event type | When emitted |
|---|---|
arbitration_started |
At the start of every arbitrate() call |
policy_checked |
After pre- and post-policy evaluation |
strategy_resolved |
After the strategy picks a winner |
verdict_issued |
After the final verdict is ready |
human_deferred |
When DeferToHuman is used |
human_responded |
When a human resolves a deferred decision (sidecar/MCP) |
Each event has an id, event_type, timestamp, and a payload dict carrying decision/verdict IDs and relevant metadata.
NullAuditStore¶
Default. Discards all events. Zero overhead for development or testing.
from saalis.audit.base import NullAuditStore
arb = Arbitrator(strategies=[WeightedVote()])
# NullAuditStore is used automatically when no audit_store is passed
JSONLAuditStore¶
Appends events as newline-delimited JSON to a file. Simple, portable, and readable with any text tool.
from saalis.audit.jsonl import JSONLAuditStore
store = JSONLAuditStore("audit.jsonl")
arb = Arbitrator(strategies=[WeightedVote()], audit_store=store)
await arb.arbitrate(decision)
# Query all events
events = await store.query()
# Query by event type
from saalis.models import AuditEventType
started = await store.query(event_type=AuditEventType.arbitration_started)
# Query by time range
from datetime import datetime, UTC
recent = await store.query(
since=datetime(2025, 1, 1, tzinfo=UTC),
limit=50,
)
File writes are non-blocking (run_in_executor), safe for high-throughput async use.
Note
JSONLAuditStore supports append and query only. It does not support deferred decisions (DeferToHuman flow). Use SQLiteAuditStore for that.
SQLiteAuditStore¶
Full-featured store backed by SQLite via SQLAlchemy async. Supports audit events and the deferred decision lifecycle.
from saalis.audit.sqlite import SQLiteAuditStore
store = SQLiteAuditStore("sqlite+aiosqlite:///./saalis_audit.db")
arb = Arbitrator(strategies=[WeightedVote()], audit_store=store)
await arb.arbitrate(decision)
# Query events
events = await store.query(limit=100)
# Fetch a single event by ID
event = await store.get_event("some-event-id")
# Close the connection pool when done
await store.close()
The schema is created automatically on first use. Two tables:
audit_events— all eventsdeferred_decisions— records forDeferToHumanflow
Deferred decision methods¶
These are used internally by the sidecar and MCP server. You can also call them directly:
# Check if a decision is pending human input
deferred = await store.get_deferred(decision_id)
if deferred and deferred.resolved_at is None:
print("Still waiting for human input")
# List all pending decisions
pending = await store.list_pending_deferred()
# Resolve a deferred decision
await store.resolve_deferred(
decision_id=decision_id,
outcome="p2", # the winning proposal id
resolved_by="alice",
)
Choosing a store¶
| Requirement | Store |
|---|---|
| Development / testing | NullAuditStore |
| Simple persistent log, no HTTP/MCP | JSONLAuditStore |
Full audit trail + DeferToHuman |
SQLiteAuditStore |
| Production scale, multi-process | PostgreSQL (roadmap) |
Writing a custom store¶
Subclass AuditStore and implement append and query:
from saalis.audit.base import AuditStore
from saalis.models import AuditEvent, AuditEventType
from datetime import datetime
class InMemoryAuditStore(AuditStore):
def __init__(self) -> None:
self._events: list[AuditEvent] = []
async def append(self, event: AuditEvent) -> None:
self._events.append(event)
async def query(
self,
event_type: AuditEventType | None = None,
since: datetime | None = None,
until: datetime | None = None,
limit: int = 100,
) -> list[AuditEvent]:
results = self._events
if event_type:
results = [e for e in results if e.event_type == event_type]
if since:
results = [e for e in results if e.timestamp >= since]
if until:
results = [e for e in results if e.timestamp <= until]
return results[:limit]