OGM API Reference

runic.ogm is a lightweight object-graph mapper (OGM) for Cypher-based graph databases. It follows a SQLAlchemy-style architecture: driver → session → mapper → repository. FalkorDB, ArcadeDB, other Bolt-compatible databases (such as Neo4j and Memgraph), and Apache AGE are supported via the GraphDriver abstraction.


runic.ogm.core — Models & Fields

class runic.ogm.core.models.Node[source]

Bases: object

Base class for graph nodes.

Subclass with labels and optional primary_label class keywords:

class Country(
    Location, labels=["Location", "Country"], primary_label="Location"
):
    iso_code: str = Field(unique=True)

__init__ is generated automatically from declared Field descriptors. Setting any field on an existing instance marks it dirty (_dirty = True), which the Mapper interprets as a MERGE/SET on the next flush.

class runic.ogm.core.models.Edge[source]

Bases: object

Base class for graph edge property models.

Subclass with a type class keyword:

class InvitationEdge(Edge, type="INVITED_TO"):
    role: str = Field(required=True)

Edge instances carry the same _new/_dirty lifecycle flags as Node.

runic.ogm.core.descriptors.Field(*, default=MISSING, default_factory=None, init=True, kw_only=True, index=False, index_type=None, unique=False, required=False, primary_key=False, converter=None, generated=False, interned=False)[source]

Declare a property field on a Node or Edge.

Returns a FieldDescriptor typed as Any so that name: str = Field() is accepted by type checkers without error.

Set interned=True to store the value via FalkorDB’s intern() function, which deduplicates repeated strings (e.g. country names, status codes, tags) by keeping a single shared copy in the database.

Example:

class Person(Node, labels=["Person"]):
    id: str = Field(primary_key=True)
    name: str = Field()
    age: int | None = Field(default=None)
    country: str = Field(interned=True)
    email: str = Field(index=True, unique=True)
Parameters:
  • default (Any)

  • default_factory (Callable[[], Any] | None)

  • init (bool)

  • kw_only (bool)

  • index (bool)

  • index_type (Literal['FULLTEXT', 'VECTOR'] | None)

  • unique (bool)

  • required (bool)

  • primary_key (bool)

  • converter (TypeConverter | None)

  • generated (bool)

  • interned (bool)

Return type:

Any

runic.ogm.core.descriptors.Relation(*, relationship, direction, target, edge_model=None, cascade=False, lazy=True, default=None, default_factory=None, init=True)[source]

Declare a relationship field on a Node.

Returns a FieldDescriptor typed as Any so that company: Company | None = Relation(...) is accepted by type checkers.

Example:

class Person(Node, labels=["Person"]):
    id: str = Field()
    company: Company | None = Relation(
        relationship="WORKS_FOR",
        direction="OUTGOING",
        target="Company",
    )
    friends: list["Person"] = Relation(
        relationship="KNOWS",
        direction="OUTGOING",
        target="Person",
    )
Return type:

Any

class runic.ogm.core.descriptors.FieldDescriptor(*, default=MISSING, default_factory=None, init=True, kw_only=True, index=False, index_type=None, unique=False, required=False, primary_key=False, relationship=None, direction=None, target=None, edge_model=None, cascade=False, lazy=True, converter=None, generated=False, interned=False)[source]

Bases: object

Descriptor backing both Field() and Relation() declarations.

Behaves as a data descriptor: values are stored per-instance in __dict__. Writing to an instance attribute via this descriptor sets instance._dirty = True, which the Mapper uses to detect changes on persistent entities.
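
The dirty-tracking behaviour described above can be illustrated with a plain-Python data descriptor. This is a minimal sketch, not runic's implementation: TrackedField and Entity are hypothetical names, and only the per-instance __dict__ storage and the _dirty flag are taken from the description.

```python
class TrackedField:
    """Hypothetical stand-in for a dirty-tracking data descriptor."""

    def __set_name__(self, owner, name):
        self._name = name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self  # class-level access returns the descriptor itself
        return instance.__dict__[self._name]

    def __set__(self, instance, value):
        # Store the value per instance and flag the instance as changed.
        instance.__dict__[self._name] = value
        instance.__dict__["_dirty"] = True


class Entity:
    name = TrackedField()

    def __init__(self, name):
        self.name = name
        self._dirty = False  # assumption: a freshly built entity starts clean


user = Entity("Alice")
before = user._dirty
user.name = "Bob"      # goes through TrackedField.__set__
after = user._dirty
```

Because the descriptor defines both __get__ and __set__, every assignment is routed through it, which is what lets a mapper notice changes without comparing snapshots.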

Query expression operators

When accessed at the class level (User.name, Rated.score), the descriptor returns itself. The comparison operators defined below produce FilterExpr objects for use with where():

session.query(User).where(User.name == "Alice")
session.query(User).where(User.age > 18)
session.query(User).where(User.bio.contains("graph"))
session.query(User).where(User.deleted_at.is_null())

__hash__ is kept as object.__hash__ so descriptors remain hashable and can appear in sets/dicts used internally.
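
The following sketch shows, under stated assumptions, how class-level access plus overloaded operators can yield filter objects while keeping the descriptor hashable. FilterExpr here is a local stand-in with invented fields (prop, op, value), and QueryField is a hypothetical name; the real FilterExpr may look different.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class FilterExpr:
    """Hypothetical filter node: property name, Cypher operator, bound value."""
    prop: str
    op: str
    value: Any

    def to_cypher(self, alias: str = "n") -> str:
        return f"{alias}.{self.prop} {self.op} ${self.prop}"


class QueryField:
    """Hypothetical descriptor that builds filters at class level."""

    def __set_name__(self, owner, name):
        self.name = name

    def __get__(self, instance, owner=None):
        return self if instance is None else instance.__dict__.get(self.name)

    def __eq__(self, other):
        return FilterExpr(self.name, "=", other)

    def __gt__(self, other):
        return FilterExpr(self.name, ">", other)

    def contains(self, value: str):
        return FilterExpr(self.name, "CONTAINS", value)

    # Defining __eq__ sets __hash__ to None, so restore identity hashing.
    __hash__ = object.__hash__


class User:
    name = QueryField()
    age = QueryField()


expr = User.age > 18
```

Without the explicit __hash__ assignment, Python would make the descriptor unhashable as soon as __eq__ is overridden, and it could no longer be placed in sets or used as a dict key.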

property name: str

Public read-only accessor for the attribute name set by __set_name__.

property has_default: bool

True if a default value or factory is configured.

get_default()[source]

Return the default value, calling default_factory if set.

Return type:

Any

property field_name: str

Public accessor for the descriptor’s attribute name.

property owner: type | None

Public accessor for the class that owns this descriptor.

contains(value)[source]

Return a CONTAINS filter: prop CONTAINS $value.

Parameters:

value (str)

Return type:

Any

startswith(value)[source]

Return a STARTS WITH filter: prop STARTS WITH $value.

Parameters:

value (str)

Return type:

Any

endswith(value)[source]

Return an ENDS WITH filter: prop ENDS WITH $value.

Parameters:

value (str)

Return type:

Any

matches(pattern)[source]

Return a regex filter: prop =~ $pattern.

Parameters:

pattern (str)

Return type:

Any

is_null()[source]

Return an IS NULL filter.

Return type:

Any

is_not_null()[source]

Return an IS NOT NULL filter.

Return type:

Any

in_(values)[source]

Return an IN filter: prop IN $values.

Parameters:

values (list[Any])

Return type:

Any

not_in_(values)[source]

Return a NOT IN filter: NOT prop IN $values.

Parameters:

values (list[Any])

Return type:

Any

Parameters:
  • default (Any)

  • default_factory (Callable[[], Any] | None)

  • init (bool)

  • kw_only (bool)

  • index (bool)

  • index_type (Literal['FULLTEXT', 'VECTOR'] | None)

  • unique (bool)

  • required (bool)

  • primary_key (bool)

  • relationship (str | None)

  • direction (Literal['OUTGOING', 'INCOMING', 'BOTH'] | None)

  • target (str | type | None)

  • edge_model (str | type | None)

  • cascade (bool)

  • lazy (bool)

  • converter (TypeConverter | None)

  • generated (bool)

  • interned (bool)

class runic.ogm.core.descriptors.FieldInfo(name, field, is_collection=False)[source]

Bases: object

Pairs a field name with its FieldDescriptor; used during __init__ generation.


runic.ogm.core — MetaData

class runic.ogm.core.metadata.MetaData[source]

Bases: object

Registry for all Node and Edge subclasses.

Populated automatically when Node/Edge subclasses are defined via __init_subclass__. Also provides forward-reference resolution (string targets on relationship Fields) after all models are imported.
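
As an illustration of the mechanism, the sketch below registers classes from __init_subclass__ and resolves a string forward reference once every class exists. Registry, BaseNode, the label keyword, and the pending list are hypothetical names chosen for the example; they are not the MetaData API.

```python
class Registry:
    """Hypothetical registry keyed by label."""

    def __init__(self):
        self.by_label = {}
        self.pending = []  # (owner class, attribute name, target label)

    def register(self, cls, label):
        self.by_label[label] = cls

    def finalize(self):
        # Replace string targets with the classes registered under them.
        for owner, attr, label in self.pending:
            setattr(owner, attr, self.by_label[label])
        self.pending.clear()


registry = Registry()


class BaseNode:
    def __init_subclass__(cls, label=None, **kwargs):
        super().__init_subclass__(**kwargs)
        if label is not None:
            registry.register(cls, label)


class Person(BaseNode, label="Person"):
    employer = "Company"  # forward reference: Company is not defined yet


registry.pending.append((Person, "employer", "Company"))


class Company(BaseNode, label="Company"):
    pass


registry.finalize()  # run once, after all model classes are defined
```

Deferring resolution to a single finalize step is what allows two models to reference each other regardless of import order.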

register_node(cls)[source]

Register a Node subclass; called by Node.__init_subclass__.

Parameters:

cls (type)

Return type:

None

register_edge(cls)[source]

Register an Edge subclass; called by Edge.__init_subclass__.

Parameters:

cls (type)

Return type:

None

get_node_meta(cls)[source]

Return metadata for a Node class, or None if not registered.

Parameters:

cls (type)

Return type:

NodeMeta | None

get_edge_meta(cls)[source]

Return metadata for an Edge class, or None if not registered.

Parameters:

cls (type)

Return type:

EdgeMeta | None

resolve_node_by_label(label)[source]

Look up a Node by its primary label string.

Parameters:

label (str)

Return type:

NodeMeta | None

resolve_edge_by_type(edge_type)[source]

Look up an Edge by its type string.

Parameters:

edge_type (str)

Return type:

EdgeMeta | None

resolve_target(target)[source]

Resolve a string forward reference to its registered Node/Edge class.

Parameters:

target (str | type | None)

Return type:

type | None

finalize()[source]

Resolve all string forward references in relationship Fields.

Call once after all model modules have been imported. String targets on relationship Fields are replaced with the actual class objects.

Return type:

None

all_nodes()[source]

Return metadata for all registered Node subclasses.

Return type:

list[NodeMeta]

all_edges()[source]

Return metadata for all registered Edge subclasses.

Return type:

list[EdgeMeta]

snapshot()[source]

Capture the current registry state for later restore.

Return type:

_MetaSnapshot

restore(snap)[source]

Restore the registry to a prior snapshot (used in test fixtures).

Parameters:

snap (_MetaSnapshot)

Return type:

None

clear()[source]

Remove all registrations (use in tests only).

Return type:

None

class runic.ogm.core.metadata.NodeMeta(cls, labels, primary_label, fields, pk_field_name=None)[source]

Bases: object

Metadata snapshot for a registered Node subclass.

class runic.ogm.core.metadata.EdgeMeta(cls, edge_type, fields)[source]

Bases: object

Metadata snapshot for a registered Edge subclass.

runic.ogm.core.metadata.get_metadata()[source]

Return the global MetaData singleton.

Return type:

MetaData


runic.ogm.core — Type Converters

class runic.ogm.core.types.TypeConverter[source]

Bases: ABC

Interface for encoding/decoding custom Python types to/from graph values.

Optionally, set cypher_fn to name a FalkorDB Cypher function that wraps the parameter reference when writing to the graph (e.g. "vecf32" → vecf32($field)).

abstractmethod to_graph(value)[source]

Convert a Python value to a graph-compatible representation.

Parameters:

value (Any)

Return type:

Any

abstractmethod from_graph(value)[source]

Convert a graph value back to the Python type.

Parameters:

value (Any)

Return type:

Any

class runic.ogm.core.types.DatetimeConverter[source]

Bases: TypeConverter

Converts between Python datetime objects and ISO-8601 strings.

FalkorDB stores datetimes as strings; this converter handles the round-trip.
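
The round-trip can be sketched with the standard library alone. ConverterSketch and IsoDatetimeSketch below are local stand-ins that mirror the to_graph/from_graph interface; passing None through unchanged is an assumption made for the example, not documented behaviour.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any


class ConverterSketch(ABC):
    """Local stand-in mirroring the to_graph/from_graph interface."""

    @abstractmethod
    def to_graph(self, value: Any) -> Any: ...

    @abstractmethod
    def from_graph(self, value: Any) -> Any: ...


class IsoDatetimeSketch(ConverterSketch):
    def to_graph(self, value):
        # Assumption: None passes through untouched.
        return None if value is None else value.isoformat()

    def from_graph(self, value):
        return None if value is None else datetime.fromisoformat(value)


conv = IsoDatetimeSketch()
moment = datetime(2024, 5, 17, 9, 30, tzinfo=timezone.utc)
stored = conv.to_graph(moment)      # string form written to the graph
restored = conv.from_graph(stored)  # datetime rebuilt on load
```

Keeping the UTC offset in the stored string means the restored value compares equal to the original, timezone included.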

to_graph(value)[source]

Convert a Python value to a graph-compatible representation.

Parameters:

value (Any)

Return type:

Any

from_graph(value)[source]

Convert a graph value back to the Python type.

Parameters:

value (Any)

Return type:

Any

class runic.ogm.core.types.EnumConverter(enum_class)[source]

Bases: TypeConverter

Converts between Python Enum members and their string values.

Stores the enum’s .value in the graph and reconstructs on load.
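
A minimal sketch of the same idea, using a hypothetical Status enum and a stand-in class named EnumSketch rather than the real EnumConverter:

```python
from enum import Enum


class Status(Enum):
    ACTIVE = "active"
    ARCHIVED = "archived"


class EnumSketch:
    """Hypothetical converter: store .value, rebuild the member on load."""

    def __init__(self, enum_class):
        self._enum_class = enum_class

    def to_graph(self, value):
        return value.value

    def from_graph(self, value):
        return self._enum_class(value)  # Enum lookup by value


conv = EnumSketch(Status)
stored = conv.to_graph(Status.ACTIVE)
loaded = conv.from_graph("archived")
```

Looking the member up by value raises ValueError for strings that match no member, so stale values in the graph surface at load time rather than silently.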

Parameters:

enum_class (type[Enum])

to_graph(value)[source]

Convert a Python value to a graph-compatible representation.

Parameters:

value (Any)

Return type:

Any

from_graph(value)[source]

Convert a graph value back to the Python type.

Parameters:

value (Any)

Return type:

Any

class runic.ogm.core.types.Vector(iterable=(), /)[source]

Bases: list

A typed list of floats representing a graph embedding vector.

Use as an annotation on a Node field to store and query embeddings:

class Article(Node, labels=["Article"]):
    id: str = Field(primary_key=True)
    embedding: Vector = Field(index=True, index_type="VECTOR")

FalkorDB stores vectors via vecf32(), preserving 32-bit float precision.

class runic.ogm.core.types.VectorConverter[source]

Bases: TypeConverter

Converts between Python Vector (list of floats) and FalkorDB’s vecf32 format.

Emits vecf32($field) in Cypher via cypher_fn = "vecf32".
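
To make the effect of cypher_fn concrete, the sketch below builds a SET clause with and without a wrapping function. The helper set_clause and its arguments are hypothetical; how the mapper actually assembles Cypher is not shown in this reference.

```python
def set_clause(alias, field, cypher_fn=None):
    """Build `SET alias.field = <param>`, wrapping the parameter if asked."""
    param = f"${field}"
    if cypher_fn:
        param = f"{cypher_fn}({param})"
    return f"SET {alias}.{field} = {param}"


plain = set_clause("n", "title")
wrapped = set_clause("n", "embedding", cypher_fn="vecf32")
```

The value itself still travels as a query parameter; only the parameter reference in the query text is wrapped.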

to_graph(value)[source]

Convert a Python value to a graph-compatible representation.

Parameters:

value (Any)

Return type:

Any

from_graph(value)[source]

Convert a graph value back to the Python type.

Parameters:

value (Any)

Return type:

Any

class runic.ogm.core.types.GeoLocation(latitude, longitude)[source]

Bases: object

A geographic point with latitude and longitude.

Maps to FalkorDB’s native point() type:

class Store(Node, labels=["Store"]):
    id: str = Field(primary_key=True)
    location: GeoLocation = Field()

FalkorDB round-trips this as point({latitude: ..., longitude: ...}).

class runic.ogm.core.types.GeoLocationConverter[source]

Bases: TypeConverter

Converts between GeoLocation and FalkorDB’s point() dict format.

Emits point($field) in Cypher via cypher_fn = "point". FalkorDB returns points as {"latitude": ..., "longitude": ...} dicts.

to_graph(value)[source]

Convert a Python value to a graph-compatible representation.

Parameters:

value (Any)

Return type:

Any

from_graph(value)[source]

Convert a graph value back to the Python type.

Parameters:

value (Any)

Return type:

Any


runic.ogm.driver — Drivers & Dialects

class runic.ogm.driver.falkordb.FalkorDBDriver(graph, db=None)[source]

Bases: object

Sync driver wrapping a FalkorDB graph handle.

Parameters:
  • graph (Any)

  • db (Any)

falkordb_connection()[source]

Return (db, graph) for use by the FalkorDB migration adapter.

Return type:

tuple[Any, Any]

class runic.ogm.driver.falkordb.AsyncFalkorDBDriver(graph)[source]

Bases: object

Async driver wrapping an async FalkorDB graph handle.

Parameters:

graph (Any)

class runic.ogm.driver.falkordb.FalkorDBDialect[source]

Bases: object

Strategy implementation for FalkorDB-specific Cypher generation.

class runic.ogm.driver.bolt.BoltDriver(uri, auth, database, dialect, *, encrypted=True)[source]

Bases: object

Sync Bolt driver for ArcadeDB, Neo4j, or any Bolt-compatible graph DB.

Supports explicit ACID transactions via TransactionalGraphDriver. When no transaction is active, each execute() call opens its own Bolt session (auto-commit semantics). Call begin() to start a transaction that spans multiple execute() calls, then commit() or rollback() to end it.

The OGM Session drives this lifecycle automatically via lazy-begin: the first query inside a Session opens a transaction; Session.commit() / Session.rollback() close it.

begin()[source]

Open a Bolt session and begin an explicit transaction.

Raises RuntimeError if a transaction is already active.

Return type:

None

commit()[source]

Commit the active transaction and release the Bolt session.

No-op when no transaction is active.

Return type:

None

rollback()[source]

Roll back the active transaction and release the Bolt session.

No-op when no transaction is active.

Return type:

None
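
The begin/commit/rollback rules above can be modelled as a small state machine. TxStateSketch is a hypothetical class that records calls in a list; it opens no Bolt session and only illustrates the documented rules (a second begin() raises RuntimeError, while commit() and rollback() are no-ops when idle).

```python
class TxStateSketch:
    """Hypothetical model of the begin/commit/rollback rules only."""

    def __init__(self):
        self.active = False
        self.log = []

    def begin(self):
        if self.active:
            raise RuntimeError("transaction already active")
        self.active = True
        self.log.append("begin")

    def commit(self):
        if not self.active:
            return  # no-op without an active transaction
        self.active = False
        self.log.append("commit")

    def rollback(self):
        if not self.active:
            return  # no-op without an active transaction
        self.active = False
        self.log.append("rollback")


tx = TxStateSketch()
tx.commit()            # ignored: nothing to commit
tx.begin()
try:
    tx.begin()         # a second begin is rejected
except RuntimeError:
    tx.rollback()
```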

class runic.ogm.driver.arcadedb.ArcadeDBDialect[source]

Bases: object

Strategy for ArcadeDB-specific Cypher generation.

Key differences from FalkorDB:

  • No toInteger() cast needed for id()-based lookups

  • No vecf32() or intern() wrappers (raw values stored as-is)

  • Vector KNN via CALL vector.neighbors(...)

  • Fulltext search not yet supported (raises NotImplementedError)

  • SET n.prop = point() is not supported via Bolt; GeoLocation is stored as a {"latitude": x, "longitude": y} map instead.

class runic.ogm.driver.age.AGEDriver(conn, graph_name)[source]

Bases: object

Sync driver for Apache AGE (PostgreSQL graph extension).

Cypher queries are wrapped in the AGE cypher() SQL function and executed via a psycopg (psycopg3) connection. Parameters are serialised as an agtype JSON map and passed as the third argument to cypher(), making them accessible inside Cypher as $param_name.
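
The wrapping described above might look roughly like the sketch below, which only builds the SQL text and the argument tuple and never opens a connection. wrap_cypher and its columns argument are hypothetical, the %s placeholder assumes psycopg-style parameter binding, and the graph name is interpolated directly, so it must come from trusted configuration.

```python
import json


def wrap_cypher(graph_name, cypher, params, columns):
    """Return (sql, args) for a parameterised AGE cypher() call.

    `columns` names the values the Cypher RETURN clause yields; AGE needs
    them in the AS (...) column list.
    """
    col_list = ", ".join(f"{c} agtype" for c in columns)
    sql = (
        f"SELECT * FROM cypher('{graph_name}', $$ {cypher} $$, %s) "
        f"AS ({col_list})"
    )
    # Parameters travel as one JSON map in the third cypher() argument.
    return sql, (json.dumps(params),)


sql, args = wrap_cypher(
    "my_graph",
    "MATCH (p:Person) WHERE p.id = $pid RETURN p",
    {"pid": "alice-id"},
    ["p"],
)
```

The resulting pair is shaped for a call such as cursor.execute(sql, args); the real driver may build its statement differently.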

Supports explicit ACID transactions via TransactionalGraphDriver. psycopg3 starts an implicit BEGIN on the first statement after each commit/rollback (autocommit=False default); this driver’s commit() / rollback() map directly to conn.commit() / conn.rollback(). begin() is a documented no-op because psycopg3 manages the implicit transaction start automatically.

The OGM Session drives this lifecycle: the first query in a Session opens a transaction implicitly; commit() / rollback() finalise it.

AGE stores each vertex label as a separate PostgreSQL table; a vertex belongs to exactly one label fixed at creation time. Multi-label operations (SET n:New REMOVE n:Old) are therefore not supported.

Example

driver = create_age_driver(
    host="localhost",
    port=5432,
    database="postgres",
    graph="my_graph",
    username="postgres",
    password="secret",
)
with Session(driver) as session:
    ...
Parameters:
  • conn (Any)

  • graph_name (str)

begin()[source]

No-op: psycopg3 starts an implicit BEGIN on the first statement.

Exists to satisfy the TransactionalGraphDriver protocol so the OGM Session can detect transaction support via isinstance checks.

Return type:

None

commit()[source]

Commit the active PostgreSQL transaction.

Return type:

None

rollback()[source]

Roll back the active PostgreSQL transaction.

Return type:

None

class runic.ogm.driver.age.AGEDialect[source]

Bases: object

Strategy for Apache AGE-specific Cypher generation.

Key differences from FalkorDB:

  • No toInteger() cast for id()-based lookups

  • No vecf32() or intern() wrappers (raw Python values stored as-is)

  • Fulltext search: not supported natively (raises NotImplementedError)

  • Vector KNN: not supported natively (raises NotImplementedError)

  • Multi-label emulation: extra labels stored as _labels property array

labels_clause(labels)[source]

AGE only supports one label per vertex — use the primary label.

Parameters:

labels (list[str])

Return type:

str

subtype_where(alias, labels)[source]

Return a WHERE condition filtering by emulated subtype labels.

Return type:

str | None

needs_labels_property()[source]

Signal to the mapper to inject _labels on CREATE for subtypes.

Return type:

bool
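
One way to picture the multi-label emulation is the sketch below. It is a guess at the shape of the generated Cypher, not the dialect's actual output: it assumes the primary label is the first entry in labels and that the extra labels are matched with an IN test against the _labels array.

```python
def labels_clause(labels):
    """Keep only the first (assumed primary) label, e.g. ':Location'."""
    return f":{labels[0]}"


def subtype_where(alias, labels):
    """Filter on extra labels held in the `_labels` property array."""
    extra = labels[1:]
    if not extra:
        return None  # nothing to filter for a single-label model
    return " AND ".join(f"'{label}' IN {alias}._labels" for label in extra)


clause = labels_clause(["Location", "Country"])
condition = subtype_where("n", ["Location", "Country"])
```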

runic.ogm.driver.falkordb.create_falkordb_driver(host, port, graph)[source]

Create a FalkorDBDriver from connection parameters.

Parameters:
  • host (str) – FalkorDB host name or IP address.

  • port (int) – FalkorDB port (default Redis port is 6379).

  • graph (str) – Name of the graph to select.

Return type:

FalkorDBDriver

runic.ogm.driver.arcadedb.create_arcadedb_driver(host, port, database, username, password)[source]

Create a BoltDriver configured for ArcadeDB.

Return type:

BoltDriver

runic.ogm.driver.age.create_age_driver(host, port, database, graph, username, password)[source]

Create an AGEDriver connected to a PostgreSQL+AGE instance.

Parameters:
  • host (str) – PostgreSQL host name or IP address.

  • port (int) – PostgreSQL port (default is 5432).

  • database (str) – PostgreSQL database name.

  • graph (str) – AGE graph name within the database.

  • username (str) – PostgreSQL user name.

  • password (str) – PostgreSQL password.

Return type:

AGEDriver

runic.ogm.driver.factory.create_driver(backend, **kwargs)[source]

Return a GraphDriver for the given backend.

Parameters:
  • backend (str) – "falkordb", "arcadedb", "neo4j", "memgraph", or "age".

  • **kwargs (Any) – Backend-specific keyword arguments forwarded to the driver constructor.

Raises:

ValueError – When backend is unknown.

Return type:

GraphDriver

Examples

FalkorDB:

driver = create_driver(
    "falkordb", host="localhost", port=6379, graph="my_graph"
)

ArcadeDB:

driver = create_driver(
    "arcadedb",
    host="localhost",
    port=7687,
    database="MyDB",
    username="root",
    password="secret",
)

Neo4j:

driver = create_driver(
    "neo4j",
    host="localhost",
    port=7687,
    database="neo4j",
    username="neo4j",
    password="secret",
    encrypted=True,
)

Memgraph:

driver = create_driver(
    "memgraph",
    host="localhost",
    port=7687,
    database="memgraph",
    username="",
    password="",
)

Apache AGE:

driver = create_driver(
    "age",
    host="localhost",
    port=5432,
    database="postgres",
    graph="my_graph",
    username="postgres",
    password="secret",
)

runic.ogm.session — Session

class runic.ogm.session.session.Session(driver, mapper=None, *, log_cypher=False)[source]

Bases: _SessionBase

Sync unit-of-work manager.

Owns all mutations (add, delete), single-entity lookup (get), identity map, and flush/commit lifecycle. Repositories hold a session reference and delegate writes and PK lookups to it. The backend-agnostic bookkeeping lives in _SessionBase.

Transaction model — determined by the injected driver:

  • FalkorDB (no native multi-query transactions): each GRAPH.QUERY is individually atomic. commit() flushes pending writes; rollback() discards un-flushed state only — it cannot undo writes already sent to the graph.

  • Bolt-based drivers (Neo4j, Memgraph, ArcadeDB): full ACID transactions via the Bolt protocol. The first query lazily opens a Bolt transaction; commit() / rollback() commit or discard all changes as a single atomic unit.

  • Apache AGE (psycopg3): full PostgreSQL ACID transactions. psycopg3 starts an implicit BEGIN on the first SQL statement; commit() / rollback() map to conn.commit() / conn.rollback().

Drivers that support explicit transactions implement the TransactionalGraphDriver protocol. The Session detects this via isinstance and wires commit/rollback accordingly.
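
Detection via isinstance works with a runtime-checkable typing.Protocol, as the following sketch shows. Transactional, PlainDriver, and TxDriver are hypothetical stand-ins; the assumption is that the real protocol exposes the begin(), commit(), and rollback() methods documented for the drivers.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Transactional(Protocol):
    """Hypothetical protocol with the three transaction methods."""

    def begin(self) -> None: ...
    def commit(self) -> None: ...
    def rollback(self) -> None: ...


class PlainDriver:
    def execute(self, cypher, params=None):
        return []


class TxDriver(PlainDriver):
    def begin(self) -> None: ...
    def commit(self) -> None: ...
    def rollback(self) -> None: ...


def supports_transactions(driver) -> bool:
    # isinstance on a runtime_checkable Protocol checks method presence only.
    return isinstance(driver, Transactional)


plain_ok = supports_transactions(PlainDriver())
tx_ok = supports_transactions(TxDriver())
```

A structural check like this lets a driver opt in simply by defining the methods, with no shared base class required.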

Parameters:
  • driver (GraphDriver)

  • mapper (Mapper | None)

  • log_cypher (bool)

get(cls, pk, fetch=None)[source]

Return entity from identity map or query graph; None if not found.

Pass fetch=["rel_name", ...] to eager-load relationship fields in the same Cypher query using OPTIONAL MATCH.

Return type:

Any | None
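
The identity-map lookup behind get() can be sketched as a cache keyed by class and primary key. IdentityMapSketch, fake_loader, and the queries counter are hypothetical and exist only to show that a second lookup does not reach the graph.

```python
class IdentityMapSketch:
    """Hypothetical cache keyed by (class, primary key)."""

    def __init__(self, loader):
        self._loader = loader      # callable(cls, pk) -> entity or None
        self._entities = {}
        self.queries = 0

    def get(self, cls, pk):
        key = (cls, pk)
        if key in self._entities:
            return self._entities[key]   # served from the identity map
        self.queries += 1
        entity = self._loader(cls, pk)
        if entity is not None:
            self._entities[key] = entity
        return entity


class Person:
    def __init__(self, pk):
        self.id = pk


def fake_loader(cls, pk):
    # Stands in for a graph query; only one key exists.
    return cls(pk) if pk == "alice-id" else None


imap = IdentityMapSketch(fake_loader)
first = imap.get(Person, "alice-id")
second = imap.get(Person, "alice-id")
```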

load_relationship(entity, field_name)[source]

Load a lazy relationship field and cache the result on the entity.

Called by Field._trigger_lazy_load when a _NOT_LOADED sentinel is accessed on an entity that is attached to this session. Writes directly to entity.__dict__ to bypass the dirty-tracking descriptor.

Parameters:
  • entity (Any)

  • field_name (str)

Return type:

Any

flush()[source]

Execute all pending/dirty/deleted entities against the graph.

Does not clear the identity map. Each entity write is a separate graph.query() call. Entities with generated=True IDs are handled individually so the returned ID can be assigned before continuing.

Return type:

None

commit()[source]

flush() then clear the pending/deleted tracking sets.

For transactional drivers (Bolt, AGE), also commits the active database transaction so all flushed writes become durable and visible.

Return type:

None

rollback()[source]

Discard un-flushed pending/deleted sets; expire all persistent entities.

For transactional drivers (Bolt, AGE), also rolls back the active database transaction — writes already flushed but not yet committed are discarded atomically. For FalkorDB (no native transactions), only un-flushed in-memory state is cleared; writes already sent to the graph cannot be undone.

Return type:

None
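
The difference between the two rollback behaviours can be seen in a toy unit of work. UnitOfWorkSketch is hypothetical and uses plain lists in place of a database; it models only the rule that flushed writes are recoverable on a transactional backend and permanent otherwise.

```python
class UnitOfWorkSketch:
    def __init__(self, transactional):
        self.transactional = transactional
        self.pending = []     # added, not yet flushed
        self.in_tx = []       # flushed inside an open transaction
        self.graph = []       # durable writes

    def add(self, item):
        self.pending.append(item)

    def flush(self):
        target = self.in_tx if self.transactional else self.graph
        target.extend(self.pending)
        self.pending.clear()

    def commit(self):
        self.flush()
        self.graph.extend(self.in_tx)
        self.in_tx.clear()

    def rollback(self):
        self.pending.clear()
        self.in_tx.clear()    # only a transactional backend has anything here


def flush_then_rollback(transactional):
    uow = UnitOfWorkSketch(transactional)
    uow.add("a")
    uow.flush()
    uow.add("b")
    uow.rollback()
    return uow.graph
```

Calling flush_then_rollback(True) leaves the graph empty, while flush_then_rollback(False) leaves "a" behind because it was already sent before the rollback.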

refresh(entity)[source]

Immediately re-query the entity from the graph and update in-place.

Parameters:

entity (Any)

Return type:

None

relate(source, field_name, target, edge=None)[source]

Create or update a relationship between source and target.

Uses MERGE semantics: if the relationship already exists its edge properties are updated; if not, it is created. Pass an Edge model instance as edge to write properties on the relationship itself.

field_name may be a plain string or the class-level descriptor attribute (e.g. User.invited_trips) for type-safe call sites.

The cached value of the relation field on source is invalidated after the write so the next access re-fetches fresh data from the graph.
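
For orientation, a MERGE-based relationship write could resemble the Cypher produced by the sketch below. The helper merge_relationship_cypher, the id property used for matching, and the parameter names are all assumptions made for illustration; the statement the Mapper actually emits may differ by dialect.

```python
def merge_relationship_cypher(src_label, rel_type, dst_label, with_props):
    """Build an illustrative MERGE statement for one relationship."""
    lines = [
        f"MATCH (a:{src_label} {{id: $src_id}}), (b:{dst_label} {{id: $dst_id}})",
        f"MERGE (a)-[r:{rel_type}]->(b)",
    ]
    if with_props:
        lines.append("SET r += $props")  # update edge properties in place
    return "\n".join(lines)


query = merge_relationship_cypher("User", "INVITED_TO", "Trip", with_props=True)
```

MERGE matches an existing relationship of that type between the two nodes or creates one, so repeating the call does not produce duplicate edges.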

Return type:

None

unrelate(source, field_name, target)[source]

Delete the relationship between source and target.

field_name may be a plain string or the class-level descriptor attribute (e.g. User.invited_trips) for type-safe call sites.

The cached value of the relation field on source is invalidated after the write so the next access re-fetches fresh data from the graph.

Return type:

None

execute(cypher, params=None, write=False)[source]

Execute raw Cypher; returns QueryResult; no entity mapping.

Return type:

Any

scalars(stmt)[source]

Execute a select() statement; return decoded entities.

Type-safe: session.scalars(select(User).where(...)) infers list[User].

Parameters:

stmt (QueryBuilder[_T]) – An unbound QueryBuilder created via select().

Return type:

list[_T]

scalar(stmt)[source]

Execute a select() statement; return first entity or None.

Adds LIMIT 1 internally without permanently modifying the statement. Type-safe: session.scalar(select(User).where(...)) infers User | None.

Parameters:

stmt (QueryBuilder[_T]) – An unbound QueryBuilder created via select().

Return type:

_T | None
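
Applying LIMIT 1 without touching the caller's statement is straightforward when the builder returns copies. StatementSketch below is a hypothetical frozen dataclass, not the real QueryBuilder, and shows one way to achieve that property.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class StatementSketch:
    label: str
    limit_value: Optional[int] = None

    def limit(self, n):
        return replace(self, limit_value=n)  # returns a modified copy

    def to_cypher(self):
        text = f"MATCH (n:{self.label}) RETURN n"
        if self.limit_value is not None:
            text += f" LIMIT {self.limit_value}"
        return text


def first_row_cypher(stmt):
    # Apply LIMIT 1 to a copy; the caller's statement is untouched.
    return stmt.limit(1).to_cypher()


stmt = StatementSketch("User")
limited = first_row_cypher(stmt)
```

Because the original statement is unchanged, it can be reused afterwards, for example to fetch the full result set with the same filters.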

all_rows(stmt)[source]

Execute a select() statement; return column-keyed dicts.

Parameters:

stmt (QueryBuilder[Any]) – An unbound QueryBuilder.

Return type:

list[dict[str, Any]]

all_with_edges(stmt)[source]

Execute a select() statement; return (NodeA, Edge, NodeB) tuples.

Parameters:

stmt (QueryBuilder[Any]) – An unbound QueryBuilder with return_nodes() and return_edge() configured.

Return type:

list[tuple[Any, …]]

count(stmt)[source]

Execute a select() statement; return the row count.

Parameters:

stmt (QueryBuilder[Any]) – An unbound QueryBuilder.

Return type:

int

query(cls)[source]

Return a QueryBuilder for cls.

This is the primary entry point for the fluent query builder API:

users = (
    session.query(User)
    .where(User.active == True)
    .order_by(User.name)
    .limit(20)
    .all()
)
Parameters:

cls (type[Any]) – A registered Node subclass.

Return type:

QueryBuilder[cls]

fulltext_search(cls, query, fields=None)

Return a FulltextQueryBuilder for cls.

Uses FalkorDB’s CALL db.idx.fulltext.queryNodes() procedure. The node label must have a fulltext index created.

Parameters:
  • cls (type[Any]) – A registered Node subclass with at least one field with index_type="FULLTEXT".

  • query (str) – The fulltext search string.

  • fields (list[str] | None) – Optional list of field names to search (informational; the procedure uses the index it finds for the label).

Return type:

Any

Example

posts = (
    session.fulltext_search(Post, query="graph databases")
    .where(Post.published == True)
    .limit(10)
    .all()
)

vector_search(cls, field, vector, k=10)

Return a VectorQueryBuilder for cls.

Performs a K-Nearest-Neighbour search using FalkorDB’s HNSW index.

Parameters:
  • cls (type[Any]) – A registered Node subclass.

  • field (Any) – The FieldDescriptor of the Vector field to search (e.g. Document.embedding).

  • vector (list[float]) – The query embedding as a list of floats.

  • k (int) – Number of nearest neighbours to return (default 10).

Return type:

Any

Example

similar = (
    session.vector_search(
        Document, field=Document.embedding, vector=my_vec, k=5
    )
    .where(Document.active == True)
    .all()
)
close()[source]

Expunge all tracked entities; roll back any orphaned transaction.

If close() is called without a prior commit() or rollback() (e.g. the session was not used as a context manager), any active driver-level transaction is rolled back to release the connection cleanly.

Return type:

None

class runic.ogm.session.async_session.AsyncSession(driver, mapper=None, *, log_cypher=False)[source]

Bases: _SessionBase

Async unit-of-work manager; mirrors Session with async methods.

Shares the backend-agnostic bookkeeping (identity map, pending/deleted tracking, expunge, relation resolution) with the sync session via _SessionBase.

Use as an async context manager:

async with AsyncSession(graph) as session:
    alice = await session.get(Person, "alice-id")
    alice.email = "new@example.com"
    await session.commit()

Lazy relationship loading is not supported in async context because Field.__get__ cannot await. Use fetch=[...] on get() instead.
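
The limitation follows from attribute access being synchronous, which the sketch below demonstrates with stand-in classes. LazyRel and fetch_person are hypothetical, LazyLoadError is redefined locally, and writing "Acme" into __dict__ stands in for what an eager fetch would populate.

```python
import asyncio

_NOT_LOADED = object()


class LazyLoadError(RuntimeError):
    """Local stand-in for the error raised on lazy access."""


class LazyRel:
    def __set_name__(self, owner, name):
        self.name = name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        value = instance.__dict__.get(self.name, _NOT_LOADED)
        if value is _NOT_LOADED:
            # __get__ is a plain function: it cannot await a query here.
            raise LazyLoadError(f"{self.name} was not eagerly loaded")
        return value


class Person:
    company = LazyRel()


async def fetch_person(eager):
    person = Person()
    if eager:
        # Stands in for get(..., fetch=["company"]) filling the field.
        person.__dict__["company"] = "Acme"
    return person


async def main():
    loaded = await fetch_person(eager=True)
    lazy = await fetch_person(eager=False)
    try:
        lazy.company
    except LazyLoadError:
        return loaded.company, "error"
    return loaded.company, "ok"


result = asyncio.run(main())
```

The eagerly loaded entity returns its relationship immediately, while the other raises at the point of access instead of blocking the event loop.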

Parameters:
  • driver (AsyncGraphDriver)

  • mapper (Mapper | None)

  • log_cypher (bool)

async get(cls, pk, fetch=None)[source]

Return entity from identity map or query graph asynchronously.

Pass fetch=["rel_name", ...] to eager-load relationships in the same Cypher query using OPTIONAL MATCH.

Return type:

Any | None

load_relationship(entity, field_name)[source]

Raise LazyLoadError; lazy loading is not supported in async sessions.

Accessing entity.rel_field from within an async context manager triggers this via Field._trigger_lazy_load. Use fetch=[field_name] on get() for eager loading instead.

Parameters:
  • entity (Any)

  • field_name (str)

Return type:

Any

async flush()[source]

Execute all pending/dirty/deleted entities against the graph.

Return type:

None

async commit()[source]

flush() then clear the pending/deleted tracking sets.

Return type:

None

async rollback()[source]

Discard un-flushed pending/deleted sets; expire all persistent entities.

Return type:

None

async refresh(entity)[source]

Immediately re-query the entity from the graph.

Parameters:

entity (Any)

Return type:

None

async relate(source, field_name, target, edge=None)[source]

Create or update a relationship between source and target.

Uses MERGE semantics: if the relationship already exists its edge properties are updated; if not, it is created. Pass an Edge model instance as edge to write properties on the relationship itself.

field_name may be a plain string or the class-level descriptor attribute (e.g. User.invited_trips) for type-safe call sites.

The cached value of the relation field on source is invalidated after the write so the next access re-fetches fresh data from the graph.

Return type:

None

async unrelate(source, field_name, target)[source]

Delete the relationship between source and target.

field_name may be a plain string or the class-level descriptor attribute (e.g. User.invited_trips) for type-safe call sites.

The cached value of the relation field on source is invalidated after the write so the next access re-fetches fresh data from the graph.

Return type:

None

async execute(cypher, params=None, write=False)[source]

Execute raw Cypher; returns QueryResult; no entity mapping.

Return type:

Any

async scalars(stmt)[source]

Execute a select() statement; return decoded entities.

Type-safe: await session.scalars(select(User).where(...)) infers list[User].

Parameters:

stmt (QueryBuilder[_T]) – An unbound QueryBuilder created via select().

Return type:

list[_T]

async scalar(stmt)[source]

Execute a select() statement; return first entity or None.

Adds LIMIT 1 internally without permanently modifying the statement.

Parameters:

stmt (QueryBuilder[_T]) – An unbound QueryBuilder.

Return type:

_T | None

async all_rows(stmt)[source]

Execute a select() statement; return column-keyed dicts.

Parameters:

stmt (QueryBuilder[Any]) – An unbound QueryBuilder.

Return type:

list[dict[str, Any]]

async all_with_edges(stmt)[source]

Execute a select() statement; return (NodeA, Edge, NodeB) tuples.

Parameters:

stmt (QueryBuilder[Any]) – An unbound QueryBuilder with return_nodes() and return_edge() configured.

Return type:

list[tuple[Any, …]]

async count(stmt)[source]

Execute a select() statement; return the row count.

Parameters:

stmt (QueryBuilder[Any]) – An unbound QueryBuilder.

Return type:

int

query(cls)[source]

Return an AsyncQueryBuilder for cls.

Async entry point for the fluent query builder. Use await on the terminal methods (all(), one(), count(), etc.):

async with AsyncSession(graph) as session:
    users = await (
        session.query(User).where(User.active == True).limit(20).all()
    )
Parameters:

cls (type[Any])

Return type:

Any

Async fulltext search; mirrors fulltext_search().

Return type:

Any

Async vector KNN search; mirrors vector_search().

Return type:

Any

async close()[source]

expunge_all() and release the graph connection.

Return type:

None

class runic.ogm.session.connection_pool.ConnectionManager(db, graph_name)[source]

Bases: object

Manages a FalkorDB connection for use with Session.

Holds the db client and graph name; acquire() returns a FalkorDBDriver.

Parameters:
  • db (Any)

  • graph_name (str)

acquire()[source]

Return a FalkorDBDriver for the configured graph.

Return type:

FalkorDBDriver

release(driver)[source]

Release a driver back to the pool (no-op in current impl).

Parameters:

driver (FalkorDBDriver)

Return type:

None

property graph_name: str

The configured graph name.

class runic.ogm.session.connection_pool.AsyncConnectionManager(db, graph_name)[source]

Bases: object

Async variant of ConnectionManager for AsyncFalkorDB clients.

Parameters:
  • db (Any)

  • graph_name (str)

acquire()[source]

Return an AsyncFalkorDBDriver for the configured graph.

Return type:

AsyncFalkorDBDriver

async release(driver)[source]

Release an async driver (no-op in current impl).

Parameters:

driver (AsyncFalkorDBDriver)

Return type:

None

property graph_name: str

The configured graph name.


runic.ogm.repository — Repository

class runic.ogm.repository.repository.Repository(session, entity_class)[source]

Bases: _RepositoryBase, RepositoryProtocol, Generic

Typed reads and explicit Cypher helpers for one entity type.

Mutations (add, delete) and single-PK lookup (get) belong to the Session. All reads here register returned entities in the session identity map. Shared construction and row decoding live in _RepositoryBase.

Example:

with Session(graph) as session:
    repo = Repository(session, Person)
    all_people = repo.find_all()
    page = repo.find_all(skip=0, limit=25)

find_all(fetch=None, skip=0, limit=None)[source]

Return all entities of this type, with optional eager relationship loading.

Use skip and limit for offset-based pagination (aligns with select(...).skip(n).limit(n) syntax). Combining fetch with skip/limit is not supported — use the QueryBuilder for that.

Return type:

list[T]
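The offset-based pagination described above can be sketched as a small generator. iter_pages and fake_find_all are hypothetical helpers written for illustration; the only thing taken from the documentation is that find_all accepts skip and limit keyword arguments. The stopping rule (an empty or short page ends iteration) is an assumption of this sketch, not documented behaviour.

```python
from typing import Callable, Iterator, Optional, TypeVar

T = TypeVar("T")


def iter_pages(find_all: Callable[..., list], per_page: int) -> Iterator[list]:
    """Yield successive pages by calling find_all(skip=..., limit=...)."""
    if per_page <= 0:
        raise ValueError("per_page must be positive")
    skip = 0
    while True:
        page = find_all(skip=skip, limit=per_page)
        if not page:
            return
        yield page
        if len(page) < per_page:
            return  # short page: nothing left to fetch
        skip += per_page


# Stand-in for repo.find_all, backed by an in-memory list (illustration only).
_people = ["ada", "bob", "cy", "dee", "eve"]


def fake_find_all(skip: int = 0, limit: Optional[int] = None) -> list:
    end = None if limit is None else skip + limit
    return _people[skip:end]


pages = list(iter_pages(fake_find_all, per_page=2))
```

With a real repository the call would presumably be iter_pages(repo.find_all, per_page=25).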

find_all_by_ids(pks, fetch=None)[source]

Return entities whose primary keys are in pks.

Return type:

list[T]

count()[source]

Return the total number of entities of this type.

Return type:

int

exists(pk)[source]

Return True if an entity with pk exists in the graph.

Parameters:

pk (Any)

Return type:

bool

query()[source]

Return a QueryBuilder for this repository’s entity type.

Shorthand for select(self._cls) bound to the current session. Prefer the select() + session execution pattern for new code:

repo = Repository(session, User)

# Preferred (select + session execution):
users = session.scalars(select(User).where(User.active == True))

# Also available via repo (bound builder):
users = repo.query().where(User.active == True).all()
Return type:

QueryBuilder[T]

cypher(query, params=None, *, returns=None, write=False)[source]

Execute query and return a typed list.

returns controls decoding: int, str, bool → scalar; dict → column-keyed dicts; any Node subclass → decoded entities registered in the session identity map; None → empty list.

Return type:

list[Any]
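To make the returns rules concrete, here is a rough stand-alone sketch of the scalar and dict cases. decode_rows is a hypothetical function, not the library's decoder. It assumes scalars come from the first column unchanged, and it leaves out Node decoding because that needs the session and mapper.

```python
from typing import Any, Optional, Sequence


def decode_rows(
    columns: Sequence[str],
    rows: Sequence[Sequence[Any]],
    returns: Optional[type],
) -> list:
    """Mimic the documented `returns` rules for the scalar and dict cases."""
    if returns is None:
        return []  # None -> empty list
    if returns in (int, str, bool):
        # Assumption: the first column is taken as-is, without coercion.
        return [row[0] for row in rows]
    if returns is dict:
        return [dict(zip(columns, row)) for row in rows]
    raise NotImplementedError("Node decoding needs the session and mapper")


columns = ["name", "age"]
rows = [["Ada", 36], ["Bob", 41]]
as_dicts = decode_rows(columns, rows, dict)
as_names = decode_rows(columns, rows, str)
```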

cypher_one(query, params=None, *, returns=None, write=False)[source]

Execute query and return the first mapped value, or None.

Return type:

Any | None

cypher_raw(query, params=None, *, write=False)[source]

Execute query and return the raw QueryResult without entity mapping.

Return type:

Any

class runic.ogm.repository.async_repository.AsyncRepository(session, entity_class)[source]

Bases: _RepositoryBase, AsyncRepositoryProtocol, Generic

Async typed reads and explicit Cypher helpers for one entity type.

Mirrors Repository with async methods. Shared construction and row decoding live in _RepositoryBase.

Example:

async with AsyncSession(graph) as session:
    repo = AsyncRepository(session, Trip)
    trips = await repo.find_all()

async find_all(fetch=None, skip=0, limit=None)[source]

Return all entities of this type, with optional eager relationship loading.

Use skip and limit for offset-based pagination (aligns with select(...).skip(n).limit(n) syntax). Combining fetch with skip/limit is not supported — use the QueryBuilder for that.

Return type:

list[T]

async find_all_by_ids(pks, fetch=None)[source]

Return entities whose primary keys are in pks.

Return type:

list[T]

async count()[source]

Return the total number of entities of this type.

Return type:

int

async exists(pk)[source]

Return True if an entity with pk exists in the graph.

Parameters:

pk (Any)

Return type:

bool

query()[source]

Return an AsyncQueryBuilder for this repository’s entity type.

Async counterpart of Repository.query(). Use await on the terminal methods:

repo = AsyncRepository(session, User)
users = await repo.query().where(User.active == True).all()
Return type:

Any

async cypher(query, params=None, *, returns=None, write=False)[source]

Execute query and return a typed list.

Return type:

list[Any]

async cypher_one(query, params=None, *, returns=None, write=False)[source]

Execute query and return the first mapped value, or None.

Return type:

Any | None

async cypher_raw(query, params=None, *, write=False)[source]

Execute query and return the raw QueryResult.

Return type:

Any


runic.ogm.schema — Index declarations

class runic.ogm.schema.index_manager.IndexSpec(label, property, index_type)[source]

Bases: object

Normalized description of a single declared or existing index/constraint.

index_type is one of "RANGE", "FULLTEXT", "VECTOR", or "UNIQUE".

Parameters:
  • label (str)

  • property (str)

  • index_type (str)

runic.ogm.schema.index_manager.extract_declared_specs(entity_class)[source]

Return IndexSpecs declared via Field descriptors on entity_class.

Rules:

  • unique=True → UNIQUE constraint (the backing RANGE index is auto-created by FalkorDB).

  • index=True (without unique) → RANGE index.

  • index_type="FULLTEXT" → FULLTEXT index.

  • index_type="VECTOR" → VECTOR index.

  • Relationship fields are skipped.

  • A field with both unique=True and index=True emits only UNIQUE.

Parameters:

entity_class (type)

Return type:

set[IndexSpec]
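The rules above can be sketched for a single field as follows. This IndexSpec is a local stand-in dataclass and specs_for_field is a hypothetical helper; neither is the library's code. One assumption is labelled in the code: index_type is handled independently of index/unique, which the documentation does not spell out.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class IndexSpec:
    """Stand-in mirroring the documented (label, property, index_type) triple."""

    label: str
    property: str
    index_type: str


def specs_for_field(
    label: str,
    name: str,
    *,
    index: bool = False,
    unique: bool = False,
    index_type: Optional[str] = None,
) -> set:
    specs = set()
    if unique:
        # unique wins over index: only UNIQUE is emitted.
        specs.add(IndexSpec(label, name, "UNIQUE"))
    elif index:
        specs.add(IndexSpec(label, name, "RANGE"))
    # Assumption: FULLTEXT/VECTOR are emitted independently of index/unique.
    if index_type in ("FULLTEXT", "VECTOR"):
        specs.add(IndexSpec(label, name, index_type))
    return specs


email_specs = specs_for_field("Person", "email", index=True, unique=True)
title_specs = specs_for_field("Post", "title", index_type="FULLTEXT")
```

Using a frozen dataclass keeps the specs hashable, which is what lets declared and existing indexes be compared as sets.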

runic.migrate.schema — Index & Schema Management

class runic.migrate.schema.IndexManager(adapter_or_graph)[source]

Bases: object

Creates and manages graph indexes and constraints from entity Field declarations.

Accepts any object satisfying the IndexAdapter protocol: a migrate adapter (Neo4j, Memgraph, FalkorDB, ArcadeDB, AGE) or a raw FalkorDB graph handle (auto-wrapped in FalkorDBIndexAdapter for backward compatibility).

Fulltext batching — Neo4j and Memgraph use a single named fulltext index per label covering all search fields. create_indexes() collapses all FULLTEXT specs for the same label into one create_fulltext_index(label, prop1, prop2, ...) call.

Example:

from runic.migrate import IndexManager, create_adapter

adapter = create_adapter(
    "neo4j", host="localhost", database="neo4j", password="secret"
)
manager = IndexManager(adapter)
manager.create_indexes(Person)
manager.ensure_indexes(Trip)
Parameters:

adapter_or_graph (Any)

create_indexes(entity_class, *, if_not_exists=True)[source]

Create all indexes and constraints declared on entity_class.

FULLTEXT specs sharing a label are batched into a single create_fulltext_index(label, *props) call.

When if_not_exists is True (default), existing non-FULLTEXT specs are skipped. FULLTEXT creation is always attempted — adapters must handle idempotency.

Parameters:
  • entity_class (type)

  • if_not_exists (bool)

Return type:

None
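The fulltext batching step can be illustrated with a small grouping function. batch_fulltext is a hypothetical name, and specs are shown as plain (label, property, index_type) tuples instead of IndexSpec objects. This is a sketch of the idea only, not the IndexManager implementation.

```python
from collections import defaultdict


def batch_fulltext(specs: list) -> dict:
    """Group FULLTEXT (label, property, index_type) triples by label."""
    grouped = defaultdict(list)
    for label, prop, index_type in specs:
        if index_type == "FULLTEXT":
            grouped[label].append(prop)
    return dict(grouped)


specs = [
    ("Post", "title", "FULLTEXT"),
    ("Post", "body", "FULLTEXT"),
    ("Post", "slug", "UNIQUE"),
    ("Tag", "name", "FULLTEXT"),
]
# Each entry corresponds to one create_fulltext_index(label, *props) call.
batches = batch_fulltext(specs)
```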

ensure_indexes(entity_class)[source]

Create missing indexes for entity_class; skip those that already exist.

Parameters:

entity_class (type)

Return type:

None

create_spec(spec)[source]

Issue the appropriate adapter call to create a single IndexSpec.

Parameters:

spec (IndexSpec)

Return type:

None

drop_spec(spec)[source]

Issue the appropriate adapter call to drop a single IndexSpec.

Parameters:

spec (IndexSpec)

Return type:

None

class runic.migrate.schema.ValidationResult(is_valid, missing_indexes=<factory>, extra_indexes=<factory>, errors=<factory>)[source]

Bases: object

Result of a SchemaManager.validate_schema() call.

is_valid

True when declared and existing indexes match exactly.

Type:

bool

missing_indexes

Declared but not yet created in the live graph.

Type:

list[runic.ogm.schema.index_manager.IndexSpec]

extra_indexes

Present in the graph but not declared on any entity.

Type:

list[runic.ogm.schema.index_manager.IndexSpec]

errors

Non-fatal messages collected during validation.

Type:

list[str]

class runic.migrate.schema.SchemaManager(adapter_or_graph)[source]

Bases: object

Validates and synchronizes graph indexes against entity Field declarations.

Accepts any object satisfying the IndexAdapter protocol (a migrate adapter or a raw FalkorDB graph handle, the latter for backward compatibility).

Example:

from runic.migrate import SchemaManager, create_adapter

adapter = create_adapter(
    "neo4j", host="localhost", database="neo4j", password="secret"
)
schema = SchemaManager(adapter)
result = schema.validate_schema([Person, KnowsEdge])
schema.sync_schema([Person, KnowsEdge])
Parameters:

adapter_or_graph (Any)

ensure_entity_types(entity_classes)[source]

Create vertex/edge types for entity_classes on adapters that require them.

No-op for schemaless backends. Issues CREATE VERTEX TYPE / CREATE EDGE TYPE DDL for ArcadeDB.

Parameters:

entity_classes (list[type])

Return type:

None

validate_schema(entity_classes)[source]

Compare declared indexes against the live graph state.

Returns a ValidationResult describing missing and extra indexes. is_valid is True only when both sets are empty and no errors occurred.

Parameters:

entity_classes (list[type])

Return type:

ValidationResult
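Conceptually the comparison is a pair of set differences. The sketch below uses a local ValidationSketch dataclass and a hypothetical compare function, with specs as plain tuples; it shows the logic described above and is not the SchemaManager code. Error collection is left out.

```python
from dataclasses import dataclass, field

# (label, property, index_type): a stand-in for IndexSpec.
Spec = tuple


@dataclass
class ValidationSketch:
    is_valid: bool
    missing_indexes: list = field(default_factory=list)
    extra_indexes: list = field(default_factory=list)
    errors: list = field(default_factory=list)


def compare(declared: set, existing: set) -> ValidationSketch:
    missing = sorted(declared - existing)  # declared but not in the graph
    extra = sorted(existing - declared)  # in the graph but not declared
    return ValidationSketch(
        is_valid=not missing and not extra,
        missing_indexes=missing,
        extra_indexes=extra,
    )


declared = {("Person", "email", "UNIQUE"), ("Person", "name", "RANGE")}
existing = {("Person", "name", "RANGE"), ("Person", "age", "RANGE")}
result = compare(declared, existing)
```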

sync_schema(entity_classes, *, drop_extra=False)[source]

Create entity types and missing indexes; drop extras when drop_extra is True.

Calls ensure_entity_types first (required for ArcadeDB empty collections), then delegates to IndexManager.create_indexes() per class.

Return type:

None

get_schema_diff(entity_classes)[source]

Return a human-readable diff of declared vs existing indexes.

Lines are prefixed with MISSING or EXTRA; returns a single “in sync” message when no differences exist.

Parameters:

entity_classes (list[type])

Return type:

str

get_schema_info(entity_classes)[source]

Return a SchemaInfo snapshot of the current schema state.

Parameters:

entity_classes (list[type])

Return type:

SchemaInfo


runic.ogm.exceptions

exception runic.ogm.exceptions.OrmError[source]

Bases: Exception

Base exception for runic OGM errors.

exception runic.ogm.exceptions.EntityNotFoundError[source]

Bases: OrmError

Raised when an entity cannot be found by its primary key.

exception runic.ogm.exceptions.DetachedEntityError[source]

Bases: OrmError

Raised when an operation is attempted on a detached entity.

exception runic.ogm.exceptions.LazyLoadError[source]

Bases: OrmError

Raised when lazy relationship loading cannot be performed.

Occurs when accessing a lazy field on an entity in an async session (where __get__ cannot await) or when the session is unavailable. Use fetch=[field_name] on session.get() for eager loading instead.

exception runic.ogm.exceptions.FieldValidationError[source]

Bases: OrmError

Raised when a field value fails validation.

exception runic.ogm.exceptions.MetadataError[source]

Bases: OrmError

Raised for metadata registry errors (e.g. duplicate labels).

See also

Migration API Reference (runic.migrate)


runic.ogm.query

runic.ogm.query.select(cls)[source]

Create a session-independent query statement for cls.

Mirrors the SQLAlchemy 2.0 select() pattern — compose the statement freely (including conditional filters), then execute via the session:

from runic.ogm import select

stmt = select(User).where(User.active == True)
if min_age > 0:
    stmt = stmt.where(User.age >= min_age)

users: list[User] = session.scalars(stmt)
user: User | None = session.scalar(stmt)
n: int = session.count(stmt)

The returned QueryBuilder is unbound — calling terminal methods like .all() directly will raise RuntimeError. Use the session execution methods instead.

Parameters:

cls (type[_T]) – A registered Node subclass.

Return type:

QueryBuilder[_T]

class runic.ogm.query.builder.QueryBuilder(session, root_cls)[source]

Bases: _CypherCompiler[T]

Fluent Cypher query builder for a single root Node class.

Construct via Session.query():

q = session.query(User)

All non-terminal methods return self so calls can be chained:

users = session.query(User).where(User.active == True).limit(10).all()
Parameters:
  • session (Any | None) – The Session (or AsyncSession) this builder is bound to.

  • root_cls (type[T]) – The root Node subclass to query.

alias(name)[source]

Set the Cypher variable for the root (most recent) node.

Call immediately after Session.query() to name the root variable, or after TraversalStep.alias() has already been called to rename the last registered target.

Example:

session.query(User).alias("u").where(User.active == True, on="u")
Parameters:

name (str)

Return type:

QueryBuilder[T]

where(expr, *, on=None)[source]

Add a WHERE predicate.

Parameters:
  • expr (Expr) –

    A FilterExpr, CompoundExpr, or NegatedExpr. Created via field descriptor operators:

    User.name == "Alice"
    (User.age > 18) & (User.active == True)
    

  • on (str | None) –

    Override the Cypher variable for this predicate. Useful when the same Node class appears under multiple aliases, or when filtering on edge properties:

    .where(Rated.score > 4.0, on="r")
    

Return type:

QueryBuilder[T]

Notes

Multiple .where() calls are combined with AND. To express OR, use the | operator on the expressions before passing:

.where((User.role == "admin") | (User.role == "mod"))

traverse(relation_field, *, edge_alias=None, optional=True)[source]

Traverse a declared Relation() field.

Returns a TraversalStep; call .alias("f") on it to name the target node and return to the builder.

Parameters:
  • relation_field (FieldDescriptor) –

    The Relation-backed field descriptor accessed at class level:

    User.friends  # list[User] = Relation(...)
    User.rated  # list[Movie] = Relation(edge_model=Rated)
    

  • edge_alias (str | None) –

    When given, a named relationship variable is emitted in the pattern:

    (u)-[r:RATED]->(m)
    

    This enables filtering on edge properties via .where(Rated.score > 4, on="r") and retrieving edge instances via .all_with_edges().

  • optional (bool) – True (default) → OPTIONAL MATCH (left-join; keeps source nodes that have no such relationship). False → MATCH (inner join; drops source nodes without a matching relationship).

Returns:

Call .alias("name") on the return value to complete the step.

Return type:

TraversalStep

Examples

# Basic traversal
q = session.query(User).alias("u")
q = q.traverse(User.friends).alias("f")

# Traversal with edge properties
q = session.query(User).alias("u")
q = q.traverse(User.rated, edge_alias="r").alias("m")
q = q.where(Rated.score >= 4.0, on="r")

repeat(relation_field, *, min_hops=1, max_hops=None, optional=False)[source]

Traverse a relation with variable-length path quantifier *min..max.

Generates a Cypher pattern like:

(p)-[:PARENT*1..5]->(ancestor:Person)
Parameters:
  • relation_field (FieldDescriptor) – The Relation field to traverse repeatedly.

  • min_hops (int) – Minimum number of hops (default 1).

  • max_hops (int | None) – Maximum number of hops. None means unbounded (*min..).

  • optional (bool) – False (default for repeat) → MATCH (required traversal). True → OPTIONAL MATCH.

Returns:

Call .alias("name") to complete the step.

Return type:

TraversalStep

Examples

# All ancestors up to depth 5
ancestors = (
    session.query(Person)
    .alias("p")
    .where(Person.id == start_id)
    .repeat(Person.parent, min_hops=1, max_hops=5)
    .alias("anc")
    .all()
)

# All reachable nodes (unbounded)
reachable = (
    session.query(Node)
    .alias("s")
    .repeat(Node.connected_to)
    .alias("t")
    .all()
)

with_(*aliases)[source]

Insert a WITH clause to pipeline results between query stages.

Use when you want to filter/aggregate in one stage before continuing a traversal in the next:

(
    session.query(User)
    .alias("u")
    .where(User.active == True)
    .with_("u")  # WITH u
    .traverse(User.posts)
    .alias("p")
    .return_target("p")
    .all()
)
Parameters:

*aliases (str) – Cypher variable names to carry forward (e.g. "u", "f").

Return type:

QueryBuilder[T]

order_by(field, *, desc=False)[source]

Add an ORDER BY term.

Parameters:
  • field (FieldDescriptor | str) – A field descriptor (User.name) or a raw Cypher expression string ("n.created_at DESC").

  • desc (bool) – True for descending order (default False).

Return type:

QueryBuilder[T]

Examples

q.order_by(User.age)  # ORDER BY n.age ASC
q.order_by(User.created_at, desc=True)  # ORDER BY n.created_at DESC
q.order_by("score ASC")  # raw string

limit(n)[source]

Set LIMIT n on the query.

Parameters:

n (int)

Return type:

QueryBuilder[T]

skip(n)[source]

Set SKIP n (offset) on the query.

Parameters:

n (int)

Return type:

QueryBuilder[T]

distinct()[source]

Add DISTINCT to the RETURN clause.

Return type:

QueryBuilder[T]

return_target(alias)[source]

Set the single alias to return decoded Node instances from.

When a traversal is involved, this selects which alias’s nodes constitute the result of .all():

q.return_target("f")  # returns f-nodes as list[FriendType]
Parameters:

alias (str)

Return type:

QueryBuilder[T]

return_nodes(*aliases)[source]

Declare multiple node aliases to include in the RETURN clause.

Used with return_edge() and all_with_edges() to return structured tuples:

q.return_nodes("u", "m").return_edge("r").all_with_edges()
Parameters:

aliases (str)

Return type:

QueryBuilder[T]

return_edge(alias)[source]

Declare an edge alias to include in the RETURN clause.

Requires that the traversal was created with edge_alias=alias. The edge is decoded via decode_edge() and included as the middle element of tuples returned by all_with_edges().

Parameters:

alias (str)

Return type:

QueryBuilder[T]

project(*fields)[source]

Return only specific property values (scalar projection).

The terminal method .scalars() returns the projected values as a flat list; .all_rows() returns a list of dicts:

# Scalar list
names = session.query(User).project(User.name).scalars()

# Dict list
rows = session.query(User).project(User.name, User.age).all_rows()
Parameters:

fields (FieldDescriptor | str)

Return type:

QueryBuilder[T]

aggregate(*agg_exprs, group_by=None)[source]

Add aggregation expressions to the RETURN clause.

Parameters:
  • *agg_exprs (AggExpr) – One or more AggExpr instances created by the helper functions count(), avg(), etc.

  • group_by (str | None) –

    Alias to keep in the RETURN clause alongside the aggregations (Cypher grouping is implicit — any non-aggregated return term acts as a GROUP BY key):

    .aggregate(count("*").as_("friend_count"), group_by="u")
    # RETURN u, count(*) AS friend_count
    

Return type:

QueryBuilder[T]

Examples

from runic.ogm.query import count, avg

result = (
    session.query(User)
    .alias("u")
    .traverse(User.friends)
    .aggregate(count("*").as_("friend_count"), group_by="u")
    .all_rows()  # list[dict] with {"u": ..., "friend_count": int}
)

avg_age = (
    session.query(User).aggregate(avg(User.age).as_("average_age")).scalar()
)

build()[source]

Compile the accumulated builder state to a (cypher, params) pair.

This is the core compilation step; all terminal methods call it internally. You can also call it directly for debugging or to integrate with custom execution logic:

cypher, params = session.query(User).where(User.active == True).build()
print(cypher)
# MATCH (n:User)
# WHERE n.active = $p0
# RETURN n
Returns:

A (cypher_string, params_dict) pair ready to pass to execute().

Return type:

tuple[str, dict[str, Any]]

all()[source]

Execute and return all matching Node instances.

The element type is the root class, or the class bound to the alias passed to return_target(). Results are decoded and registered in the session identity map.

Returns:

Decoded Node instances of the root type (or target type when return_target() was called).

Return type:

list[T]

one()[source]

Execute and return the first matching Node instance, or None.

Internally calls .limit(1).all() and returns the first element.

Return type:

T | None

all_with_edges()[source]

Execute and return tuples of (NodeA, EdgeModel, NodeB).

Requires return_nodes() to specify node aliases and return_edge() to specify the edge alias. The edge is decoded via decode_edge().

Returns:

Each element is a tuple containing the nodes in the order given to return_nodes(), with the decoded edge as the middle element.

Return type:

list[tuple]

Example

rows = (
    session.query(User)
    .alias("u")
    .traverse(User.rated, edge_alias="r")
    .alias("m")
    .return_nodes("u", "m")
    .return_edge("r")
    .all_with_edges()
)
for user, rated_edge, movie in rows:
    print(f"{user.name} rated {movie.title} with {rated_edge.score}")

all_rows()[source]

Execute and return raw column-keyed dicts.

Useful for multi-alias returns, aggregations, or scalar projections where mixed types are in the result set:

rows = q.aggregate(count("*").as_("n"), group_by="u").all_rows()
# [{"u": <User>, "n": 5}, ...]
Return type:

list[dict[str, Any]]

count()[source]

Execute a count(*) variant and return the integer count.

Overrides any existing RETURN spec to emit RETURN count(*). Ignores limit() and skip().

Return type:

int

scalar()[source]

Execute and return the first column of the first row, or None.

Return type:

Any

scalars()[source]

Execute and return the first column of every row as a flat list.

Return type:

list[Any]
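The difference between scalar() and scalars() comes down to which cells of the result are kept. The two functions below are hypothetical stand-ins that operate on already-fetched rows (sequences of column values); they illustrate the documented semantics and are not the builder's implementation.

```python
from typing import Any, Optional, Sequence


def scalars(rows: Sequence[Sequence[Any]]) -> list:
    """First column of every row, as a flat list."""
    return [row[0] for row in rows]


def scalar(rows: Sequence[Sequence[Any]]) -> Optional[Any]:
    """First column of the first row, or None when there are no rows."""
    return rows[0][0] if rows else None


rows = [["Ada", 36], ["Bob", 41]]
names = scalars(rows)
first = scalar(rows)
```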

register_traversal(fd, source_alias, target_alias, *, optional, edge_alias, min_hops, max_hops)[source]

Append a MATCH clause for one traversal step and register aliases.

Called by TraversalStep.alias() to complete a traversal step.

Return type:

QueryBuilder[T]

class runic.ogm.query.specialised.AsyncQueryBuilder(session, root_cls)[source]

Bases: QueryBuilder[T]

Async variant of QueryBuilder for use with AsyncSession.

All intermediate (chainable) methods are identical to the sync version. Only the terminal methods are replaced with async def equivalents.

Example

async with AsyncSession(graph) as session:
    users = await (
        session.query(User)
        .where(User.active == True)
        .order_by(User.name)
        .limit(50)
        .all()
    )
Parameters:
  • session (Any | None)

  • root_cls (type[T])

async all()[source]

Async version of all().

Return type:

list[T]

async one()[source]

Async version of one().

Return type:

T | None

async all_with_edges()[source]

Async version of all_with_edges().

Return type:

list[tuple[Any, …]]

async all_rows()[source]

Async version of all_rows().

Return type:

list[dict[str, Any]]

async count()[source]

Async version of count().

Return type:

int

async scalar()[source]

Async version of scalar().

Return type:

Any

async scalars()[source]

Async version of scalars().

Return type:

list[Any]

class runic.ogm.query.specialised.FulltextQueryBuilder(session, root_cls, query, fields=None)[source]

Bases: QueryBuilder[T]

QueryBuilder variant for FalkorDB fulltext search queries.

Constructed via fulltext_search(). The root MATCH is replaced with a CALL db.idx.fulltext.queryNodes(...) invocation that uses the declared fulltext index.

The fulltext index must have been created for the node’s label, e.g.:

class Post(Node, labels=["Post"]):
    title: str = Field(index_type="FULLTEXT")

Example

posts = (
    session.fulltext_search(Post, query="graph databases", fields=["title"])
    .where(Post.published == True)
    .order_by(Post.created_at, desc=True)
    .limit(20)
    .all()
)

Cypher emitted:

CALL db.idx.fulltext.queryNodes('Post', $__fts_query) YIELD node AS n
WHERE n.published = $p0
RETURN n
ORDER BY n.created_at DESC
LIMIT 20
Parameters:
  • session (Any)

  • root_cls (type[T])

  • query (str)

  • fields (list[str] | None)

build()[source]

Compile to Cypher, replacing MATCH with CALL fulltext procedure.

Return type:

tuple[str, dict[str, Any]]

class runic.ogm.query.specialised.VectorQueryBuilder(session, root_cls, field, vector, k)[source]

Bases: QueryBuilder[T]

QueryBuilder variant for vector KNN search.

Constructed via vector_search(). Appends a KNN distance expression to the ORDER BY and RETURN clauses.

The field must have index_type="VECTOR" and an HNSW vector index must be created via SchemaManager():

class Document(Node, labels=["Document"]):
    embedding: Vector = Field(index_type="VECTOR")

Example

similar = (
    session.vector_search(
        Document,
        field=Document.embedding,
        vector=[0.1, 0.2, 0.3],
        k=10,
    )
    .where(Document.active == True)
    .all()
)

Cypher emitted (FalkorDB KNN syntax):

MATCH (n:Document)
WHERE n.active = $p0
RETURN n, vecf32(n.embedding) <-> vecf32($__knn_vec) AS __score
ORDER BY __score ASC
LIMIT 10

build()[source]

Compile to Cypher with KNN ORDER BY.

Return type:

tuple[str, dict[str, Any]]

class runic.ogm.query.traversal.TraversalStep(builder, field_descriptor, source_alias, *, optional=True, edge_alias=None, min_hops=1, max_hops=1)[source]

Bases: object

Pending traversal hop; returned by QueryBuilder.traverse() and QueryBuilder.repeat().

Call alias() to complete the step and resume the builder chain.

Parameters:
  • builder (QueryBuilder[Any]) – The owning QueryBuilder instance.

  • field_descriptor (Any) – The FieldDescriptor for the Relation field being traversed.

  • source_alias (str) – The Cypher variable name of the source node.

  • optional (bool) – When True (the default), the traversal emits OPTIONAL MATCH (left-join: source nodes without the relationship are still returned). When False, emits MATCH (inner-join: drops source nodes that have no such relationship).

  • edge_alias (str | None) – Optional Cypher variable name for the relationship itself. When set, the generated pattern is (src)-[edge_alias:TYPE]->(tgt) instead of the anonymous (src)-[:TYPE]->(tgt), enabling edge property filtering and retrieval.

  • min_hops (int) – Minimum number of hops for variable-length paths (default 1). Values > 1 only take effect when combined with max_hops to produce a *min..max quantifier.

  • max_hops (int | None) – Maximum number of hops. None means unbounded (*min..). A value of 1 with min_hops=1 produces a fixed single-hop pattern.

alias(name)[source]

Register the target node alias and append the traversal to the builder.

Calling this method:

  1. Resolves the target Node class from the Relation field’s target.

  2. Appends the appropriate (OPTIONAL) MATCH clause to the builder.

  3. Registers name → target_cls in the builder’s alias map.

  4. Registers edge_alias → Edge class if an edge alias was given.

  5. Sets the builder’s last alias (used as the default RETURN target when no explicit return_target() is called).

Parameters:

name (str) – Cypher variable name for the target node (e.g. "f", "m").

Returns:

The owning builder, ready for continued chaining.

Return type:

QueryBuilder
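As a rough illustration of how optional, edge_alias, min_hops and max_hops shape the emitted clause, the sketch below renders a single traversal pattern as a string. hop_quantifier and traversal_clause are hypothetical helpers, not runic functions. The outgoing direction and the label on the target node are assumptions, based on the (p)-[:PARENT*1..5]->(ancestor:Person) pattern shown for repeat().

```python
from typing import Optional


def hop_quantifier(min_hops: int = 1, max_hops: Optional[int] = 1) -> str:
    """Return the variable-length suffix, or '' for a fixed single hop."""
    if min_hops == 1 and max_hops == 1:
        return ""
    upper = "" if max_hops is None else str(max_hops)  # None means unbounded
    return f"*{min_hops}..{upper}"


def traversal_clause(
    source: str,
    rel_type: str,
    target: str,
    target_label: str,
    *,
    optional: bool = True,
    edge_alias: Optional[str] = None,
    min_hops: int = 1,
    max_hops: Optional[int] = 1,
) -> str:
    keyword = "OPTIONAL MATCH" if optional else "MATCH"
    # A named relationship variable is emitted only when edge_alias is given.
    edge = f"{edge_alias or ''}:{rel_type}{hop_quantifier(min_hops, max_hops)}"
    return f"{keyword} ({source})-[{edge}]->({target}:{target_label})"


rated = traversal_clause("u", "RATED", "m", "Movie", edge_alias="r")
ancestors = traversal_clause(
    "p", "PARENT", "ancestor", "Person", optional=False, max_hops=5
)
```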

class runic.ogm.query.expressions.Expr[source]

Bases: object

Abstract base for all filter/compound/negated expressions.

Subclass instances are passed to QueryBuilder.where(). They support & (AND), | (OR), and ~ (NOT) to build composite predicates:

q.where((User.age > 18) & (User.active == True))

class runic.ogm.query.expressions.FilterExpr(cls, prop, op, value=None, alias=None, negate=False)[source]

Bases: Expr

A single WHERE predicate: alias.prop OP $pN.

Created automatically by the FieldDescriptor operator overloads; you do not normally instantiate this class directly.

cls

The OGM Node or Edge subclass that owns the field. Used by the query builder to look up the Cypher variable (alias) for the class in the current query context.

Type:

type

prop

The property name as declared on the OGM class.

Type:

str

op

A Cypher operator string: "=", "<>", ">", ">=", "<", "<=", "CONTAINS", "STARTS WITH", "=~" (regex), "IN", "IS NULL", "IS NOT NULL".

Type:

str

value

The Python value to compare against. None for null-check ops. TypeConverter.to_graph() is applied during Cypher compilation if the field has a converter; cypher_fn wraps the param reference.

Type:

Any

alias

Optional explicit Cypher variable override (set via on= in QueryBuilder.where()). When None, the builder derives the alias from cls.

Type:

str | None

negate

When True, wraps the predicate in NOT (...). Used for not_in_(); prefer the ~ operator for general negation.

Type:

bool

with_alias(alias)[source]

Return a copy of this expression with alias as the explicit variable.

Parameters:

alias (str)

Return type:

FilterExpr

class runic.ogm.query.expressions.CompoundExpr(op, operands=<factory>)[source]

Bases: Expr

AND / OR combination of multiple sub-expressions.

Example:

(User.age > 18) & (User.active == True)
# → CompoundExpr(op="AND", operands=[FilterExpr(...), FilterExpr(...)])

class runic.ogm.query.expressions.NegatedExpr(operand)[source]

Bases: Expr

NOT wrapper around another expression.

Example:

~(User.banned == True)
# → NegatedExpr(operand=FilterExpr(...))
Parameters:

operand (Expr)
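To show how &, | and ~ can build an expression tree that compiles to a parameterised WHERE fragment, here is a self-contained toy model. The classes Filter, Compound and Negated and the function compile_expr are hypothetical stand-ins for FilterExpr, CompoundExpr and NegatedExpr. Only the $pN parameter naming is taken from the documentation; the parenthesisation is an assumption of this sketch.

```python
from dataclasses import dataclass, field
from typing import Any


class Expr:
    def __and__(self, other: "Expr") -> "Expr":
        return Compound("AND", [self, other])

    def __or__(self, other: "Expr") -> "Expr":
        return Compound("OR", [self, other])

    def __invert__(self) -> "Expr":
        return Negated(self)


@dataclass
class Filter(Expr):
    alias: str
    prop: str
    op: str
    value: Any


@dataclass
class Compound(Expr):
    op: str
    operands: list = field(default_factory=list)


@dataclass
class Negated(Expr):
    operand: Expr


def compile_expr(expr: Expr, params: dict) -> str:
    """Render expr to Cypher, binding values as $p0, $p1, ... in params."""
    if isinstance(expr, Filter):
        name = f"p{len(params)}"
        params[name] = expr.value
        return f"{expr.alias}.{expr.prop} {expr.op} ${name}"
    if isinstance(expr, Compound):
        joiner = f" {expr.op} "
        return "(" + joiner.join(compile_expr(e, params) for e in expr.operands) + ")"
    if isinstance(expr, Negated):
        return f"NOT ({compile_expr(expr.operand, params)})"
    raise TypeError(f"unsupported expression: {expr!r}")


params: dict = {}
adult_and_active = Filter("n", "age", ">", 18) & Filter("n", "active", "=", True)
predicate = adult_and_active | ~Filter("n", "banned", "=", True)
cypher = compile_expr(predicate, params)
```

Binding values as parameters instead of inlining them keeps user-supplied values out of the query text.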

class runic.ogm.query.expressions.OrderExpr(alias, prop, raw=None, desc=False)[source]

Bases: object

Represents a single ORDER BY term.

Created by QueryBuilder.order_by(); not usually instantiated directly.

Parameters:
  • alias (str | None)

  • prop (str | None)

  • raw (str | None)

  • desc (bool)

alias

The Cypher variable (e.g. "n", "u").

Type:

str | None

prop

The property name (e.g. "age"). None when raw is set.

Type:

str | None

raw

A raw Cypher expression string (e.g. "score ASC"). Used when the user passes a string directly to order_by().

Type:

str | None

desc

True for descending order; False (default) for ascending.

Type:

bool

to_cypher()[source]

Render to a Cypher ORDER BY term string.

Return type:

str
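A minimal sketch of the rendering, assuming the output format shown in the order_by() examples (n.age ASC, n.created_at DESC, raw strings passed through unchanged). OrderTerm is a hypothetical stand-in for OrderExpr, not the library class.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class OrderTerm:
    alias: Optional[str] = None
    prop: Optional[str] = None
    raw: Optional[str] = None
    desc: bool = False

    def to_cypher(self) -> str:
        if self.raw is not None:
            return self.raw  # raw expressions are emitted unchanged
        direction = "DESC" if self.desc else "ASC"
        return f"{self.alias}.{self.prop} {direction}"


by_age = OrderTerm(alias="n", prop="age").to_cypher()
by_created = OrderTerm(alias="n", prop="created_at", desc=True).to_cypher()
by_raw = OrderTerm(raw="score ASC").to_cypher()
```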

class runic.ogm.query.expressions.AggExpr(func, field='*', result_alias=None, distinct=False)[source]

Bases: object

An aggregation function expression for use in RETURN clauses.

Created via the helper functions count(), avg(), sum_(), min_(), max_(), collect().

Parameters:
  • func (str)

  • field (Any)

  • result_alias (str | None)

  • distinct (bool)

func

Cypher aggregation function name: "count", "avg", "sum", "min", "max", "collect".

Type:

str

field

A FieldDescriptor or raw string ("*" for count(*)).

Type:

Any

result_alias

The AS name alias in the RETURN clause.

Type:

str | None

distinct

When True, emits count(DISTINCT n.prop) etc.

Type:

bool

as_(alias)[source]

Return a copy with a RETURN alias set.

Parameters:

alias (str)

Return type:

AggExpr

to_cypher(alias_map)[source]

Render to a Cypher aggregation expression string.

Parameters:

alias_map (dict[type, str]) – Mapping from OGM class to Cypher variable (provided by the builder during compilation).

Return type:

str
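The interplay of func, distinct and as_() can be sketched with a small frozen dataclass. Agg is a hypothetical stand-in for AggExpr. Unlike the real to_cypher(alias_map), it takes an already-rendered target string such as "n.name" instead of resolving a FieldDescriptor through the alias map.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Agg:
    func: str
    target: str = "*"
    result_alias: Optional[str] = None
    distinct: bool = False

    def as_(self, alias: str) -> "Agg":
        """Return a copy with the RETURN alias set; the original is unchanged."""
        return replace(self, result_alias=alias)

    def to_cypher(self) -> str:
        inner = f"DISTINCT {self.target}" if self.distinct else self.target
        text = f"{self.func}({inner})"
        return f"{text} AS {self.result_alias}" if self.result_alias else text


base = Agg("count")
friend_count = base.as_("friend_count").to_cypher()
distinct_names = Agg("count", "n.name", distinct=True).to_cypher()
```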

runic.ogm.query.expressions.count(field='*', *, distinct=False)[source]

Create a count(...) aggregation expression.

Parameters:
  • field (Any) – The field to count, or "*" (default) for count(*).

  • distinct (bool) – When True, emits count(DISTINCT field).

Return type:

AggExpr

Examples

q.aggregate(count())  # count(*)
q.aggregate(count(User.name, distinct=True))  # count(DISTINCT n.name)

runic.ogm.query.expressions.avg(field)[source]

Create an avg(...) aggregation expression.

Example:

q.aggregate(avg(User.age).as_("average_age"))
Parameters:

field (Any)

Return type:

AggExpr

runic.ogm.query.expressions.sum_(field)[source]

Create a sum(...) aggregation expression.

Example:

q.aggregate(sum_(Order.amount).as_("total"))
Parameters:

field (Any)

Return type:

AggExpr

runic.ogm.query.expressions.min_(field)[source]

Create a min(...) aggregation expression.

Parameters:

field (Any)

Return type:

AggExpr

runic.ogm.query.expressions.max_(field)[source]

Create a max(...) aggregation expression.

Parameters:

field (Any)

Return type:

AggExpr

runic.ogm.query.expressions.collect(field, *, distinct=False)[source]

Create a collect(...) aggregation expression.

Collects values from multiple rows into a list.

Example:

q.aggregate(collect(Post.title).as_("post_titles"))
Return type:

AggExpr