Continuous Maintenance
Keep Answers Current

A traditional database stores what you put in and returns what you ask for. AttestDB adds continuous maintenance — freshness checks, drift detection, composite updates, query prediction, and background upkeep for changing knowledge.

The Loop

Continuous maintenance runs a perceive–plan–act cycle via the Heartbeat Scheduler. Every cycle (default: 30 seconds) it surveys the claim system, plans maintenance work, and acts on what it finds.

1. Perceive: Scan access patterns, freshness scores, schema changes, and claim lifecycle tiers.
2. Plan: Promote hot claims, demote cold ones, flag stale composites, detect recurring queries.
3. Act: Re-synthesize stale composites, apply confidence decay, consolidate near-duplicates, detect gaps.
4. Report: Emit a cycle report with metrics (promotions, demotions, composites synthesized, gaps found).
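Conceptually, each heartbeat is one pass of the four phases above. Here is a minimal, self-contained sketch over a toy claim store — illustrative only, not AttestDB's internals (the store shape and the promote-at-3-accesses rule are made up for the example):

```python
def heartbeat_cycle(state):
    """One perceive-plan-act-report pass over a toy claim store.
    `state` maps claim_id -> access_count (hypothetical shape)."""
    # Perceive: snapshot current access counts
    observed = dict(state)
    # Plan: claims with 3+ accesses get promoted, the rest demoted
    plan = {cid: ("promote" if n >= 3 else "demote")
            for cid, n in observed.items()}
    # Act: apply the planned tier changes
    tiers = {cid: ("hot" if action == "promote" else "cold")
             for cid, action in plan.items()}
    # Report: emit summary metrics for the cycle
    return {"promotions": sum(a == "promote" for a in plan.values()),
            "demotions": sum(a == "demote" for a in plan.values()),
            "tiers": tiers}

report = heartbeat_cycle({"c1": 5, "c2": 1})
print(report["tiers"])  # {'c1': 'hot', 'c2': 'cold'}
```

The real scheduler (below) wraps this kind of loop in a background thread and exposes the cycle reports via `scheduler.history`.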

from attestdb import AttestDB
from attestdb.intelligence.heartbeat import HeartbeatScheduler, HeartbeatConfig

db = AttestDB("my.attest")

# Start the heartbeat with custom config
config = HeartbeatConfig(
    cycle_interval_seconds=30.0,
    hot_threshold=0.65,
    freshness_threshold=0.4,
    decay_half_life_hours=336.0,   # 2 weeks
)
scheduler = HeartbeatScheduler(db, config=config)
scheduler.start()

# Check status anytime
status = scheduler.get_status()
print(f"Cycles: {status.cycle_count}")
print(f"Hot claims: {status.hot_claims}")
print(f"Stale composites: {status.stale_composites}")
print(f"Tier distribution: {status.tier_distribution}")

# Review recent cycle history
for report in scheduler.history[-3:]:
    print(f"Cycle {report.cycle_number}: {report.duration_ms:.0f}ms, "
          f"composites={report.composites_synthesized}, "
          f"gaps={report.gaps_detected}")

# Trigger an immediate cycle
scheduler.run_now()

# Stop cleanly
scheduler.stop()

Heartbeat Configuration

HeartbeatConfig controls every aspect of the cycle. All parameters have sensible defaults — start with HeartbeatScheduler(db) and tune from there.

Parameter | Default | Description
cycle_interval_seconds | 30.0 | Seconds between perceive–plan–act cycles
hot_threshold | 0.65 | Combined access score above which a claim is “hot”
freshness_threshold | 0.4 | Below this score, entities get freshness warnings on query
decay_half_life_hours | 336.0 | Confidence half-life for temporal decay (default: 2 weeks)
consolidation_similarity_threshold | 0.90 | Cosine similarity above which claims are consolidated
max_consolidation_batch | 64 | Max claims to consolidate per cycle
composite_synthesis_budget_seconds | 10.0 | Max time to spend synthesizing composites per cycle
working_memory_max_claims | 512 | Working set size for hot claims
schema_drift_interval_hours | 24.0 | How often to check for schema drift
retention_days | 180 | Days before low-importance claims are pruned
jitter | 0.1 | Random jitter fraction on cycle interval
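Two of these parameters reduce to simple formulas. Half-life decay is the standard exponential form (confidence halves every `decay_half_life_hours`), and jitter widens the cycle interval by a random fraction so heartbeats across processes don't align. The exact curves AttestDB applies may differ; this is a sketch of the standard formulas:

```python
import random

def decayed_confidence(conf, age_hours, half_life_hours=336.0):
    """Exponential half-life decay: confidence halves every half_life_hours."""
    return conf * 0.5 ** (age_hours / half_life_hours)

def jittered_interval(base_seconds=30.0, jitter=0.1):
    """Cycle interval perturbed by a random +/- jitter fraction."""
    return base_seconds * (1 + random.uniform(-jitter, jitter))

# A claim at confidence 0.9 sits at 0.45 after one half-life (2 weeks)
print(decayed_confidence(0.9, 336.0))  # 0.45
```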

Features

Composite Claims (Enterprise)

LLM-synthesized summaries that roll up raw claims into executive briefs, risk assessments, relationship maps, and temporal profiles. Auto-invalidated when source claims change.

Drift Detection (Open Source)

Monitors data sources for schema changes: new fields, removed fields, type changes, and value distribution drift. Circuit breaker pauses ingestion on destructive changes.

Schema Versioning (Open Source)

Point-in-time snapshots of every source’s schema. Diff any two versions. Track how your data sources evolve over weeks and months.

Freshness Monitoring (Enterprise)

Tracks when entities were last queried and updated. Stale entities get freshness warnings attached to query results. Drives autodidact re-research priorities.

Query Prediction (Enterprise)

Detects recurring query patterns and predicts when they’ll fire next. Pre-computes results for predictable queries so they resolve instantly.

Proactive Hooks (Enterprise)

Wires intelligence into the ingest and query paths. Every ingestion initializes a lifecycle record; every query checks freshness and records access patterns.

Auto-Discovery (Open Source)

Infers semantic types for data source fields using heuristics or LLM. Maps raw field names to a shared taxonomy so disparate sources become structurally comparable.

Entity Resolution (Open Source)

Matches entities across sources using exact IDs, fuzzy names, domain rules, and optional AI-assisted matching. Auto-links on ingestion above configurable thresholds.
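The tiered matching idea behind entity resolution — exact IDs win outright, fuzzy names as a fallback — can be sketched with the standard library's `difflib`. This is an illustrative stand-in, not AttestDB's resolver; the record shape and the 0.85 name threshold are assumptions for the example:

```python
from difflib import SequenceMatcher

def match_entities(a, b, name_threshold=0.85):
    """Return a match score for two entity records (sketch).
    Exact ID match scores 1.0; otherwise fall back to fuzzy name
    similarity, accepted only above name_threshold."""
    if a.get("id") and a.get("id") == b.get("id"):
        return 1.0
    ratio = SequenceMatcher(None, a["name"].lower(), b["name"].lower()).ratio()
    return ratio if ratio >= name_threshold else 0.0

# Fuzzy names match despite casing and punctuation differences
score = match_entities({"id": None, "name": "Acme Corp."},
                       {"id": "X1", "name": "ACME Corp"})
```

A production resolver would layer domain rules (e.g. matching registered addresses) and optional AI-assisted matching on top of these two tiers.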

Composite Claims

A composite claim is a derived, LLM-synthesized summary that rolls up the raw claims about an entity into a single narrative. Seven composite types are supported, each tuned for different use cases:

Type | What it produces
entity_brief | 2–3 sentence executive summary of everything known about an entity
risk_assessment | Synthesized risk narrative with a 0–1 risk score and key risk factors
revenue_summary | Unified financial view aggregating revenue claims across sources
relationship_health | Overall health of an entity’s key relationships
relationship_map | Key relationships and their strength, organized by type
evidence_summary | Evidence quality and coverage assessment — where are the gaps?
temporal_profile | How knowledge about this entity has evolved over time

from attestdb.intelligence.composite_synthesizer import CompositeClaimSynthesizer

synth = CompositeClaimSynthesizer(db, model="auto")

# Synthesize an executive brief
composite = synth.synthesize("EGFR", "entity_brief")
print(composite.narrative)
# "EGFR is a receptor tyrosine kinase implicated in multiple solid
#  tumors. 48,231 claims from 12 sources, with strong corroboration
#  for its role in NSCLC (conf=0.94) and colorectal cancer (conf=0.87)."
print(f"Score: {composite.score:.2f}, Sources: {len(composite.source_claim_ids)}")

# Risk assessment
risk = synth.synthesize("EGFR", "risk_assessment")
print(f"Risk score: {risk.score:.2f}")
print(f"Key factors: {risk.key_factors}")

# Batch synthesis for a list of entities
composites = synth.synthesize_batch(
    ["BRCA1", "TP53", "KRAS"],
    composite_type="entity_brief",
)

# Composites auto-invalidate when source claims change
synth.invalidate("EGFR")  # mark all EGFR composites as stale

# Find stale composites for re-synthesis
for stale in synth.get_stale_composites(limit=10):
    synth.synthesize(stale.entity_id, stale.composite_type)

# Track LLM cost
cost = synth.cost_summary()
print(f"Total cost: ${cost['total_cost_usd']:.2f} ({cost['total_calls']} calls)")

Drift Detection

Data sources change. Fields get added, renamed, or removed. Value distributions shift. Drift detection catches these changes before they corrupt your claim system.

Four change types are tracked, each with a severity level:

Change Type | Severity | Example
additive | Low | New field risk_score appeared in the source
value_drift | Medium | Fill rate for email dropped from 95% to 40%
modified | Medium | Field amount changed from integer to string
destructive | High | Field customer_id was removed entirely

from attestdb.discovery.drift_detector import DriftMonitor

monitor = DriftMonitor(schema_store_path="./schemas")

# Check a single source
report = monitor.check_source("salesforce", sf_connector)
if report:
    print(f"{len(report.changes)} changes detected")

    for change in report.destructive_changes:
        print(f"  DESTRUCTIVE: {change.field_name} removed")
    for change in report.additive_changes:
        print(f"  NEW FIELD: {change.field_name}")
    for change in report.value_drift_changes:
        print(f"  DRIFT: {change.field_name} — {change.details}")

    if report.requires_review:
        print("Review required before resuming ingestion")

# Check all connected sources
reports = monitor.check_all({
    "salesforce": sf_connector,
    "hubspot": hs_connector,
    "postgres": pg_connector,
})
for r in reports:
    print(f"{r.source_id}: {len(r.changes)} changes")

# Review version history
history = monitor.get_history("salesforce")
print(f"Salesforce: {len(history)} schema versions recorded")

Circuit breaker: When a source accumulates more than 20 destructive changes, drift detection automatically pauses ingestion and flags it for manual review. This prevents cascading data corruption from upstream API changes.
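The breaker logic reduces to a counting check. A minimal sketch of the trip condition, assuming a simplified report shape (the real `DriftReport` objects carry more structure):

```python
def should_pause_ingestion(reports, max_destructive=20):
    """Circuit-breaker check (sketch): trip when the destructive changes
    accumulated across a source's drift reports exceed the limit of 20."""
    destructive = sum(
        1
        for r in reports
        for c in r["changes"]
        if c["change_type"] == "destructive"
    )
    return destructive > max_destructive

reports = [{"changes": [{"change_type": "destructive"}] * 15},
           {"changes": [{"change_type": "destructive"}] * 10}]
print(should_pause_ingestion(reports))  # True (25 > 20)
```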

Schema Versioning

Every time drift is detected, a new schema version is recorded. You can diff any two versions to see exactly what changed and when.

from attestdb.discovery.schema_versioning import SchemaVersionStore

store = SchemaVersionStore("./schema_versions")

# Record a version after drift detection
version = store.record_version(
    source_id="salesforce",
    schema_map=current_schema,
    changes=drift_report.changes,
)
print(f"Recorded v{version.version_id}")

# Get current version
current = store.get_current("salesforce")
print(f"Current: v{current.version_id}, {len(current.changes_from_previous)} changes")

# Diff between any two versions
changes = store.diff("salesforce", version_a=1, version_b=5)
for c in changes:
    print(f"  {c.change_type}: {c.field_name} ({c.severity})")

# Full version history
for v in store.get_all("salesforce"):
    print(f"  v{v.version_id}: {len(v.changes_from_previous)} changes")

Query Prediction

The access tracker records every query and builds a model of recurring patterns. When a query has fired 3+ times at regular intervals, the system predicts when it will fire next and can pre-compute the result.

from attestdb.intelligence.access_tracker import AccessTracker

tracker = AccessTracker()

# After each query, the tracker records access patterns automatically
# (wired via ProactiveHooks — see below)

# Find hot claims — frequently and recently accessed
hot = tracker.get_hot_claims(top_n=100)
for claim_id, score in hot[:5]:
    print(f"{claim_id}: score={score:.2f}")

# Find hot entities
hot_entities = tracker.get_hot_entities(top_n=50)
for entity_id, score in hot_entities[:5]:
    print(f"{entity_id}: score={score:.2f}")

# Detect recurring query patterns
recurring = tracker.get_recurring_queries(min_occurrences=3)
for rq in recurring:
    print(f"Recurring: {rq.description}")
    print(f"  Fires every {rq.avg_interval_hours:.1f} hours")
    print(f"  Next predicted: {rq.predicted_next}")
    print(f"  Entities: {rq.entity_ids}")

# Stats
stats = tracker.stats()
print(f"Tracking {stats['tracked_claims']} claims, "
      f"{stats['tracked_entities']} entities, "
      f"{stats['tracked_query_plans']} query plans")
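The prediction itself can be as simple as mean-interval extrapolation: once a query has fired 3+ times, average the gaps between firings and project forward from the last one. The tracker's real model may be richer; this is a sketch of the basic idea:

```python
from datetime import datetime, timedelta

def predict_next(timestamps, min_occurrences=3):
    """Predict the next firing of a recurring query from past timestamps.
    Returns None until the pattern has recurred min_occurrences times."""
    if len(timestamps) < min_occurrences:
        return None
    # Average the gaps between consecutive firings
    intervals = [b - a for a, b in zip(timestamps, timestamps[1:])]
    avg = sum(intervals, timedelta()) / len(intervals)
    # Project forward from the most recent firing
    return timestamps[-1] + avg

runs = [datetime(2025, 1, 1, 9), datetime(2025, 1, 2, 9), datetime(2025, 1, 3, 9)]
print(predict_next(runs))  # 2025-01-04 09:00:00
```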

Auto-Discovery

When you connect a new data source, auto-discovery samples it, infers the semantic type of each field, and generates a schema mapping. Fields are classified into a shared taxonomy (revenue, satisfaction, contact, timestamp, etc.) using 59 heuristic rules with optional LLM refinement.

from attestdb.discovery.analyzer import infer_semantics, detect_deprecated_fields
from attestdb.discovery.sampler import sample_source, analyze_fields

# Sample and profile a data source
connector = db.connect("postgres", dsn="postgresql://...")
samples = sample_source(connector, sample_size=1000)
profiles = analyze_fields(samples)

# Infer semantics — uses LLM if available, falls back to heuristics
mappings = infer_semantics(profiles)
for m in mappings:
    status = "auto" if m.review_status == "auto_mapped" else "review"
    print(f"[{status}] {m.field_name} → {m.semantic_type} ({m.confidence:.0%})")

# Detect deprecated fields (low fill rate, zero variance, naming patterns)
deprecated = detect_deprecated_fields(profiles)
if deprecated:
    print(f"Deprecated fields to skip: {deprecated}")
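The heuristic tier works by pattern-matching raw field names against the taxonomy. A toy version with a handful of rules — the patterns and taxonomy labels here are illustrative stand-ins, not the 59 rules AttestDB ships:

```python
import re

# First-match-wins rules mapping name patterns to semantic types (illustrative)
RULES = [
    (re.compile(r"(revenue|amount|price|arr)", re.I), "revenue"),
    (re.compile(r"(email|phone|address)", re.I), "contact"),
    (re.compile(r"(_at$|_date$|timestamp)", re.I), "timestamp"),
    (re.compile(r"(nps|csat|satisfaction)", re.I), "satisfaction"),
]

def infer_type(field_name):
    """Map a raw field name to a semantic type via heuristic rules."""
    for pattern, semantic_type in RULES:
        if pattern.search(field_name):
            return semantic_type
    return "unknown"

print(infer_type("annual_revenue"))  # revenue
print(infer_type("created_at"))      # timestamp
```

Fields that fall through all rules are the ones worth routing to the optional LLM refinement step.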

Proactive Hooks

Proactive hooks wire the continuous maintenance features into the normal ingest and query paths. Once installed, they run automatically — no code changes needed at call sites.

Hook | Fires on | What it does
Post-ingestion | Every ingest() / ingest_batch() | Initializes lifecycle record for the claim, calculates importance score, invalidates any composites for affected entities.
Post-query | Every query() | Records access in the tracker, touches lifecycle (keeps claim “warm”), checks freshness and attaches warnings to results if the entity is stale.

from attestdb.intelligence.proactive_hooks import ProactiveHooks

# Wire hooks into the database + heartbeat
hooks = ProactiveHooks(db, scheduler)
hooks.install()

# Now every query automatically:
# 1. Records which entities and claims were accessed
# 2. Updates lifecycle tiers (hot/warm/cold)
# 3. Attaches freshness warnings if entity is stale
frame = db.query("EGFR", top_n=50)

# Check if any freshness warnings were attached
for gap in frame.knowledge_gaps:
    if "stale" in gap.lower():
        print(f"Warning: {gap}")

# Clean up
hooks.uninstall()

Putting It All Together

In production, you enable all continuous maintenance features at once. The heartbeat manages the lifecycle; proactive hooks wire it in; autodidact fills gaps; drift detection guards the perimeter. Here’s the full setup:

from attestdb import AttestDB
from attestdb.intelligence.heartbeat import HeartbeatScheduler, HeartbeatConfig
from attestdb.intelligence.proactive_hooks import ProactiveHooks
from attestdb.intelligence.composite_synthesizer import CompositeClaimSynthesizer
from attestdb.discovery.drift_detector import DriftMonitor

db = AttestDB("production.attest")

# 1. Start the heartbeat
scheduler = HeartbeatScheduler(db, config=HeartbeatConfig(
    cycle_interval_seconds=30.0,
    freshness_threshold=0.4,
))
scheduler.start()

# 2. Wire in proactive hooks
hooks = ProactiveHooks(db, scheduler)
hooks.install()

# 3. Enable autodidact for gap-filling
db.enable_autodidact(
    interval=3600,
    max_cost_per_day=1.00,
    sources="auto",
    enabled_triggers=["timer", "retraction", "inquiry"],
)

# 4. Set up drift monitoring
monitor = DriftMonitor(schema_store_path="./schemas")

# Now your database:
# - Tracks access patterns and predicts recurring queries
# - Monitors freshness and warns on stale results
# - Synthesizes and maintains composite summaries
# - Detects schema drift before it corrupts data
# - Fills knowledge gaps autonomously via autodidact
# - Applies confidence decay to aging claims
# - Consolidates near-duplicate claims

How It Differs from a Static Database

Capability | Traditional DB | Continuous Maintenance
Schema changes | Migration scripts, manual review | Auto-detected, versioned, circuit-breaker on destructive changes
Stale data | You notice when it’s too late | Freshness scores on every query, automatic warnings
Summaries | Materialized views, manual SQL | LLM-synthesized composites, auto-invalidated on change
Access patterns | Slow query logs, APM tools | Built-in hot/cold tracking with recurring query prediction
Knowledge gaps | You don’t know what you don’t know | Continuous gap detection + autodidact fills them
Confidence decay | Data never expires unless you delete it | Configurable half-life, old claims lose confidence over time