Session & Unit of Work¶
The Session (and its async twin
AsyncSession) is the unit-of-work
manager for Cypher-based graph databases. It owns all mutations, manages
the identity map, and controls the flush/commit lifecycle.
See also
- examples/orm/01_simple_crud.py
Session lifecycle, mutations, flush, commit, and rollback in a single runnable file.
- examples/orm/04_pagination_and_custom_queries.py
session.execute()for raw write queries; custom repository methods; offset pagination.
Opening a session¶
Session accepts a GraphDriver (or
AsyncGraphDriver for the async variant).
Use the helpers in runic.ogm.driver to build one:
from runic.ogm import Session, create_driver
# FalkorDB
driver = create_driver("falkordb", host="localhost", port=6379, graph="myapp")
with Session(driver) as session:
... # commit on success, rollback on exception
# ArcadeDB (via Bolt)
driver = create_driver(
"arcadedb",
host="localhost", port=7687, database="mydb",
username="root", password="playwithdata",
)
with Session(driver) as session:
...
Mutations¶
All writes go through the Session, never the Repository.
from runic.ogm import Session
with Session(driver) as session:
# add: transient → pending; CREATE on flush
session.add(entity)
session.add_all([e1, e2])
# update: set any field → _dirty = True; MERGE SET on flush
entity.name = "New Name"
# delete: persistent → deleted; DETACH DELETE on flush
session.delete(entity)
session.commit() # flush + clear pending/deleted sets
Single-entity lookup¶
session.get() checks the identity map first, then queries the graph.
Returns None if not found.
person = session.get(Person, "alice")
person_with_rels = session.get(Person, "alice", fetch=["company"])
Flush and commit¶
session.flush() # execute writes; does not clear identity map
session.commit() # flush + clear pending/deleted sets
Transaction model¶
Each flush() sends each pending entity as its own query. Entities
with generated=True IDs must be flushed individually so the returned
ID can be assigned before the next write.
rollback() discards the un-flushed pending/deleted sets only. Once
flush() has executed queries, those writes are permanent.
Rollback¶
session = Session(driver)
try:
session.add(Person(id="bob", name="Bob", email="bob@example.com"))
session.rollback() # discard pending; nothing written to graph
finally:
session.close()
The context manager calls rollback() automatically on exception.
Expire and refresh¶
session.expire(entity) # clear cached attrs; reloaded on next access
session.refresh(entity) # immediate re-query from graph
Expunge¶
session.expunge(entity) # remove from session → detached; no DB action
session.expunge_all()
Composable statement execution¶
select() creates a
QueryBuilder that is not bound to a
session. Build the statement freely — including conditional filters — then
pass it to one of the session execution methods:
from runic.ogm import select
stmt = select(Person).where(Person.active == True)
if min_age > 0:
stmt = stmt.where(Person.age >= min_age)
# All five execution methods accept a QueryBuilder
people: list[Person] = session.scalars(stmt)
person: Person | None = session.scalar(stmt)
n: int = session.count(stmt)
rows: list[dict] = session.all_rows(stmt)
# Async sessions accept the same stmt
people = await async_session.scalars(stmt)
The same stmt object is reusable — execute it multiple times, against
different sessions if needed. Each execution restores the session binding to
None afterwards.
Method |
Returns |
|---|---|
|
|
|
|
|
|
|
|
|
|
Tip
session.query(Person).where(...).all() is still fully supported.
Prefer select() when you need to compose the query across multiple
code paths before executing.
Raw Cypher¶
For the common cases prefer the query builder.
session.execute() is the escape hatch for write mutations and Cypher
features not covered by the builder.
from runic.ogm import select
# Prefer select() + session.scalars() for reads
stmt = (
select(Person)
.where(Person.id == "alice")
.alias("p")
.traverse(Person.knows).alias("f")
)
friends: list[Person] = session.scalars(stmt)
# Write mutations (SET, REMOVE, …) require session.execute(write=True)
session.execute(
"MATCH (t:Trip {status: $old}) SET t.status = $new",
{"old": "draft", "new": "archived"},
write=True,
)
Session API summary¶
Method |
Description |
|---|---|
|
Transient/detached → pending |
|
Batch |
|
Persistent → deleted; |
|
Identity map check → graph query; |
|
Execute pending/dirty/deleted sets; clear |
|
|
|
Discard un-flushed pending/deleted sets; expire persistent entities |
|
Invalidate attribute cache; reloaded on next access |
|
Immediate re-query from graph |
|
Remove from session (→ detached); no graph action |
|
Expunge all tracked entities |
|
Execute a |
|
Execute a statement; return first |
|
Execute a statement; return row count as |
|
Execute a statement; return |
|
Execute a statement; return |
|
Raw Cypher; returns |
|
|
Async parity¶
AsyncSession mirrors all of the
above with async/await:
async with AsyncSession(AsyncFalkorDBDriver(graph)) as session:
repo = AsyncRepository(session, Trip)
trips = await repo.find_all()
for trip in trips:
trip.status = "archived"
await session.commit()
Note
Lazy loading is not available in AsyncSession — __get__
cannot await. Use fetch=[...] on every read.
Connection management¶
ConnectionManager and
AsyncConnectionManager wrap a
FalkorDB graph handle for reuse across sessions:
from runic.ogm import ConnectionManager
manager = ConnectionManager(graph)
with manager.session() as session:
...