Async Guide¶
runic.ogm ships an async-native session —
AsyncSession — that is a direct
parallel of Session. Use it wherever you
need async/await throughout your application stack.
Important
Only FalkorDB has an async driver. All other backends (ArcadeDB, Neo4j, Memgraph, Apache AGE) are sync-only. See the Supported Drivers page for the full feature matrix.
Opening an async session¶
AsyncSession accepts an
AsyncGraphDriver. For FalkorDB, build one from an
async FalkorDB graph handle:
import asyncio
from falkordb import FalkorDB
from runic.ogm import AsyncSession
from runic.ogm.driver.falkordb import AsyncFalkorDBDriver
async def main() -> None:
db = FalkorDB(host="localhost", port=6379)
graph = db.select_graph("myapp")
driver = AsyncFalkorDBDriver(graph)
async with AsyncSession(driver) as session:
... # commit on success, rollback on exception
asyncio.run(main())
Note
There is no create_async_falkordb_driver factory. Build the handle
directly and pass it to AsyncFalkorDBDriver.
Which methods are coroutines¶
add(), add_all(), and delete() stage mutations in memory and are
synchronous. Every method that touches the database is a coroutine:
Method |
Type |
Notes |
|---|---|---|
|
sync |
Stages an insert; no I/O |
|
sync |
Stages a batch insert; no I/O |
|
sync |
Stages a deletion; no I/O |
|
coroutine |
Flush + clear tracking |
|
coroutine |
Write pending changes; keep tracking |
|
coroutine |
Discard un-flushed pending/deleted sets |
|
coroutine |
Identity-map check → graph query |
|
coroutine |
Execute statement; return |
|
coroutine |
Execute statement; return |
|
coroutine |
Execute statement; return |
|
coroutine |
Execute statement; return |
|
coroutine |
Execute statement; return |
|
coroutine |
Raw Cypher |
|
coroutine |
Re-query entity from graph |
|
coroutine |
Create an edge (MERGE semantics) |
|
coroutine |
Delete an edge |
|
coroutine |
Expunge all + release connection |
No lazy loading¶
AsyncSession cannot perform lazy
loading — the __get__ descriptor is synchronous and cannot await a
query. Accessing a relationship attribute that hasn’t been loaded raises
LazyLoadError.
Always design every read to supply the data you need up front:
async with AsyncSession(driver) as session:
# BAD — accessing .articles will raise LazyLoadError
user = await session.get(User, "alice")
print(user.articles) # LazyLoadError
# GOOD — eager-fetch with fetch=
user = await session.get(User, "alice", fetch=["articles"])
for article in user.articles: # already loaded
print(article.title)
# GOOD — traversal query for a collection
from runic.ogm import select
stmt = (
select(User).where(User.id == "alice").alias("u")
.traverse(User.articles).alias("a")
.return_target()
)
articles = await session.scalars(stmt)
Async CRUD¶
from runic.ogm import AsyncSession, select
from runic.ogm.driver.falkordb import AsyncFalkorDBDriver
async with AsyncSession(driver) as session:
# Create
session.add(User(id="alice", name="Alice", email="alice@example.com"))
await session.commit()
# Read — always use fetch= for related data
user = await session.get(User, "alice", fetch=["articles"])
# Update
user.name = "Alice Smith"
await session.commit()
# Delete
session.delete(user)
await session.commit()
Async querying¶
Composable statements work identically in async — only the execution step is awaited:
from runic.ogm import select
stmt = (
select(Article)
.where(Article.published == True)
.order_by(Article.published_at, desc=True)
.limit(20)
)
articles = await session.scalars(stmt)
total = await session.count(stmt)
For projections and aggregations, use all_rows() instead of scalars():
from runic.ogm.query import count
stmt = (
select(User)
.aggregate(count("*").as_("total"), group_by="n.city")
)
rows = await session.all_rows(stmt)
# [{"n.city": "Berlin", "total": 3}, ...]
Async repositories¶
AsyncRepository wraps an
AsyncSession:
from runic.ogm import AsyncRepository
async with AsyncSession(driver) as session:
repo = AsyncRepository(session, User)
users = await repo.find_all(skip=0, limit=20)
count = await repo.count()
exists = await repo.exists("alice")
Subclass for domain queries:
class UserRepository(AsyncRepository[User]):
async def active_in_region(self, region: str) -> list[User]:
return await (
self.query()
.where((User.active == True) & (User.region == region))
.order_by(User.name)
.all()
)
Connection management (async)¶
AsyncConnectionManager wraps a
FalkorDB graph handle for reuse across sessions:
from runic.ogm import AsyncConnectionManager
manager = AsyncConnectionManager(async_graph_handle)
async with manager.session() as session:
...
Create the manager once at application startup; share it across request handlers rather than creating a new driver per request.
Testing async code¶
Use embedded FalkorDB (redislite) with pytest-asyncio:
# conftest.py
import pytest
import pytest_asyncio
from redislite import FalkorDB
from runic.ogm.driver.falkordb import AsyncFalkorDBDriver
@pytest_asyncio.fixture
async def async_driver():
db = FalkorDB(protocol=2)
graph = db.select_graph("test_async")
driver = AsyncFalkorDBDriver(graph)
yield driver
# test_myfeature.py
import pytest
@pytest.mark.asyncio
async def test_create_user(async_driver):
from runic.ogm import AsyncSession
async with AsyncSession(async_driver) as session:
session.add(User(id="u1", name="Alice", email="a@example.com"))
await session.commit()
user = await session.get(User, "u1")
assert user is not None
See also
Test your OGM code — embedded FalkorDB setup for sync tests; gotchas around unique graph names.
Summary of async gotchas¶
Lazy loading raises
LazyLoadError— always usefetch=[]or a traversal query.Detached entity access also raises
DetachedEntityErrorafter the session closes. Load related data before closing.add()/add_all()/delete()are sync — call them withoutawait.All query builder methods called via
session.query()return async builders —await qb.all().fulltext_search()andvector_search()return async builders too.
See also
Read and write data — full sync session reference and API summary