Session & Unit of Work¶
The Session (and its async twin
AsyncSession) is the unit-of-work
manager for FalkorDB. It owns all mutations, manages the identity map, and
controls the flush/commit lifecycle.
See also
- examples/orm/01_simple_crud.py
Session lifecycle, mutations, flush, commit, and rollback in a single runnable file.
- examples/orm/04_pagination_and_custom_queries.py
session.execute()for raw write queries; custom repository methods; offset pagination.
Opening a session¶
from falkordb import FalkorDB
from runic.orm import Session
db = FalkorDB(host="localhost", port=6379)
graph = db.select_graph("myapp")
with Session(graph) as session:
... # commit on success, rollback on exception
# --- Async ---
from runic.orm import AsyncSession
async with AsyncSession(graph) as session:
...
Mutations¶
All writes go through the Session, never the Repository.
from runic.orm import Session
with Session(graph) as session:
# add: transient → pending; CREATE on flush
session.add(entity)
session.add_all([e1, e2])
# update: set any field → _dirty = True; MERGE SET on flush
entity.name = "New Name"
# delete: persistent → deleted; DETACH DELETE on flush
session.delete(entity)
session.commit() # flush + clear pending/deleted sets
Single-entity lookup¶
session.get() checks the identity map first, then queries the graph.
Returns None if not found.
person = session.get(Person, "alice")
person_with_rels = session.get(Person, "alice", fetch=["company"])
Flush and commit¶
session.flush() # execute writes; does not clear identity map
session.commit() # flush + clear pending/deleted sets
FalkorDB transaction model¶
A single GRAPH.QUERY is fully atomic. flush() sends each pending
entity as its own query. Entities with generated=True IDs must be
flushed individually so the returned ID can be assigned before the next
write.
rollback() discards the un-flushed pending/deleted sets only. Once
flush() has executed queries, those writes are permanent.
Rollback¶
session = Session(graph)
try:
session.add(Person(id="bob", name="Bob", email="bob@example.com"))
session.rollback() # discard pending; nothing written to graph
finally:
session.close()
The context manager calls rollback() automatically on exception.
Expire and refresh¶
session.expire(entity) # clear cached attrs; reloaded on next access
session.refresh(entity) # immediate re-query from graph
Expunge¶
session.expunge(entity) # remove from session → detached; no DB action
session.expunge_all()
Raw Cypher¶
session.execute() runs a Cypher query and returns a raw
QueryResult. No entity mapping is applied.
result = session.execute(
"MATCH (p:Person)-[:KNOWS]->(f:Person) WHERE p.id = $id RETURN f.name",
{"id": "alice"},
)
for row in result.result_set:
print(row[0])
# Write queries require write=True
session.execute(
"MATCH (t:Trip {status: $old}) SET t.status = $new",
{"old": "draft", "new": "archived"},
write=True,
)
Session API summary¶
Method |
Description |
|---|---|
|
Transient/detached → pending |
|
Batch |
|
Persistent → deleted; |
|
Identity map check → graph query; |
|
Execute pending/dirty/deleted sets; clear |
|
|
|
Discard un-flushed pending/deleted sets; expire persistent entities |
|
Invalidate attribute cache; reloaded on next access |
|
Immediate re-query from graph |
|
Remove from session (→ detached); no graph action |
|
Expunge all tracked entities |
|
Raw Cypher; returns |
|
|
Async parity¶
AsyncSession mirrors all of the
above with async/await:
async with AsyncSession(graph) as session:
repo = AsyncRepository(session, Trip)
trips = await repo.find_all()
for trip in trips:
trip.status = "archived"
await session.commit()
Note
Lazy loading is not available in AsyncSession — __get__
cannot await. Use fetch=[...] on every read.
Connection management¶
ConnectionManager and
AsyncConnectionManager wrap a
FalkorDB graph handle for reuse across sessions:
from runic.orm import ConnectionManager
manager = ConnectionManager(graph)
with manager.session() as session:
...