"""IndexManager and SchemaManager: create and validate graph indexes via migrate adapters."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any
from runic.orm.schema.index_manager import IndexSpec, extract_declared_specs
# Module-level logger shared by every helper and manager in this file.
log = logging.getLogger(__name__)


def _SPEC_SORT_KEY(spec: Any) -> tuple[Any, Any, Any]:  # noqa: N802
    """Return the ``(label, property, index_type)`` tuple used to order specs.

    Passed as ``key=`` to ``sorted`` so that lists of index specs come out in
    a stable, reproducible order.
    """
    return spec.label, spec.property, spec.index_type
def _resolve_adapter(adapter_or_graph: Any) -> Any:
    """Return an index adapter for *adapter_or_graph*.

    An object exposing ``create_node_range_index`` is treated as a raw
    FalkorDB graph handle and wrapped in ``FalkorDBIndexAdapter``; any other
    object is returned unchanged.
    """
    looks_like_raw_graph = hasattr(adapter_or_graph, "create_node_range_index")
    if not looks_like_raw_graph:
        return adapter_or_graph

    # Function-scope import: only executed on the wrapping branch.
    from runic.migrate.adapters.falkordb import FalkorDBIndexAdapter

    return FalkorDBIndexAdapter(adapter_or_graph)
@dataclass
class _SpecsData:
    """Internal bundle of declared/live index specs and their differences.

    Attributes:
        declared: Specs extracted from the entity classes.
        existing: Specs reported by the adapter for the live graph.
        missing: Specs that are declared but not existing.
        extra: Specs that are existing but not declared.
        errors: Messages describing failures hit while collecting the above.
    """

    declared: set[IndexSpec]
    existing: set[IndexSpec]
    missing: list[IndexSpec]
    extra: list[IndexSpec]
    errors: list[str]
def _collect_specs(entity_classes: list[type], adapter: Any) -> _SpecsData:
    """Gather declared and live index specs and compute how they differ.

    Failures are recorded as messages instead of being raised: a class whose
    specs cannot be extracted contributes nothing to the declared set, and a
    failed read of the live indexes leaves the existing set empty.

    Args:
        entity_classes: Classes whose declared index specs are extracted.
        adapter: Object providing ``get_existing_specs()``.

    Returns:
        A ``_SpecsData`` holding both sets, the sorted differences in each
        direction, and the collected error messages.
    """
    problems: list[str] = []

    wanted: set[IndexSpec] = set()
    for entity in entity_classes:
        try:
            wanted |= extract_declared_specs(entity)
        except Exception as exc:
            problems.append(
                f"Failed to extract specs for {entity.__name__!r}: {exc}"
            )

    # Start from "nothing live" so a failed read simply leaves this empty.
    live: set[IndexSpec] = set()
    try:
        live = adapter.get_existing_specs()
    except Exception as exc:
        problems.append(f"Failed to read live indexes: {exc}")

    not_yet_created = sorted(wanted - live, key=_SPEC_SORT_KEY)
    undeclared = sorted(live - wanted, key=_SPEC_SORT_KEY)
    return _SpecsData(
        declared=wanted,
        existing=live,
        missing=not_yet_created,
        extra=undeclared,
        errors=problems,
    )
@dataclass
class ValidationResult:
    """Result of a :meth:`SchemaManager.validate_schema` call.

    Attributes:
        is_valid: ``True`` only when there are no missing indexes, no extra
            indexes, and no errors were collected during validation.
        missing_indexes: Declared but not yet created in the live graph.
        extra_indexes: Present in the graph but not declared on any entity.
        errors: Non-fatal messages collected during validation.
    """

    is_valid: bool
    # ``default_factory`` gives every instance its own list.
    missing_indexes: list[IndexSpec] = field(default_factory=list)
    extra_indexes: list[IndexSpec] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
@dataclass
class SchemaInfo:
    """Point-in-time diagnostic summary of declared versus live indexes.

    Attributes:
        is_valid: ``True`` when nothing is missing, nothing is extra, and no
            errors were collected.
        declared_count: How many indexes the entity classes declare.
        existing_count: How many indexes the live graph reports.
        missing_count: How many declared indexes have not been created.
        extra_count: How many live indexes no entity declares.
        missing: Missing specs as plain dictionaries.
        extra: Extra specs as plain dictionaries.
        errors: Non-fatal messages collected during introspection.
    """

    is_valid: bool
    declared_count: int
    existing_count: int
    missing_count: int
    extra_count: int
    missing: list[dict[str, str]]
    extra: list[dict[str, str]]
    errors: list[str]
class IndexManager:
    """Creates and manages graph indexes and constraints from entity Field declarations.

    Accepts any object satisfying the :class:`~runic.migrate.adapters._base.IndexAdapter`
    protocol — a migrate adapter (Neo4j, Memgraph, FalkorDB, ArcadeDB, AGE) or a raw
    FalkorDB graph handle (auto-wrapped in ``FalkorDBIndexAdapter`` for backward compat).

    **Fulltext batching** — Neo4j and Memgraph use a single named fulltext index per
    label covering all search fields. ``create_indexes()`` collapses all FULLTEXT specs
    for the same label into one ``create_fulltext_index(label, prop1, prop2, ...)`` call.

    Example::

        from runic.migrate import IndexManager, create_adapter

        adapter = create_adapter(
            "neo4j", host="localhost", database="neo4j", password="secret"
        )
        manager = IndexManager(adapter)
        manager.create_indexes(Person)
        manager.ensure_indexes(Trip)
    """

    def __init__(self, adapter_or_graph: Any) -> None:
        """Store the adapter, wrapping a raw FalkorDB graph handle if needed."""
        self._adapter: Any = _resolve_adapter(adapter_or_graph)

    def create_indexes(
        self,
        entity_class: type,
        *,
        if_not_exists: bool = True,
    ) -> None:
        """Create all indexes and constraints declared on *entity_class*.

        The vertex or edge type for the class is ensured first. FULLTEXT specs
        sharing a label are then batched into a single
        ``create_fulltext_index(label, *props)`` call, with the properties
        passed in sorted order so the issued call is reproducible.

        When *if_not_exists* is ``True`` (default):

        * non-FULLTEXT specs already reported by the adapter are skipped;
        * a label's FULLTEXT index is skipped only when *every* declared
          fulltext property is already reported; otherwise the call is issued
          with the full property list, not just the new properties.

        When *if_not_exists* is ``False`` every declared spec is attempted.

        Exceptions raised by spec extraction or by the adapter are not caught.
        """
        from runic.orm.core.models import Edge, Node

        if issubclass(entity_class, Node):
            label: str = getattr(entity_class, "_primary_label", entity_class.__name__)
            log.debug("Ensuring vertex type for %s", label)
            self._adapter.create_vertex_type(label)
        elif issubclass(entity_class, Edge):
            edge_type: str = getattr(entity_class, "_edge_type", entity_class.__name__)
            log.debug("Ensuring edge type for %s", edge_type)
            self._adapter.create_edge_type(edge_type)

        declared = extract_declared_specs(entity_class)
        # An empty "existing" set makes every spec below look new.
        existing = self._adapter.get_existing_specs() if if_not_exists else set()

        # Split declared specs: FULLTEXT ones are grouped per label, the rest
        # are created one by one.
        ft_by_label: dict[str, list[str]] = {}
        non_ft: list[IndexSpec] = []
        for spec in declared:
            if spec.index_type == "FULLTEXT":
                ft_by_label.setdefault(spec.label, []).append(spec.property)
            else:
                non_ft.append(spec)

        for lbl, unordered_props in sorted(ft_by_label.items()):
            # ``declared`` is a set, so the grouped order is arbitrary; sort it
            # to keep the adapter call deterministic across runs.
            props = sorted(unordered_props)
            existing_ft = {
                s.property
                for s in existing
                if s.label == lbl and s.index_type == "FULLTEXT"
            }
            if existing_ft.issuperset(props):
                log.debug("All fulltext props already exist for %s, skipping", lbl)
                continue
            # Pass every declared property, not only the new ones: the batched
            # index is meant to cover all search fields of the label.
            log.debug("Creating fulltext index on %s covering %s", lbl, props)
            self._adapter.create_fulltext_index(lbl, *props)

        for spec in sorted(non_ft, key=_SPEC_SORT_KEY):
            if spec in existing:
                log.debug("Index already exists, skipping: %r", spec)
                continue
            self.create_spec(spec)

    def ensure_indexes(self, entity_class: type) -> None:
        """Create missing indexes for *entity_class*; skip those that already exist."""
        self.create_indexes(entity_class, if_not_exists=True)

    def create_spec(self, spec: IndexSpec) -> None:
        """Issue the appropriate adapter call to create a single IndexSpec.

        Dispatches on ``spec.index_type``; an unrecognised type is logged as a
        warning and otherwise ignored.
        """
        if spec.index_type == "UNIQUE":
            log.debug("Creating unique constraint: %r", spec)
            # NOTE(review): entity type is always "NODE", also for specs that
            # come from edge classes — confirm this is intended.
            self._adapter.create_constraint(
                "UNIQUE", "NODE", spec.label, [spec.property]
            )
        elif spec.index_type == "RANGE":
            log.debug("Creating range index: %r", spec)
            self._adapter.create_range_index(spec.label, spec.property)
        elif spec.index_type == "FULLTEXT":
            log.debug("Creating fulltext index: %r", spec)
            self._adapter.create_fulltext_index(spec.label, spec.property)
        elif spec.index_type == "VECTOR":
            log.debug("Creating vector index: %r", spec)
            # NOTE(review): the third and fourth arguments are fixed at 0 and
            # "cosine" (presumably dimension and similarity) — confirm that
            # adapters accept 0 here.
            self._adapter.create_vector_index(spec.label, spec.property, 0, "cosine")
        else:
            log.warning(
                "Unknown index type %r for %r — skipping", spec.index_type, spec
            )

    def drop_spec(self, spec: IndexSpec) -> None:
        """Issue the appropriate adapter call to drop a single IndexSpec.

        Dispatches on ``spec.index_type``; an unrecognised type is logged as a
        warning and nothing is dropped.
        """
        if spec.index_type == "UNIQUE":
            log.debug("Dropping unique constraint: %r", spec)
            self._adapter.drop_constraint("UNIQUE", "NODE", spec.label, [spec.property])
        elif spec.index_type == "RANGE":
            log.debug("Dropping range index: %r", spec)
            self._adapter.drop_range_index(spec.label, spec.property)
        elif spec.index_type == "FULLTEXT":
            log.debug("Dropping fulltext index: %r", spec)
            self._adapter.drop_fulltext_index(spec.label, spec.property)
        elif spec.index_type == "VECTOR":
            log.debug("Dropping vector index: %r", spec)
            self._adapter.drop_vector_index(spec.label, spec.property)
        else:
            log.warning(
                "Unknown index type %r for %r — cannot drop", spec.index_type, spec
            )
class SchemaManager:
    """Validates and synchronizes graph indexes against entity Field declarations.

    Accepts any object satisfying the :class:`~runic.migrate.adapters._base.IndexAdapter`
    protocol (a migrate adapter or a raw FalkorDB graph handle for backward compat).

    Example::

        from runic.migrate import SchemaManager, create_adapter

        adapter = create_adapter(
            "neo4j", host="localhost", database="neo4j", password="secret"
        )
        schema = SchemaManager(adapter)
        result = schema.validate_schema([Person, KnowsEdge])
        schema.sync_schema([Person, KnowsEdge])
    """

    def __init__(self, adapter_or_graph: Any) -> None:
        """Resolve the adapter and build the ``IndexManager`` that shares it."""
        self._adapter: Any = _resolve_adapter(adapter_or_graph)
        self._index_manager = IndexManager(self._adapter)

    @staticmethod
    def _spec_to_dict(spec: IndexSpec) -> dict[str, str]:
        """Serialise one index spec into the plain-dict form used by ``SchemaInfo``."""
        return {"label": spec.label, "property": spec.property, "type": spec.index_type}

    def ensure_entity_types(self, entity_classes: list[type]) -> None:
        """Create vertex/edge types for *entity_classes* on adapters that require them.

        No-op for schemaless backends. Issues ``CREATE VERTEX TYPE`` / ``CREATE EDGE TYPE``
        DDL for ArcadeDB. Classes that are neither ``Node`` nor ``Edge`` subclasses
        are ignored.
        """
        from runic.orm.core.models import Edge, Node

        for cls in entity_classes:
            if issubclass(cls, Node):
                label: str = getattr(cls, "_primary_label", cls.__name__)
                self._adapter.create_vertex_type(label)
            elif issubclass(cls, Edge):
                edge_type: str = getattr(cls, "_edge_type", cls.__name__)
                self._adapter.create_edge_type(edge_type)

    def validate_schema(self, entity_classes: list[type]) -> ValidationResult:
        """Compare declared indexes against the live graph state.

        Returns a :class:`ValidationResult` describing missing and extra indexes.
        ``is_valid`` is ``True`` only when both sets are empty and no errors occurred.
        """
        data = _collect_specs(entity_classes, self._adapter)
        return ValidationResult(
            is_valid=not data.missing and not data.extra and not data.errors,
            missing_indexes=data.missing,
            extra_indexes=data.extra,
            errors=data.errors,
        )

    def sync_schema(
        self,
        entity_classes: list[type],
        *,
        drop_extra: bool = False,
    ) -> None:
        """Create entity types and missing indexes; drop extras when *drop_extra* is ``True``.

        Calls ``ensure_entity_types`` first (required for ArcadeDB empty collections),
        then delegates to :meth:`IndexManager.create_indexes` per class.

        When *drop_extra* is ``True`` the schema is re-validated and every index
        reported as extra is dropped. If that validation collected any errors,
        nothing is dropped and a warning is logged instead: with an incomplete
        set of declared specs, legitimately declared indexes would be
        misclassified as extra.
        """
        self.ensure_entity_types(entity_classes)
        for cls in entity_classes:
            self._index_manager.create_indexes(cls)

        if not drop_extra:
            return

        result = self.validate_schema(entity_classes)
        if result.errors:
            # Dropping is destructive; refuse to act on a partial comparison.
            log.warning(
                "Skipping drop of extra indexes; schema validation reported errors: %s",
                result.errors,
            )
            return
        for spec in result.extra_indexes:
            log.info("Dropping extra index: %r", spec)
            self._index_manager.drop_spec(spec)

    def get_schema_diff(self, entity_classes: list[type]) -> str:
        """Return a human-readable diff of declared vs existing indexes.

        Lines are prefixed with ``MISSING``, ``EXTRA`` or ``ERROR`` (one ``ERROR``
        line per message collected during validation); returns a single
        "in sync" message only when there are no differences and no errors.
        """
        result = self.validate_schema(entity_classes)
        lines: list[str] = [
            f" MISSING {s.index_type:<10} {s.label}.{s.property}"
            for s in result.missing_indexes
        ]
        lines.extend(
            f" EXTRA {s.index_type:<10} {s.label}.{s.property}"
            for s in result.extra_indexes
        )
        # Surface validation problems so a failed comparison is never reported
        # as a clean or merely partial diff.
        lines.extend(f" ERROR {message}" for message in result.errors)
        if not lines:
            return "Schema is in sync — no differences found."
        return "\n".join(lines)

    def get_schema_info(self, entity_classes: list[type]) -> SchemaInfo:
        """Return a :class:`SchemaInfo` snapshot of the current schema state."""
        data = _collect_specs(entity_classes, self._adapter)
        return SchemaInfo(
            is_valid=not data.missing and not data.extra and not data.errors,
            declared_count=len(data.declared),
            existing_count=len(data.existing),
            missing_count=len(data.missing),
            extra_count=len(data.extra),
            missing=[self._spec_to_dict(s) for s in data.missing],
            extra=[self._spec_to_dict(s) for s in data.extra],
            errors=data.errors,
        )