Session & Unit of Work

The Session (and its async twin AsyncSession) is the unit-of-work manager for FalkorDB. It owns all mutations, manages the identity map, and controls the flush/commit lifecycle.

See also

examples/orm/01_simple_crud.py

Session lifecycle, mutations, flush, commit, and rollback in a single runnable file.

examples/orm/04_pagination_and_custom_queries.py

session.execute() for raw write queries; custom repository methods; offset pagination.

Opening a session

from falkordb import FalkorDB
from runic.orm import Session

db = FalkorDB(host="localhost", port=6379)
graph = db.select_graph("myapp")

with Session(graph) as session:
    ...   # commit on success, rollback on exception

# --- Async ---
from runic.orm import AsyncSession

async with AsyncSession(graph) as session:
    ...

Mutations

All writes go through the Session, never the Repository.

from runic.orm import Session

with Session(graph) as session:
    # add: transient → pending; CREATE on flush
    session.add(entity)
    session.add_all([e1, e2])

    # update: set any field → _dirty = True; MERGE SET on flush
    entity.name = "New Name"

    # delete: persistent → deleted; DETACH DELETE on flush
    session.delete(entity)

    session.commit()    # flush + clear pending/deleted sets

Single-entity lookup

session.get() checks the identity map first, then queries the graph. Returns None if not found.

person = session.get(Person, "alice")
person_with_rels = session.get(Person, "alice", fetch=["company"])

Flush and commit

session.flush()     # execute writes; does not clear identity map
session.commit()    # flush + clear pending/deleted sets

FalkorDB transaction model

A single GRAPH.QUERY is fully atomic. flush() sends each pending entity as its own query. Entities with generated=True IDs must be flushed individually so the returned ID can be assigned before the next write.

rollback() discards the un-flushed pending/deleted sets only. Once flush() has executed queries, those writes are permanent.

Rollback

session = Session(graph)
try:
    session.add(Person(id="bob", name="Bob", email="bob@example.com"))
    session.rollback()   # discard pending; nothing written to graph
finally:
    session.close()

The context manager calls rollback() automatically on exception.

Expire and refresh

session.expire(entity)   # clear cached attrs; reloaded on next access
session.refresh(entity)  # immediate re-query from graph

Expunge

session.expunge(entity)   # remove from session → detached; no DB action
session.expunge_all()

Raw Cypher

session.execute() runs a Cypher query and returns a raw QueryResult. No entity mapping is applied.

result = session.execute(
    "MATCH (p:Person)-[:KNOWS]->(f:Person) WHERE p.id = $id RETURN f.name",
    {"id": "alice"},
)
for row in result.result_set:
    print(row[0])

# Write queries require write=True
session.execute(
    "MATCH (t:Trip {status: $old}) SET t.status = $new",
    {"old": "draft", "new": "archived"},
    write=True,
)

Session API summary

Method

Description

add(entity)

Transient/detached → pending

add_all([entities])

Batch add

delete(entity)

Persistent → deleted; DETACH DELETE on flush

get(EntityClass, pk, fetch=[])

Identity map check → graph query; None if not found

flush()

Execute pending/dirty/deleted sets; clear _dirty

commit()

flush() + clear pending/deleted sets

rollback()

Discard un-flushed pending/deleted sets; expire persistent entities

expire(entity)

Invalidate attribute cache; reloaded on next access

refresh(entity)

Immediate re-query from graph

expunge(entity)

Remove from session (→ detached); no graph action

expunge_all()

Expunge all tracked entities

execute(cypher, params, write)

Raw Cypher; returns QueryResult

close()

expunge_all() + release connection

Async parity

AsyncSession mirrors all of the above with async/await:

async with AsyncSession(graph) as session:
    repo = AsyncRepository(session, Trip)
    trips = await repo.find_all()
    for trip in trips:
        trip.status = "archived"
    await session.commit()

Note

Lazy loading is not available in AsyncSession__get__ cannot await. Use fetch=[...] on every read.

Connection management

ConnectionManager and AsyncConnectionManager wrap a FalkorDB graph handle for reuse across sessions:

from runic.orm import ConnectionManager

manager = ConnectionManager(graph)
with manager.session() as session:
    ...