from __future__ import annotations
import contextlib
import logging
import os
from pathlib import Path
from runic.migrate.adapters import GraphAdapter
from runic.migrate.manifest import SchemaManifest
from runic.migrate.operations import GraphOperations
from runic.migrate.script import (
Revision,
RevisionInfo,
RevisionNotFound,
ScriptDirectory,
)
from runic.migrate.version import VersionNode
log = logging.getLogger(__name__)
[docs]
class IrreversibleMigrationError(Exception):
pass
[docs]
class Runic:
"""The single SDK entry point for runic migrations.
Handles both DB-connected operations (upgrade, downgrade, stamp, current)
and offline DAG queries (get_history, get_heads, create_revision, …).
Example::
from pathlib import Path
from runic import Runic
from runic.migrate.adapters.falkordb import FalkorDBAdapter
adapter = FalkorDBAdapter.from_url("falkor://localhost:6379", "my_graph")
runic = Runic(adapter, script_location=Path("runic/"))
runic.migrate.upgrade("head")
"""
def __init__(
self,
adapter: GraphAdapter,
script_location: Path,
*,
preview: bool = False,
target_manifest: SchemaManifest | None = None,
track_checksums: bool = True,
track_installed_by: bool = True,
truncate_slug_length: int = 40,
file_template: str | None = None,
) -> None:
self._adapter = adapter
self._script_location = script_location
self._preview = preview
self._target_manifest = target_manifest
self._track_checksums = track_checksums
self._track_installed_by = track_installed_by
self._truncate_slug_length = truncate_slug_length
self._file_template = file_template
self._version_node = VersionNode(adapter)
self._script_dir = ScriptDirectory.load(script_location)
self._ops = GraphOperations(adapter, preview=preview)
# ------------------------------------------------------------------
# Public properties
# ------------------------------------------------------------------
@property
def adapter(self) -> GraphAdapter:
return self._adapter
@property
def target_manifest(self) -> SchemaManifest | None:
return self._target_manifest
@property
def script_location(self) -> Path:
return self._script_location
def retarget(self, adapter: GraphAdapter) -> None:
"""Point this context at a different adapter (e.g. a forked graph).
Rebuilds the version node and operations wrapper so subsequent commands
run against *adapter*. Used by ``runic baseline --graph`` to introspect
a sibling graph without re-running ``env.py``.
"""
self._adapter = adapter
self._version_node = VersionNode(adapter)
self._ops = GraphOperations(adapter, preview=self._preview)
# ------------------------------------------------------------------
# Preview
# ------------------------------------------------------------------
def enable_preview(self) -> None:
self._preview = True
self._ops._preview = True # noqa: SLF001
@property
def preview_log(self) -> list[str]:
return self._ops.preview_log
# ------------------------------------------------------------------
# DB-connected runtime operations
# ------------------------------------------------------------------
def current(self) -> str | None:
return self._version_node.get_single()
def get_revision_message(self, rev_id: str) -> str | None:
try:
return self._script_dir.get_revision(rev_id).message
except Exception:
return None
def _resolve_upgrade_target(self, target: str) -> str:
"""Resolve +N relative target to an absolute revision id."""
if not target.startswith("+"):
return target
try:
n = int(target[1:])
except ValueError:
return target
if n == 0:
return self._version_node.get_single() or "base"
current_revs = self._version_node.get()
heads = self._script_dir.get_heads()
if not heads:
return "head"
from runic.migrate.exceptions import MultipleHeadsError
if len(heads) > 1:
raise MultipleHeadsError(
"Cannot use +N with multiple heads — specify an explicit revision."
)
head_rev = heads[0].revision
remaining = self._script_dir.topological_upgrade_path(
current_revs or None, head_rev
)
if not remaining:
return head_rev
return remaining[min(n, len(remaining)) - 1].revision
def _resolve_downgrade_target(self, target: str) -> str:
"""Resolve -N relative target to an absolute revision id or 'base'."""
if not target.startswith("-"):
return target
try:
n = int(target[1:])
except ValueError:
return target
if n == 0:
return self._version_node.get_single() or "base"
current = self._version_node.get_single()
if current is None:
return "base"
all_to_head = self._script_dir.iterate_revisions(None, current)
chain = list(reversed(all_to_head))
if n >= len(chain):
return "base"
return chain[n].revision
[docs]
def validate(self) -> list[str]:
"""Check that applied revisions' local files match their stored checksums.
Returns a list of error strings (empty means everything is valid).
Missing checksum entries are skipped (backward compatible with databases
migrated before checksum tracking was introduced).
Returns [] immediately when track_checksums=False.
"""
if not self._track_checksums:
log.debug("checksum tracking disabled — skipping validate()")
return []
from runic.migrate.checksum import file_checksum
current_revs = self._version_node.get()
if not current_revs:
return []
stored = self._adapter.get_checksums()
errors: list[str] = []
checked: set[str] = set()
for rev_id in current_revs:
try:
chain = self._script_dir.iterate_revisions(None, rev_id)
except Exception as exc:
errors.append(f"{rev_id}: could not trace revision chain — {exc}")
continue
for rev in chain:
if rev.revision in checked:
continue
checked.add(rev.revision)
if rev.revision not in stored:
log.debug(
"no checksum stored for %s (pre-checksum deployment)",
rev.revision,
)
continue
current_hash = file_checksum(rev.path)
if current_hash != stored[rev.revision]:
errors.append(
f"{rev.revision} ({rev.message}): "
"checksum mismatch — script was modified after being applied"
)
return errors
def upgrade(
self,
target: str = "head",
*,
validate_on_migrate: bool = False,
installed_by: str | None = None,
) -> None:
if validate_on_migrate:
errors = self.validate()
if errors:
raise ValueError(
"Checksum validation failed before upgrade:\n"
+ "\n".join(f" {e}" for e in errors)
)
if installed_by is None and self._track_installed_by:
installed_by = os.environ.get("RUNIC_INSTALLED_BY")
if installed_by is None:
import getpass
with contextlib.suppress(Exception):
installed_by = getpass.getuser()
from_revs = self._version_node.get()
resolved_target = self._resolve_upgrade_target(target)
revisions = self._script_dir.topological_upgrade_path(
from_revs or None, resolved_target
)
if not revisions:
log.info("already at target revision: %s", resolved_target)
return
for rev in revisions:
snap_name = f"{self._adapter.name}__premig_{rev.revision}"
take_snapshot = rev.snapshot and not self._preview
if take_snapshot and not self._adapter.supports_snapshots():
log.warning(
"revision %s requests a snapshot but adapter %r does not "
"support snapshots; proceeding without one",
rev.revision,
self._adapter.name,
)
take_snapshot = False
if take_snapshot:
self._ops.snapshot(snap_name)
log.info("upgrading to revision: %s — %s", rev.revision, rev.message)
try:
rev.module.upgrade(self._ops)
except Exception:
if take_snapshot:
log.warning(
"upgrade failed, restoring snapshot for revision %s",
rev.revision,
)
self._ops.restore_snapshot(snap_name)
log.error(
"upgrade failed at revision %s; database was at %s",
rev.revision,
from_revs,
)
raise
if not self._preview:
self._version_node.set(rev.revision)
if self._track_checksums:
from runic.migrate.checksum import file_checksum
self._adapter.set_checksum(
rev.revision, file_checksum(rev.path), installed_by
)
from_revs = [rev.revision]
def downgrade(self, target: str, *, force: bool = False) -> None:
current = self._version_node.get_single()
if current is None:
log.info("nothing to downgrade, no current revision")
return
resolved_target = self._resolve_downgrade_target(target)
if resolved_target == "base":
revisions = list(
reversed(self._script_dir.iterate_revisions(None, current))
)
else:
revisions = self._script_dir.iterate_revisions(current, resolved_target)
for rev in revisions:
if rev.irreversible and not force:
raise IrreversibleMigrationError(
f"revision {rev.revision!r} is marked irreversible; "
"use force=True to override"
)
for rev in revisions:
snap_name = f"{self._adapter.name}__premig_{rev.revision}"
used_snapshot = False
if (
rev.snapshot
and not self._preview
and self._adapter.snapshot_exists(snap_name)
):
log.info("restoring snapshot for revision %s", rev.revision)
self._ops.restore_snapshot(snap_name)
used_snapshot = True
if not used_snapshot:
log.info("downgrading revision: %s — %s", rev.revision, rev.message)
try:
rev.module.downgrade(self._ops)
except Exception:
log.error("downgrade failed at revision %s", rev.revision)
raise
if not self._preview:
if rev.down_revision is None:
self._version_node.clear()
else:
parent = (
rev.down_revision
if isinstance(rev.down_revision, str)
else rev.down_revision[0]
)
self._version_node.set(parent)
def stamp(self, target: str, *, purge: bool = False) -> None:
if purge:
self._version_node.clear()
if target == "base":
self._version_node.clear()
log.info("stamped: base (cleared)")
elif target == "heads":
heads = self._script_dir.get_heads()
self._version_node.set_multiple([r.revision for r in heads])
log.info("stamped: heads %s", [r.revision for r in heads])
else:
rev = self._script_dir.get_revision(target)
self._version_node.set(rev.revision)
log.info("stamped: %s", rev.revision)
# ------------------------------------------------------------------
# Offline DAG queries (no DB connection needed)
# ------------------------------------------------------------------
[docs]
def get_history(self, range_: str | None = None) -> list[RevisionInfo]:
"""Return revision history newest-first.
*range_* accepts the ``start:end`` format (either side may be omitted
to mean base / head respectively).
"""
if range_:
parts = range_.split(":")
start = parts[0].strip() or None
end = parts[1].strip() if len(parts) > 1 else None
heads_set = {r.revision for r in self._script_dir.get_heads()}
bp_set = {r.revision for r in self._script_dir.get_branch_points()}
items: list[RevisionInfo] = [
RevisionInfo(
revision=r.revision,
down_revision=r.down_revision,
message=r.message,
create_date=r.create_date,
is_head=r.revision in heads_set,
is_branch_point=r.revision in bp_set,
)
for r in self._script_dir.walk_revisions(start, end, "up")
]
return list(reversed(items))
return list(reversed(self._script_dir.revision_history()))
[docs]
def get_heads(self) -> list[Revision]:
"""Return all head revisions (not referenced as any down_revision)."""
return self._script_dir.get_heads()
[docs]
def get_branch_points(self) -> list[tuple[Revision, list[str]]]:
"""Return each branch-point revision paired with its direct child revision ids."""
return [
(bp, self._script_dir.get_children(bp.revision))
for bp in self._script_dir.get_branch_points()
]
def current_revisions(self) -> list[str]:
"""Return the revision ids currently recorded on the version node."""
return self._version_node.get()
def pending_revisions(
self, from_revs: list[str] | None = None, target: str = "head"
) -> list[Revision]:
"""Return the ordered upgrade path from *from_revs* to *target*.
Raises the usual revision-graph errors (e.g. ``MultipleHeadsError``,
``RevisionNotFound``) when the path cannot be resolved.
"""
return self._script_dir.topological_upgrade_path(from_revs or None, target)
[docs]
def create_revision(
self,
message: str,
head: str | None = None,
rev_id: str | None = None,
branch_labels: list[str] | None = None,
depends_on: list[str] | None = None,
) -> Path:
"""Create a new migration revision script and return its path."""
resolved_head = head if head is not None else self._script_dir.head()
return self._script_dir.create(
message,
resolved_head,
self._script_location,
branch_labels=branch_labels,
depends_on=depends_on,
rev_id=rev_id,
truncate_slug_length=self._truncate_slug_length,
file_template=self._file_template,
)
[docs]
def show_revision(self, rev: str) -> Revision:
"""Return full metadata for a single revision (by id or unique prefix)."""
return self._script_dir.get_revision(rev)
def baseline(
self, message: str = "baseline", *, stamp_only: bool = False
) -> Path | None:
"""Generate an initial migration from the live graph's schema.
Introspects indexes and constraints, writes a revision with
``down_revision = None`` (root of the chain), and stamps the version
node so Runic treats it as already applied on the source graph.
Raises:
GraphAlreadyManagedError: if the version node already records a
revision — use ``runic upgrade`` instead.
Args:
message: Human-readable revision message.
stamp_only: Skip writing a .py file; only stamp the version node
with a freshly generated revision id.
Returns:
Path to the generated .py file, or ``None`` for ``stamp_only``.
"""
from runic.migrate.exceptions import GraphAlreadyManagedError
from runic.migrate.introspect import (
full_downgrade_ops,
full_upgrade_ops,
)
from runic.migrate.script import render_op_body
current = self._version_node.get()
if current:
raise GraphAlreadyManagedError(
"Graph already managed by runic.migrate. Use `runic upgrade` instead."
)
rev_id = ScriptDirectory.generate_revision_id()
if not stamp_only:
snapshot = self._adapter.introspect_schema()
upgrade_ops = full_upgrade_ops(snapshot)
downgrade_ops = full_downgrade_ops(snapshot)
file_path = self._script_dir.create(
message,
None,
self._script_location,
upgrade_body=render_op_body(upgrade_ops),
downgrade_body=render_op_body(downgrade_ops),
rev_id=rev_id,
truncate_slug_length=self._truncate_slug_length,
file_template=self._file_template,
)
log.info("created baseline revision: %s at %s", rev_id, file_path)
self._version_node.set(rev_id)
log.info("stamped baseline revision: %s", rev_id)
return None if stamp_only else file_path
# ---------------------------------------------------------------------------
# Module-level singleton API (called from user's env.py)
# ---------------------------------------------------------------------------
_context: Runic | None = None
[docs]
def get() -> Runic:
if _context is None:
raise RuntimeError("runic context not configured — was env.py executed?")
return _context
[docs]
def is_preview() -> bool:
return _context._preview if _context else False # noqa: SLF001
# Re-export RevisionNotFound so env.py users can import from runic.migrate.context
__all__ = [
"IrreversibleMigrationError",
"RevisionNotFound",
"Runic",
"configure",
"get",
"is_preview",
]