ORM API Reference¶
runic.orm is a lightweight graph ORM for Cypher-based graph databases.
It follows a SQLAlchemy-style architecture: driver → session → mapper →
repository. FalkorDB, ArcadeDB, and any Bolt-compatible database are
supported via the GraphDriver abstraction.
runic.orm.core — Models & Fields¶
- class runic.orm.core.models.Node[source]¶
Bases:
objectBase class for graph nodes.
Subclass with
labelsand optionalprimary_labelclass keywords:class Country( Location, labels=["Location", "Country"], primary_label="Location" ): iso_code: str = Field(unique=True)
__init__is generated automatically from declaredFielddescriptors. Setting any field on an existing instance marks it dirty (_dirty = True), which the Mapper interprets as a MERGE/SET on the next flush.
- class runic.orm.core.models.Edge[source]¶
Bases:
objectBase class for graph edge property models.
Subclass with a
typeclass keyword:class InvitationEdge(Edge, type="INVITED_TO"): role: str = Field(required=True)
Edge instances carry the same
_new/_dirtylifecycle flags as Node.
- runic.orm.core.descriptors.Field(*, default=MISSING, default_factory=None, init=True, kw_only=True, index=False, index_type=None, unique=False, required=False, primary_key=False, converter=None, generated=False, interned=False)[source]¶
Declare a property field on a Node or Edge.
Returns a
FieldDescriptortyped asAnyso thatname: str = Field()is accepted by type checkers without error.Set
interned=Trueto store the value via FalkorDB’sintern()function, which deduplicates repeated strings (e.g. country names, status codes, tags) by keeping a single shared copy in the database.Example:
class Person(Node, labels=["Person"]): id: str = Field(primary_key=True) name: str = Field() age: int | None = Field(default=None) country: str = Field(interned=True) email: str = Field(index=True, unique=True)
- runic.orm.core.descriptors.Relation(*, relationship, direction, target, edge_model=None, cascade=False, lazy=True, default=None, default_factory=None, init=True)[source]¶
Declare a relationship field on a Node.
Returns a
FieldDescriptortyped asAnyso thatcompany: Company | None = Relation(...)is accepted by type checkers.Example:
class Person(Node, labels=["Person"]): id: str = Field() company: Company | None = Relation( relationship="WORKS_FOR", direction="OUTGOING", target="Company", ) friends: list["Person"] = Relation( relationship="KNOWS", direction="OUTGOING", target="Person", )
- class runic.orm.core.descriptors.FieldDescriptor(*, default=MISSING, default_factory=None, init=True, kw_only=True, index=False, index_type=None, unique=False, required=False, primary_key=False, relationship=None, direction=None, target=None, edge_model=None, cascade=False, lazy=True, converter=None, generated=False, interned=False)[source]¶
Bases:
objectDescriptor backing both Field() and Relation() declarations.
Behaves as a data descriptor: values are stored per-instance in
__dict__. Writing to an instance attribute via this descriptor setsinstance._dirty = True, which the Mapper uses to detect changes on persistent entities.Query expression operators¶
When accessed at the class level (
User.name,Rated.score), the descriptor returns itself. The comparison operators defined below produceFilterExprobjects for use withwhere():session.query(User).where(User.name == "Alice") session.query(User).where(User.age > 18) session.query(User).where(User.bio.contains("graph")) session.query(User).where(User.deleted_at.is_null())
__hash__is kept asobject.__hash__so descriptors remain hashable and can appear in sets/dicts used internally.- Parameters:
default (Any)
default_factory (Callable[[], Any] | None)
init (bool)
kw_only (bool)
index (bool)
index_type (Literal['FULLTEXT', 'VECTOR'] | None)
unique (bool)
required (bool)
primary_key (bool)
relationship (str | None)
direction (Literal['OUTGOING', 'INCOMING', 'BOTH'] | None)
cascade (bool)
lazy (bool)
converter (TypeConverter | None)
generated (bool)
interned (bool)
runic.orm.core — MetaData¶
- class runic.orm.core.metadata.MetaData[source]¶
Bases:
objectRegistry for all Node and Edge subclasses.
Populated automatically when Node/Edge subclasses are defined via
__init_subclass__. Also provides forward-reference resolution (string targets on relationship Fields) after all models are imported.- register_node(cls)[source]¶
Register a Node subclass; called by Node.__init_subclass__.
- Parameters:
cls (type)
- Return type:
None
- register_edge(cls)[source]¶
Register an Edge subclass; called by Edge.__init_subclass__.
- Parameters:
cls (type)
- Return type:
None
- resolve_target(target)[source]¶
Resolve a string forward reference to its registered Node/Edge class.
- finalize()[source]¶
Resolve all string forward references in relationship Fields.
Call once after all model modules have been imported. String targets on relationship Fields are replaced with the actual class objects.
- Return type:
None
- snapshot()[source]¶
Capture the current registry state for later restore.
- Return type:
_MetaSnapshot
- class runic.orm.core.metadata.NodeMeta(cls, labels, primary_label, fields, pk_field_name=None)[source]¶
Bases:
objectMetadata snapshot for a registered Node subclass.
runic.orm.core — Type Converters¶
- class runic.orm.core.types.TypeConverter[source]¶
Bases:
ABCInterface for encoding/decoding custom Python types to/from graph values.
Optionally, set
cypher_fnto name a FalkorDB Cypher function that wraps the parameter reference when writing to the graph (e.g."vecf32"→vecf32($field)).
- class runic.orm.core.types.DatetimeConverter[source]¶
Bases:
TypeConverterConverts between Python datetime objects and ISO-8601 strings.
FalkorDB stores datetimes as strings; this converter handles the round-trip.
- class runic.orm.core.types.EnumConverter(enum_class)[source]¶
Bases:
TypeConverterConverts between Python Enum members and their string values.
Stores the enum’s .value in the graph and reconstructs on load.
- Parameters:
enum_class (type[Enum])
- class runic.orm.core.types.Vector(iterable=(), /)[source]¶
Bases:
listA typed list of floats representing a graph embedding vector.
Use as an annotation on a Node field to store and query embeddings:
class Article(Node, labels=["Article"]): id: str = Field(primary_key=True) embedding: Vector = Field(index=True, index_type="VECTOR")
FalkorDB stores vectors via
vecf32(), preserving 32-bit float precision.
- class runic.orm.core.types.VectorConverter[source]¶
Bases:
TypeConverterConverts between Python Vector (list of floats) and FalkorDB’s vecf32 format.
Emits
vecf32($field)in Cypher viacypher_fn = "vecf32".
- class runic.orm.core.types.GeoLocation(latitude, longitude)[source]¶
Bases:
objectA geographic point with latitude and longitude.
Maps to FalkorDB’s native
point()type:class Store(Node, labels=["Store"]): id: str = Field(primary_key=True) location: GeoLocation = Field()
FalkorDB round-trips this as
point({latitude: ..., longitude: ...}).
- class runic.orm.core.types.GeoLocationConverter[source]¶
Bases:
TypeConverterConverts between GeoLocation and FalkorDB’s point() dict format.
Emits
point($field)in Cypher viacypher_fn = "point". FalkorDB returns points as{"latitude": ..., "longitude": ...}dicts.
runic.orm.driver — Drivers & Dialects¶
- class runic.orm.driver.falkordb.FalkorDBDriver(graph, db=None)[source]¶
Bases:
objectSync driver wrapping a FalkorDB graph handle.
- Parameters:
graph (Any)
db (Any)
- class runic.orm.driver.falkordb.AsyncFalkorDBDriver(graph)[source]¶
Bases:
objectAsync driver wrapping an async FalkorDB graph handle.
- Parameters:
graph (Any)
- class runic.orm.driver.falkordb.FalkorDBDialect[source]¶
Bases:
objectStrategy implementation for FalkorDB-specific Cypher generation.
- class runic.orm.driver.bolt.BoltDriver(uri, auth, database, dialect, *, encrypted=True)[source]¶
Bases:
objectSync Bolt driver for ArcadeDB, Neo4j, or any Bolt-compatible graph DB.
Supports explicit ACID transactions via
TransactionalGraphDriver. When no transaction is active, eachexecute()call opens its own Bolt session (auto-commit semantics). Callbegin()to start a transaction that spans multipleexecute()calls, thencommit()orrollback()to end it.The ORM
Sessiondrives this lifecycle automatically via lazy-begin: the first query inside a Session opens a transaction;Session.commit()/Session.rollback()close it.- begin()[source]¶
Open a Bolt session and begin an explicit transaction.
Raises
RuntimeErrorif a transaction is already active.- Return type:
None
- class runic.orm.driver.arcadedb.ArcadeDBDialect[source]¶
Bases:
objectStrategy for ArcadeDB-specific Cypher generation.
Key differences from FalkorDB: - No
toInteger()cast needed forid()-based lookups - Novecf32()orintern()wrappers (raw values stored as-is) - Vector KNN viaCALL vector.neighbors(...)- Fulltext search not yet supported (raisesNotImplementedError) -SET n.prop = point()is not supported via Bolt; GeoLocation is stored as a{"latitude": x, "longitude": y}map instead.
- class runic.orm.driver.age.AGEDriver(conn, graph_name)[source]¶
Bases:
objectSync driver for Apache AGE (PostgreSQL graph extension).
Cypher queries are wrapped in the AGE
cypher()SQL function and executed via apsycopg(psycopg3) connection. Parameters are serialised as an agtype JSON map and passed as the third argument tocypher(), making them accessible inside Cypher as$param_name.Supports explicit ACID transactions via
TransactionalGraphDriver. psycopg3 starts an implicitBEGINon the first statement after each commit/rollback (autocommit=Falsedefault); this driver’scommit()/rollback()map directly toconn.commit()/conn.rollback().begin()is a documented no-op because psycopg3 manages the implicit transaction start automatically.The ORM
Sessiondrives this lifecycle: the first query in a Session opens a transaction implicitly;commit()/rollback()finalise it.AGE stores each vertex label as a separate PostgreSQL table; a vertex belongs to exactly one label fixed at creation time. Multi-label operations (
SET n:New REMOVE n:Old) are therefore not supported.Example
driver = create_age_driver( host="localhost", port=5432, database="postgres", graph="my_graph", username="postgres", password="secret", ) with Session(driver) as session: ...
- Parameters:
conn (Any)
graph_name (str)
- class runic.orm.driver.age.AGEDialect[source]¶
Bases:
objectStrategy for Apache AGE-specific Cypher generation.
Key differences from FalkorDB: - No
toInteger()cast forid()-based lookups - Novecf32()orintern()wrappers (raw Python values stored as-is) - Fulltext search: not supported natively (raisesNotImplementedError) - Vector KNN: not supported natively (raisesNotImplementedError) - Multi-label emulation: extra labels stored as_labelsproperty array
- runic.orm.driver.falkordb.create_falkordb_driver(host, port, graph)[source]¶
Create a
FalkorDBDriverfrom connection parameters.- Parameters:
- Return type:
- runic.orm.driver.arcadedb.create_arcadedb_driver(host, port, database, username, password)[source]¶
Create a
BoltDriverconfigured for ArcadeDB.
- runic.orm.driver.age.create_age_driver(host, port, database, graph, username, password)[source]¶
Create an
AGEDriverconnected to a PostgreSQL+AGE instance.- Parameters:
- Return type:
- runic.orm.driver.factory.create_driver(backend, **kwargs)[source]¶
Return a
GraphDriverfor the given backend.- Parameters:
- Raises:
ValueError – When backend is unknown.
- Return type:
GraphDriver
Examples
FalkorDB:
driver = create_driver( "falkordb", host="localhost", port=6379, graph="my_graph" )
ArcadeDB:
driver = create_driver( "arcadedb", host="localhost", port=7687, database="MyDB", username="root", password="secret", )
Neo4j:
driver = create_driver( "neo4j", host="localhost", port=7687, database="neo4j", username="neo4j", password="secret", encrypted=True, )
Memgraph:
driver = create_driver( "memgraph", host="localhost", port=7687, database="memgraph", username="", password="", )
Apache AGE:
driver = create_driver( "age", host="localhost", port=5432, database="postgres", graph="my_graph", username="postgres", password="secret", )
runic.orm.session — Session¶
- class runic.orm.session.session.Session(driver, mapper=None, *, log_cypher=False)[source]¶
Bases:
objectSync unit-of-work manager.
Owns all mutations (
add,delete), single-entity lookup (get), identity map, and flush/commit lifecycle. Repositories hold a session reference and delegate writes and PK lookups to it.Transaction model — determined by the injected driver:
FalkorDB (no native multi-query transactions): each
GRAPH.QUERYis individually atomic.commit()flushes pending writes;rollback()discards un-flushed state only — it cannot undo writes already sent to the graph.Bolt-based drivers (Neo4j, Memgraph, ArcadeDB): full ACID transactions via the Bolt protocol. The first query lazily opens a Bolt transaction;
commit()/rollback()commit or discard all changes as a single atomic unit.Apache AGE (psycopg3): full PostgreSQL ACID transactions. psycopg3 starts an implicit
BEGINon the first SQL statement;commit()/rollback()map toconn.commit()/conn.rollback().
Drivers that support explicit transactions implement the
TransactionalGraphDriverprotocol. The Session detects this viaisinstanceand wires commit/rollback accordingly.- Parameters:
driver (GraphDriver)
mapper (Mapper | None)
log_cypher (bool)
- add(entity)[source]¶
Register a transient/detached entity as pending (staged for CREATE).
- Parameters:
entity (Any)
- Return type:
None
- delete(entity)[source]¶
Mark a persistent entity for DETACH DELETE on next flush.
Raises
DetachedEntityErrorif the entity is not known to this session.- Parameters:
entity (Any)
- Return type:
None
- property mapper: Mapper¶
Return the Mapper used by this session.
- property rel_loader: RelationshipLoader¶
Return the RelationshipLoader used by this session.
- register_or_get(entity)[source]¶
Register entity in the identity map; return existing instance if present.
Used by Repository reads to deduplicate against entities already loaded in this session (fulfilling the identity-map guarantee).
- get(cls, pk, fetch=None)[source]¶
Return entity from identity map or query graph;
Noneif not found.Pass
fetch=["rel_name", ...]to eager-load relationship fields in the same Cypher query usingOPTIONAL MATCH.
- load_relationship(entity, field_name)[source]¶
Load a lazy relationship field and cache the result on the entity.
Called by
Field._trigger_lazy_loadwhen a_NOT_LOADEDsentinel is accessed on an entity that is attached to this session. Writes directly toentity.__dict__to bypass the dirty-tracking descriptor.
- flush()[source]¶
Execute all pending/dirty/deleted entities against the graph.
Does not clear the identity map. Each entity write is a separate
graph.query()call. Entities withgenerated=TrueIDs are handled individually so the returned ID can be assigned before continuing.- Return type:
None
- commit()[source]¶
flush()then clear the pending/deleted tracking sets.For transactional drivers (Bolt, AGE), also commits the active database transaction so all flushed writes become durable and visible.
- Return type:
None
- rollback()[source]¶
Discard un-flushed pending/deleted sets; expire all persistent entities.
For transactional drivers (Bolt, AGE), also rolls back the active database transaction — writes already flushed but not yet committed are discarded atomically. For FalkorDB (no native transactions), only un-flushed in-memory state is cleared; writes already sent to the graph cannot be undone.
- Return type:
None
- expire(entity)[source]¶
Invalidate cached attributes; they will be reloaded on next
refresh.- Parameters:
entity (Any)
- Return type:
None
- refresh(entity)[source]¶
Immediately re-query the entity from the graph and update in-place.
- Parameters:
entity (Any)
- Return type:
None
- expunge(entity)[source]¶
Remove entity from session (→ detached); no graph action.
- Parameters:
entity (Any)
- Return type:
None
- relate(source, field_name, target, edge=None)[source]¶
Create or update a relationship between source and target.
Uses
MERGEsemantics: if the relationship already exists its edge properties are updated; if not, it is created. Pass anEdgemodel instance as edge to write properties on the relationship itself.field_name may be a plain string or the class-level descriptor attribute (e.g.
User.invited_trips) for type-safe call sites.The cached value of the relation field on source is invalidated after the write so the next access re-fetches fresh data from the graph.
- Parameters:
source (Any)
field_name (str | FieldDescriptor)
target (Any)
edge (Any | None)
- Return type:
None
- unrelate(source, field_name, target)[source]¶
Delete the relationship between source and target.
field_name may be a plain string or the class-level descriptor attribute (e.g.
User.invited_trips) for type-safe call sites.The cached value of the relation field on source is invalidated after the write so the next access re-fetches fresh data from the graph.
- Parameters:
source (Any)
field_name (str | FieldDescriptor)
target (Any)
- Return type:
None
- execute(cypher, params=None, write=False)[source]¶
Execute raw Cypher; returns
QueryResult; no entity mapping.
- scalars(stmt)[source]¶
Execute a
select()statement; return decoded entities.Type-safe:
session.scalars(select(User).where(...))inferslist[User].- Parameters:
stmt (QueryBuilder[_T]) – An unbound
QueryBuildercreated viaselect().- Return type:
list[_T]
- scalar(stmt)[source]¶
Execute a
select()statement; return first entity orNone.Adds
LIMIT 1internally without permanently modifying the statement. Type-safe:session.scalar(select(User).where(...))infersUser | None.- Parameters:
stmt (QueryBuilder[_T]) – An unbound
QueryBuildercreated viaselect().- Return type:
_T | None
- all_rows(stmt)[source]¶
Execute a
select()statement; return column-keyed dicts.- Parameters:
stmt (QueryBuilder[Any]) – An unbound
QueryBuilder.- Return type:
- all_with_edges(stmt)[source]¶
Execute a
select()statement; return(NodeA, Edge, NodeB)tuples.- Parameters:
stmt (QueryBuilder[Any]) – An unbound
QueryBuilderwithreturn_nodes()andreturn_edge()configured.- Return type:
- count(stmt)[source]¶
Execute a
select()statement; return the row count.- Parameters:
stmt (QueryBuilder[Any]) – An unbound
QueryBuilder.- Return type:
- query(cls)[source]¶
Return a
QueryBuilderfor cls.This is the primary entry point for the fluent query builder API:
users = ( session.query(User) .where(User.active == True) .order_by(User.name) .limit(20) .all() )
- Parameters:
- Return type:
- fulltext_search(cls, *, query, fields=None)[source]¶
Return a
FulltextQueryBuilderfor cls.Uses FalkorDB’s
CALL db.idx.fulltext.queryNodes()procedure. The node label must have a fulltext index created.- Parameters:
- Return type:
Example
posts = ( session.fulltext_search(Post, query="graph databases") .where(Post.published == True) .limit(10) .all() )
- vector_search(cls, *, field, vector, k=10)[source]¶
Return a
VectorQueryBuilderfor cls.Performs a K-Nearest-Neighbour search using FalkorDB’s HNSW index.
- Parameters:
- Return type:
Example
similar = ( session.vector_search( Document, field=Document.embedding, vector=my_vec, k=5 ) .where(Document.active == True) .all() )
- close()[source]¶
Expunge all tracked entities; roll back any orphaned transaction.
If
close()is called without a priorcommit()orrollback()(e.g. the session was not used as a context manager), any active driver-level transaction is rolled back to release the connection cleanly.- Return type:
None
- class runic.orm.session.async_session.AsyncSession(driver, mapper=None, *, log_cypher=False)[source]¶
Bases:
objectAsync unit-of-work manager; mirrors
Sessionwithasyncmethods.Use as an async context manager:
async with AsyncSession(graph) as session: alice = await session.get(Person, "alice-id") alice.email = "new@example.com" await session.commit()
Lazy relationship loading is not supported in async context because
Field.__get__cannotawait. Usefetch=[...]onget()instead.- Parameters:
driver (AsyncGraphDriver)
mapper (Mapper | None)
log_cypher (bool)
- add(entity)[source]¶
Register a transient/detached entity as pending.
- Parameters:
entity (Any)
- Return type:
None
- delete(entity)[source]¶
Mark a persistent entity for DETACH DELETE on next flush.
- Parameters:
entity (Any)
- Return type:
None
- property mapper: Mapper¶
Return the Mapper used by this session.
- property rel_loader: RelationshipLoader¶
Return the RelationshipLoader used by this session.
- register_or_get(entity)[source]¶
Register entity in the identity map; return existing instance if present.
- async get(cls, pk, fetch=None)[source]¶
Return entity from identity map or query graph asynchronously.
Pass
fetch=["rel_name", ...]to eager-load relationships in the same Cypher query usingOPTIONAL MATCH.
- load_relationship(entity, field_name)[source]¶
Raise
LazyLoadError; lazy loading is not supported in async sessions.Access
entity.rel_fieldfrom within an async context manager triggers this viaField._trigger_lazy_load. Usefetch=[field_name]onget()for eager loading instead.
- async flush()[source]¶
Execute all pending/dirty/deleted entities against the graph.
- Return type:
None
- async rollback()[source]¶
Discard un-flushed pending/deleted sets; expire all persistent entities.
- Return type:
None
- async refresh(entity)[source]¶
Immediately re-query the entity from the graph.
- Parameters:
entity (Any)
- Return type:
None
- expunge(entity)[source]¶
Remove entity from session; no graph action.
- Parameters:
entity (Any)
- Return type:
None
- async relate(source, field_name, target, edge=None)[source]¶
Create or update a relationship between source and target.
Uses
MERGEsemantics: if the relationship already exists its edge properties are updated; if not, it is created. Pass anEdgemodel instance as edge to write properties on the relationship itself.field_name may be a plain string or the class-level descriptor attribute (e.g.
User.invited_trips) for type-safe call sites.The cached value of the relation field on source is invalidated after the write so the next access re-fetches fresh data from the graph.
- Parameters:
source (Any)
field_name (str | FieldDescriptor)
target (Any)
edge (Any | None)
- Return type:
None
- async unrelate(source, field_name, target)[source]¶
Delete the relationship between source and target.
field_name may be a plain string or the class-level descriptor attribute (e.g.
User.invited_trips) for type-safe call sites.The cached value of the relation field on source is invalidated after the write so the next access re-fetches fresh data from the graph.
- Parameters:
source (Any)
field_name (str | FieldDescriptor)
target (Any)
- Return type:
None
- async execute(cypher, params=None, write=False)[source]¶
Execute raw Cypher; returns
QueryResult; no entity mapping.
- async scalars(stmt)[source]¶
Execute a
select()statement; return decoded entities.Type-safe:
await session.scalars(select(User).where(...))inferslist[User].- Parameters:
stmt (QueryBuilder[_T]) – An unbound
QueryBuildercreated viaselect().- Return type:
list[_T]
- async scalar(stmt)[source]¶
Execute a
select()statement; return first entity orNone.Adds
LIMIT 1internally without permanently modifying the statement.- Parameters:
stmt (QueryBuilder[_T]) – An unbound
QueryBuilder.- Return type:
_T | None
- async all_rows(stmt)[source]¶
Execute a
select()statement; return column-keyed dicts.- Parameters:
stmt (QueryBuilder[Any]) – An unbound
QueryBuilder.- Return type:
- async all_with_edges(stmt)[source]¶
Execute a
select()statement; return(NodeA, Edge, NodeB)tuples.- Parameters:
stmt (QueryBuilder[Any]) – An unbound
QueryBuilderwithreturn_nodes()andreturn_edge()configured.- Return type:
- async count(stmt)[source]¶
Execute a
select()statement; return the row count.- Parameters:
stmt (QueryBuilder[Any]) – An unbound
QueryBuilder.- Return type:
- query(cls)[source]¶
Return an
AsyncQueryBuilderfor cls.Async entry point for the fluent query builder. Use
awaiton the terminal methods (all(),one(),count(), etc.):async with AsyncSession(graph) as session: users = await ( session.query(User).where(User.active == True).limit(20).all() )
- fulltext_search(cls, *, query, fields=None)[source]¶
Async fulltext search; mirrors
fulltext_search().
- vector_search(cls, *, field, vector, k=10)[source]¶
Async vector KNN search; mirrors
vector_search().
- class runic.orm.session.connection_pool.ConnectionManager(db, graph_name)[source]¶
Bases:
objectManages a FalkorDB connection for use with Session.
Holds the db client and graph name;
acquire()returns aFalkorDBDriver.- Parameters:
db (Any)
graph_name (str)
- acquire()[source]¶
Return a
FalkorDBDriverfor the configured graph.- Return type:
- release(driver)[source]¶
Release a driver back to the pool (no-op in current impl).
- Parameters:
driver (FalkorDBDriver)
- Return type:
None
- class runic.orm.session.connection_pool.AsyncConnectionManager(db, graph_name)[source]¶
Bases:
objectAsync variant of
ConnectionManagerfor AsyncFalkorDB clients.- Parameters:
db (Any)
graph_name (str)
- acquire()[source]¶
Return an
AsyncFalkorDBDriverfor the configured graph.- Return type:
- async release(driver)[source]¶
Release an async driver (no-op in current impl).
- Parameters:
driver (AsyncFalkorDBDriver)
- Return type:
None
runic.orm.repository — Repository¶
- class runic.orm.repository.repository.Repository(session, entity_class)[source]¶
Bases:
RepositoryProtocol,GenericTyped reads and explicit Cypher helpers for one entity type.
Mutations (
add,delete) and single-PK lookup (get) belong to theSession. All reads here register returned entities in the session identity map.Example:
with Session(graph) as session: repo = Repository(session, Person) all_people = repo.find_all() page = repo.find_all(skip=0, limit=25)
- find_all(fetch=None, skip=0, limit=None)[source]¶
Return all entities of this type, with optional eager relationship loading.
Use skip and limit for offset-based pagination (aligns with
select(...).skip(n).limit(n)syntax). Combining fetch with skip/limit is not supported — use the QueryBuilder for that.
- query()[source]¶
Return a
QueryBuilderfor this repository’s entity type.Shorthand for
select(self._cls)bound to the current session. Prefer theselect()+ session execution pattern for new code:repo = Repository(session, User) # Preferred (select + session execution): users = session.scalars(select(User).where(User.active == True)) # Also available via repo (bound builder): users = repo.query().where(User.active == True).all()
- Return type:
QueryBuilder[T]
- cypher(query, params=None, *, returns=None, write=False)[source]¶
Execute query and return a typed list.
returnscontrols decoding:int,str,bool→ scalar;dict→ column-keyed dicts; anyNodesubclass → decoded entities registered in the session identity map;None→ empty list.
- class runic.orm.repository.async_repository.AsyncRepository(session, entity_class)[source]¶
Bases:
AsyncRepositoryProtocol,GenericAsync typed reads and explicit Cypher helpers for one entity type.
Mirrors
Repositorywithasyncmethods.Example:
async with AsyncSession(graph) as session: repo = AsyncRepository(session, Trip) trips = await repo.find_all()
- Parameters:
session (AsyncSession)
entity_class (type[T])
- async find_all(fetch=None, skip=0, limit=None)[source]¶
Return all entities of this type, with optional eager relationship loading.
Use skip and limit for offset-based pagination (aligns with
select(...).skip(n).limit(n)syntax). Combining fetch with skip/limit is not supported — use the QueryBuilder for that.
- query()[source]¶
Return an
AsyncQueryBuilderfor this repository’s entity type.Async counterpart of
query(). Useawaiton the terminal methods:repo = AsyncRepository(session, User) users = await repo.query().where(User.active == True).all()
- Return type:
- async cypher(query, params=None, *, returns=None, write=False)[source]¶
Execute query and return a typed list.
runic.orm.schema — Index declarations¶
- class runic.orm.schema.index_manager.IndexSpec(label, property, index_type)[source]¶
Bases:
objectNormalized description of a single declared or existing index/constraint.
index_typeis one of"RANGE","FULLTEXT","VECTOR", or"UNIQUE".
- runic.orm.schema.index_manager.extract_declared_specs(entity_class)[source]¶
Return IndexSpecs declared via Field descriptors on entity_class.
Rules: -
unique=True→ UNIQUE constraint (backing RANGE is auto-created by FalkorDB). -index=True(withoutunique) → RANGE index. -index_type="FULLTEXT"→ FULLTEXT index. -index_type="VECTOR"→ VECTOR index. - Relationship fields are skipped. - A field with bothunique=Trueandindex=Trueemits only UNIQUE.
runic.migrate.schema — Index & Schema Management¶
- class runic.migrate.schema.IndexManager(adapter_or_graph)[source]¶
Bases:
objectCreates and manages graph indexes and constraints from entity Field declarations.
Accepts any object satisfying the
IndexAdapterprotocol — a migrate adapter (Neo4j, Memgraph, FalkorDB, ArcadeDB, AGE) or a raw FalkorDB graph handle (auto-wrapped inFalkorDBIndexAdapterfor backward compat).Fulltext batching — Neo4j and Memgraph use a single named fulltext index per label covering all search fields.
create_indexes()collapses all FULLTEXT specs for the same label into onecreate_fulltext_index(label, prop1, prop2, ...)call.Example:
from runic.migrate import IndexManager, create_adapter adapter = create_adapter( "neo4j", host="localhost", database="neo4j", password="secret" ) manager = IndexManager(adapter) manager.create_indexes(Person) manager.ensure_indexes(Trip)
- Parameters:
adapter_or_graph (Any)
- create_indexes(entity_class, *, if_not_exists=True)[source]¶
Create all indexes and constraints declared on entity_class.
FULLTEXT specs sharing a label are batched into a single
create_fulltext_index(label, *props)call.When if_not_exists is
True(default), existing non-FULLTEXT specs are skipped. FULLTEXT creation is always attempted — adapters must handle idempotency.
- ensure_indexes(entity_class)[source]¶
Create missing indexes for entity_class; skip those that already exist.
- Parameters:
entity_class (type)
- Return type:
None
- class runic.migrate.schema.ValidationResult(is_valid, missing_indexes=<factory>, extra_indexes=<factory>, errors=<factory>)[source]¶
Bases:
objectResult of a
SchemaManager.validate_schema()call.- Parameters:
- missing_indexes¶
Declared but not yet created in the live graph.
- extra_indexes¶
Present in the graph but not declared on any entity.
- class runic.migrate.schema.SchemaManager(adapter_or_graph)[source]¶
Bases:
objectValidates and synchronizes graph indexes against entity Field declarations.
Accepts any object satisfying the
IndexAdapterprotocol (a migrate adapter or a raw FalkorDB graph handle for backward compat).Example:
from runic.migrate import SchemaManager, create_adapter adapter = create_adapter( "neo4j", host="localhost", database="neo4j", password="secret" ) schema = SchemaManager(adapter) result = schema.validate_schema([Person, KnowsEdge]) schema.sync_schema([Person, KnowsEdge])
- Parameters:
adapter_or_graph (Any)
- ensure_entity_types(entity_classes)[source]¶
Create vertex/edge types for entity_classes on adapters that require them.
No-op for schemaless backends. Issues
CREATE VERTEX TYPE/CREATE EDGE TYPEDDL for ArcadeDB.
- validate_schema(entity_classes)[source]¶
Compare declared indexes against the live graph state.
Returns a
ValidationResultdescribing missing and extra indexes.is_validisTrueonly when both sets are empty and no errors occurred.- Parameters:
- Return type:
- sync_schema(entity_classes, *, drop_extra=False)[source]¶
Create entity types and missing indexes; drop extras when drop_extra is
True.Calls
ensure_entity_typesfirst (required for ArcadeDB empty collections), then delegates toIndexManager.create_indexes()per class.
runic.orm.exceptions¶
- exception runic.orm.exceptions.OrmError[source]¶
Bases:
ExceptionBase exception for runic ORM errors.
- exception runic.orm.exceptions.EntityNotFoundError[source]¶
Bases:
OrmErrorRaised when an entity cannot be found by its primary key.
- exception runic.orm.exceptions.DetachedEntityError[source]¶
Bases:
OrmErrorRaised when an operation is attempted on a detached entity.
- exception runic.orm.exceptions.LazyLoadError[source]¶
Bases:
OrmErrorRaised when lazy relationship loading cannot be performed.
Occurs when accessing a lazy field on an entity in an async session (where
__get__cannot await) or when the session is unavailable. Usefetch=[field_name]onsession.get()for eager loading instead.
- exception runic.orm.exceptions.FieldValidationError[source]¶
Bases:
OrmErrorRaised when a field value fails validation.
- exception runic.orm.exceptions.MetadataError[source]¶
Bases:
OrmErrorRaised for metadata registry errors (e.g. duplicate labels).
See also
Migration API Reference — Migration API reference (runic.migrate)
runic.orm.query¶
- runic.orm.query.select(cls)[source]¶
Create a session-independent query statement for cls.
Mirrors the SQLAlchemy 2.0
select()pattern — compose the statement freely (including conditional filters), then execute via the session:from runic.orm import select stmt = select(User).where(User.active == True) if min_age > 0: stmt = stmt.where(User.age >= min_age) users: list[User] = session.scalars(stmt) user: User | None = session.scalar(stmt) n: int = session.count(stmt)
The returned
QueryBuilderis unbound — calling terminal methods like.all()directly will raiseRuntimeError. Use the session execution methods instead.- Parameters:
- Return type:
QueryBuilder[_T]
- class runic.orm.query.builder.QueryBuilder(session, root_cls)[source]¶
Bases:
Generic[T]Fluent Cypher query builder for a single root Node class.
Construct via
Session.query():q = session.query(User)
All non-terminal methods return
selfso calls can be chained:users = session.query(User).where(User.active == True).limit(10).all()
- Parameters:
session (Any | None) – The
Session(orAsyncSession) this builder is bound to.root_cls (type[T]) – The root Node subclass to query.
- alias(name)[source]¶
Set the Cypher variable for the root (most recent) node.
Call immediately after
Session.query()to name the root variable, or afterTraversalStep.alias()has already been called to rename the last registered target.Example:
session.query(User).alias("u").where(User.active == True, on="u")
- Parameters:
name (str)
- Return type:
QueryBuilder[T]
- where(expr, *, on=None)[source]¶
Add a WHERE predicate.
- Parameters:
expr (Expr) –
A
FilterExpr,CompoundExpr, orNegatedExpr. Created via field descriptor operators:User.name == "Alice" (User.age > 18) & (User.active == True)
on (str | None) –
Override the Cypher variable for this predicate. Useful when the same Node class appears under multiple aliases, or when filtering on edge properties:
.where(Rated.score > 4.0, on="r")
- Return type:
QueryBuilder[T]
Notes
Multiple
.where()calls are combined withAND. To expressOR, use the|operator on the expressions before passing:.where((User.role == "admin") | (User.role == "mod"))
- traverse(relation_field, *, edge_alias=None, optional=True)[source]¶
Traverse a declared
Relation()field.Returns a
TraversalStep; call.alias("f")on it to name the target node and return to the builder.- Parameters:
relation_field (FieldDescriptor) –
The
Relation-backed field descriptor accessed at class level:User.friends # list[User] = Relation(...) User.rated # list[Movie] = Relation(edge_model=Rated)
edge_alias (str | None) –
When given, a named relationship variable is emitted in the pattern:
(u)-[r:RATED]->(m)
This enables filtering on edge properties via
.where(Rated.score > 4, on="r")and retrieving edge instances via.all_with_edges().optional (bool) –
True(default) →OPTIONAL MATCH(left-join; keeps source nodes that have no such relationship).False→MATCH(inner join; drops source nodes without a matching relationship).
- Returns:
Call
.alias("name")on the return value to complete the step.- Return type:
Examples
# Basic traversal q = session.query(User).alias("u") q = q.traverse(User.friends).alias("f") # Traversal with edge properties q = session.query(User).alias("u") q = q.traverse(User.rated, edge_alias="r").alias("m") q = q.where(Rated.score >= 4.0, on="r")
- repeat(relation_field, *, min_hops=1, max_hops=None, optional=False)[source]¶
Traverse a relation with variable-length path quantifier
*min..max.Generates a Cypher pattern like:
(p)-[:PARENT*1..5]->(ancestor:Person)
- Parameters:
relation_field (FieldDescriptor) – The
Relationfield to traverse repeatedly.min_hops (int) – Minimum number of hops (default
1).max_hops (int | None) – Maximum number of hops.
Nonemeans unbounded (*min..).optional (bool) –
False(default for repeat) — required traversal.True→OPTIONAL MATCH.
- Returns:
Call
.alias("name")to complete the step.- Return type:
Examples
# All ancestors up to depth 5 ancestors = ( session.query(Person) .alias("p") .where(Person.id == start_id) .repeat(Person.parent, min_hops=1, max_hops=5) .alias("anc") .all() ) # All reachable nodes (unbounded) reachable = ( session.query(Node) .alias("s") .repeat(Node.connected_to) .alias("t") .all() )
- with_(*aliases)[source]¶
Insert a
WITHclause to pipeline results between query stages.Use when you want to filter/aggregate in one stage before continuing a traversal in the next:
( session.query(User) .alias("u") .where(User.active == True) .with_("u") # WITH u .traverse(User.posts) .alias("p") .return_target("p") .all() )
- Parameters:
*aliases (str) – Cypher variable names to carry forward (e.g.
"u","f").- Return type:
QueryBuilder[T]
- order_by(field, *, desc=False)[source]¶
Add an
ORDER BYterm.- Parameters:
field (FieldDescriptor | str) – A field descriptor (
User.name) or a raw Cypher expression string ("n.created_at DESC").desc (bool) –
Truefor descending order (defaultFalse).
- Return type:
QueryBuilder[T]
Examples
q.order_by(User.age) # ORDER BY n.age ASC q.order_by(User.created_at, desc=True) # ORDER BY n.created_at DESC q.order_by("score ASC") # raw string
- limit(n)[source]¶
Set
LIMIT non the query.- Parameters:
n (int)
- Return type:
QueryBuilder[T]
- skip(n)[source]¶
Set
SKIP n(offset) on the query.- Parameters:
n (int)
- Return type:
QueryBuilder[T]
- distinct()[source]¶
Add
DISTINCTto theRETURNclause.- Return type:
QueryBuilder[T]
- return_target(alias)[source]¶
Set the single alias to return decoded Node instances from.
When a traversal is involved, this selects which alias’s nodes constitute the result of
.all():q.return_target("f") # returns f-nodes as list[FriendType]
- Parameters:
alias (str)
- Return type:
QueryBuilder[T]
- return_nodes(*aliases)[source]¶
Declare multiple node aliases to include in the
RETURNclause.Used with
return_edge()andall_with_edges()to return structured tuples:q.return_nodes("u", "m").return_edge("r").all_with_edges()
- Parameters:
aliases (str)
- Return type:
QueryBuilder[T]
- return_edge(alias)[source]¶
Declare an edge alias to include in the
RETURNclause.Requires that the traversal was created with
edge_alias=alias. The edge is decoded viadecode_edge()and included as the middle element of tuples returned byall_with_edges().- Parameters:
alias (str)
- Return type:
QueryBuilder[T]
- project(*fields)[source]¶
Return only specific property values (scalar projection).
Terminal method
.scalars()returns the projected values as a flat list;.all_rows()returns a list of dicts:# Scalar list names = session.query(User).project(User.name).scalars() # Dict list rows = session.query(User).project(User.name, User.age).all_rows()
- Parameters:
fields (FieldDescriptor | str)
- Return type:
QueryBuilder[T]
- aggregate(*agg_exprs, group_by=None)[source]¶
Add aggregation expressions to the
RETURNclause.- Parameters:
*agg_exprs (AggExpr) – One or more
AggExprinstances created by the helper functionscount(),avg(), etc.group_by (str | None) –
Alias to keep in the
RETURNclause alongside the aggregations (Cypher grouping is implicit — any non-aggregated return term acts as a GROUP BY key):.aggregate(count("*").as_("friend_count"), group_by="u") # RETURN u, count(*) AS friend_count
- Return type:
QueryBuilder[T]
Examples
from runic.orm.query import count, avg result = ( session.query(User) .alias("u") .traverse(User.friends) .aggregate(count("*").as_("friend_count"), group_by="u") .all_rows() # list[dict] with {"u": ..., "friend_count": int} ) avg_age = ( session.query(User).aggregate(avg(User.age).as_("average_age")).scalar() )
- build()[source]¶
Compile the accumulated builder state to a
(cypher, params)pair.This is the core compilation step; all terminal methods call it internally. You can also call it directly for debugging or to integrate with custom execution logic:
cypher, params = session.query(User).where(User.active == True).build() print(cypher) # MATCH (n:User) # WHERE n.active = $p0 # RETURN n
- all()[source]¶
Execute and return all matching Node instances.
The return type is the root class (or the alias set by
return_target()). Results are decoded and registered in the session identity map.- Returns:
Decoded Node instances of the root type (or target type when
return_target()was called).- Return type:
list[T]
- one()[source]¶
Execute and return the first matching Node instance, or
None.Internally calls
.limit(1).all()and returns the first element.- Return type:
T | None
- all_with_edges()[source]¶
Execute and return tuples of
(NodeA, EdgeModel, NodeB).Requires
return_nodes()to specify node aliases andreturn_edge()to specify the edge alias. The edge is decoded viadecode_edge().- Returns:
Each element is a tuple whose order matches the aliases given to
return_nodes()with the edge inserted at its position inreturn_edge().- Return type:
Example
rows = ( session.query(User) .alias("u") .traverse(User.rated, edge_alias="r") .alias("m") .return_nodes("u", "m") .return_edge("r") .all_with_edges() ) for user, rated_edge, movie in rows: print(f"{user.name} rated {movie.title} with {rated_edge.score}")
- all_rows()[source]¶
Execute and return raw column-keyed dicts.
Useful for multi-alias returns, aggregations, or scalar projections where mixed types are in the result set:
rows = q.aggregate(count("*").as_("n"), group_by="u").all_rows() # [{"u": <User>, "n": 5}, ...]
- count()[source]¶
Execute a
count(*)variant and return the integer count.Overrides any existing RETURN spec to emit
RETURN count(*). Ignoreslimit()andskip().- Return type:
- register_traversal(fd, source_alias, target_alias, *, optional, edge_alias, min_hops, max_hops)[source]¶
Append a MATCH clause for one traversal step and register aliases.
Called by
TraversalStep.alias()to complete a traversal step.- Parameters:
- Return type:
QueryBuilder[T]
- class runic.orm.query.specialised.AsyncQueryBuilder(session, root_cls)[source]¶
Bases:
QueryBuilder[T]Async variant of
QueryBuilderfor use withAsyncSession.All intermediate (chainable) methods are identical to the sync version. Only the terminal methods are replaced with
async defequivalents.Example
async with AsyncSession(graph) as session: users = await ( session.query(User) .where(User.active == True) .order_by(User.name) .limit(50) .all() )
- Parameters:
session (Any | None)
root_cls (type[T])
- class runic.orm.query.specialised.FulltextQueryBuilder(session, root_cls, query, fields=None)[source]¶
Bases:
QueryBuilder[T]QueryBuilder variant for FalkorDB fulltext search queries.
Constructed via
fulltext_search(). The root MATCH is replaced with aCALL db.idx.fulltext.queryNodes(...)invocation that uses the declared fulltext index.The fulltext index must have been created for the node’s label, e.g.:
class Post(Node, labels=["Post"]): title: str = Field(index_type="FULLTEXT")
Example
posts = ( session.fulltext_search(Post, query="graph databases", fields=["title"]) .where(Post.published == True) .order_by(Post.created_at, desc=True) .limit(20) .all() )
Cypher emitted:
CALL db.idx.fulltext.queryNodes('Post', $__fts_query) YIELD node AS n WHERE n.published = $p0 RETURN n ORDER BY n.created_at DESC LIMIT 20
- class runic.orm.query.specialised.VectorQueryBuilder(session, root_cls, field, vector, k)[source]¶
Bases:
QueryBuilder[T]QueryBuilder variant for vector KNN search.
Constructed via
vector_search(). Appends a KNN distance expression to the ORDER BY and RETURN clauses.The field must have
index_type="VECTOR"and an HNSW vector index must be created viaSchemaManager():class Document(Node, labels=["Document"]): embedding: Vector = Field(index_type="VECTOR")
Example
similar = ( session.vector_search( Document, field=Document.embedding, vector=[0.1, 0.2, 0.3], k=10, ) .where(Document.active == True) .all() )
Cypher emitted (FalkorDB KNN syntax):
MATCH (n:Document) WHERE n.active = $p0 RETURN n, vecf32(n.embedding) <-> vecf32($__knn_vec) AS __score ORDER BY __score ASC LIMIT 10
- Parameters:
session (Any)
root_cls (type[T])
field (FieldDescriptor)
k (int)
- class runic.orm.query.traversal.TraversalStep(builder, field_descriptor, source_alias, *, optional=True, edge_alias=None, min_hops=1, max_hops=1)[source]¶
Bases:
objectPending traversal hop; returned by
QueryBuilder.traverse()andQueryBuilder.repeat().Call
alias()to complete the step and resume the builder chain.- Parameters:
builder (QueryBuilder[Any]) – The owning
QueryBuilderinstance.field_descriptor (Any) – The
FieldDescriptorfor theRelationfield being traversed.source_alias (str) – The Cypher variable name of the source node.
optional (bool) – When
True(the default), the traversal emitsOPTIONAL MATCH(left-join: source nodes without the relationship are still returned). WhenFalse, emitsMATCH(inner-join: drops source nodes that have no such relationship).edge_alias (str | None) – Optional Cypher variable name for the relationship itself. When set, the generated pattern is
(src)-[edge_alias:TYPE]->(tgt)instead of the anonymous(src)-[:TYPE]->(tgt), enabling edge property filtering and retrieval.min_hops (int) – Minimum number of hops for variable-length paths (default
1). Values > 1 only take effect when combined with max_hops to produce a*min..maxquantifier.max_hops (int | None) – Maximum number of hops.
Nonemeans unbounded (*min..). A value of1withmin_hops=1produces a fixed single-hop pattern.
- alias(name)[source]¶
Register the target node alias and append the traversal to the builder.
Calling this method:
Resolves the target Node class from the Relation field’s
target.Appends the appropriate
(OPTIONAL) MATCHclause to the builder.Registers
name → target_clsin the builder’s alias map.Registers
edge_alias → Edge classif an edge alias was given.Sets the builder’s last alias (used as the default
RETURNtarget when no explicitreturn_target()is called).
- Parameters:
name (str) – Cypher variable name for the target node (e.g.
"f","m").- Returns:
The owning builder, ready for continued chaining.
- Return type:
- class runic.orm.query.expressions.Expr[source]¶
Bases:
objectAbstract base for all filter/compound/negated expressions.
Subclass instances are passed to
QueryBuilder.where(). They support&(AND),|(OR), and~(NOT) to build composite predicates:q.where((User.age > 18) & (User.active == True))
- class runic.orm.query.expressions.FilterExpr(cls, prop, op, value=None, alias=None, negate=False)[source]¶
Bases:
ExprA single WHERE predicate:
alias.prop OP $pN.Created automatically by the FieldDescriptor operator overloads; you do not normally instantiate this class directly.
- cls¶
The ORM Node or Edge subclass that owns the field. Used by the query builder to look up the Cypher variable (alias) for the class in the current query context.
- Type:
- op¶
A Cypher operator string:
"=","<>",">",">=","<","<=","CONTAINS","STARTS WITH","=~"(regex),"IN","IS NULL","IS NOT NULL".- Type:
- value¶
The Python value to compare against.
Nonefor null-check ops. TypeConverter.to_graph() is applied during Cypher compilation if the field has a converter;cypher_fnwraps the param reference.- Type:
Any
- alias¶
Optional explicit Cypher variable override (set via
on=inQueryBuilder.where()). WhenNone, the builder derives the alias from cls.- Type:
str | None
- negate¶
When
True, wraps the predicate inNOT (...). Used fornot_in_(); prefer the~operator for general negation.- Type:
- class runic.orm.query.expressions.CompoundExpr(op, operands=<factory>)[source]¶
Bases:
ExprAND / OR combination of multiple sub-expressions.
Example:
(User.age > 18) & (User.active == True) # → CompoundExpr(op="AND", operands=[FilterExpr(...), FilterExpr(...)])
- class runic.orm.query.expressions.NegatedExpr(operand)[source]¶
Bases:
ExprNOT wrapper around another expression.
Example:
~(User.banned == True) # → NegatedExpr(operand=FilterExpr(...))
- Parameters:
operand (Expr)
- class runic.orm.query.expressions.OrderExpr(alias, prop, raw=None, desc=False)[source]¶
Bases:
objectRepresents a single ORDER BY term.
Created by
QueryBuilder.order_by(); not usually instantiated directly.
- class runic.orm.query.expressions.AggExpr(func, field='*', result_alias=None, distinct=False)[source]¶
Bases:
objectAn aggregation function expression for use in RETURN clauses.
Created via the helper functions
count(),avg(),sum_(),min_(),max_(),collect().- field¶
A
FieldDescriptoror raw string ("*"forcount(*)).- Type:
Any
- runic.orm.query.expressions.count(field='*', *, distinct=False)[source]¶
Create a
count(...)aggregation expression.- Parameters:
- Return type:
Examples
q.aggregate(count()) # count(*) q.aggregate(count(User.name, distinct=True)) # count(DISTINCT n.name)
- runic.orm.query.expressions.avg(field)[source]¶
Create an
avg(...)aggregation expression.Example:
q.aggregate(avg(User.age).as_("average_age"))
- runic.orm.query.expressions.sum_(field)[source]¶
Create a
sum(...)aggregation expression.Example:
q.aggregate(sum_(Order.amount).as_("total"))