"""BoltDriver for neo4j-compatible databases (ArcadeDB, Neo4j) via neo4j Python driver."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from runic.orm.driver import GraphDialect
log = logging.getLogger(__name__)
class BoltNode:
"""Wraps a ``neo4j.Node`` to conform to ``GraphNode``."""
__slots__ = ("_raw",)
def __init__(self, raw: Any) -> None:
self._raw = raw
@property
def element_id(self) -> Any:
# Use deprecated .id (int) for compatibility with ArcadeDB Bolt and neo4j <5
return self._raw.id
@property
def labels(self) -> list[str]:
return list(self._raw.labels)
@property
def properties(self) -> dict[str, Any]:
return dict(self._raw)
class BoltEdge:
"""Wraps a ``neo4j.Relationship`` to conform to ``GraphEdge``."""
__slots__ = ("_raw",)
def __init__(self, raw: Any) -> None:
self._raw = raw
@property
def type(self) -> str:
return self._raw.type
@property
def properties(self) -> dict[str, Any]:
return dict(self._raw)
class BoltResult:
"""Eagerly-collected Bolt query result conforming to ``GraphResult``."""
__slots__ = ("_columns", "_rows")
def __init__(self, rows: list[list[Any]], columns: list[str]) -> None:
self._rows = rows
self._columns = columns
@property
def rows(self) -> list[list[Any]]:
return self._rows
@property
def columns(self) -> list[str]:
return self._columns
[docs]
class BoltDriver:
"""Sync Bolt driver for ArcadeDB, Neo4j, or any Bolt-compatible graph DB.
Supports explicit ACID transactions via :class:`~runic.orm.driver.TransactionalGraphDriver`.
When no transaction is active, each ``execute()`` call opens its own Bolt
session (auto-commit semantics). Call ``begin()`` to start a transaction
that spans multiple ``execute()`` calls, then ``commit()`` or ``rollback()``
to end it.
The ORM :class:`~runic.orm.session.session.Session` drives this lifecycle
automatically via lazy-begin: the first query inside a Session opens a
transaction; ``Session.commit()`` / ``Session.rollback()`` close it.
"""
def __init__(
self,
uri: str,
auth: tuple[str, str],
database: str,
dialect: GraphDialect,
*,
encrypted: bool = True,
) -> None:
import neo4j
# NOTE: neo4j Python driver >= 5 removed the `encrypted=` kwarg; TLS is controlled by the URI scheme.
if encrypted and uri.startswith("bolt://"):
uri = uri.replace("bolt://", "bolt+s://", 1)
elif not encrypted and uri.startswith(("bolt+s://", "bolt+ssc://")):
uri = uri.replace("bolt+s://", "bolt://", 1).replace(
"bolt+ssc://", "bolt://", 1
)
self._uri = uri
self._auth = auth
self._neo4j_driver = neo4j.GraphDatabase.driver(uri, auth=auth)
self._database = database
self._dialect = dialect
# Active transaction state (None when outside a transaction)
self._bolt_session: Any = None
self._tx: Any = None
@property
def uri(self) -> str:
return self._uri
@property
def auth(self) -> tuple[str, str]:
return self._auth
@property
def dialect(self) -> GraphDialect:
return self._dialect
# ------------------------------------------------------------------
# Transaction support (TransactionalGraphDriver)
# ------------------------------------------------------------------
[docs]
def begin(self) -> None:
"""Open a Bolt session and begin an explicit transaction.
Raises ``RuntimeError`` if a transaction is already active.
"""
if self._tx is not None:
raise RuntimeError(
"BoltDriver: transaction already active; "
"call commit() or rollback() before beginning a new one."
)
self._bolt_session = self._neo4j_driver.session(database=self._database)
self._tx = self._bolt_session.begin_transaction()
log.debug("BoltDriver: begun transaction")
[docs]
def commit(self) -> None:
"""Commit the active transaction and release the Bolt session.
No-op when no transaction is active.
"""
if self._tx is None:
return
try:
self._tx.commit()
log.debug("BoltDriver: transaction committed")
finally:
self._tx.close()
self._tx = None
if self._bolt_session is not None:
self._bolt_session.close()
self._bolt_session = None
[docs]
def rollback(self) -> None:
"""Roll back the active transaction and release the Bolt session.
No-op when no transaction is active.
"""
if self._tx is None:
return
try:
self._tx.rollback()
log.debug("BoltDriver: transaction rolled back")
finally:
self._tx.close()
self._tx = None
if self._bolt_session is not None:
self._bolt_session.close()
self._bolt_session = None
# ------------------------------------------------------------------
# Execute
# ------------------------------------------------------------------
def execute(self, cypher: str, params: dict[str, Any]) -> BoltResult:
if self._tx is not None:
# Within an explicit transaction — use the active tx.
result = self._tx.run(cypher, params)
columns = list(result.keys())
rows = [list(record.values()) for record in result]
log.debug("BoltDriver executed (tx); %d row(s)", len(rows))
return BoltResult(rows, columns)
# No active transaction — open a per-query auto-commit session.
with self._neo4j_driver.session(database=self._database) as session:
result = session.run(cypher, params) # ty: ignore[invalid-argument-type]
columns = list(result.keys())
rows = [list(record.values()) for record in result]
log.debug("BoltDriver executed (auto); %d row(s)", len(rows))
return BoltResult(rows, columns)
def close(self) -> None:
self._neo4j_driver.close()
@classmethod
def from_params(
cls,
host: str,
port: int,
database: str,
username: str,
password: str,
dialect: GraphDialect,
*,
encrypted: bool = True,
) -> BoltDriver:
uri = f"bolt://{host}:{port}"
return cls(uri, (username, password), database, dialect, encrypted=encrypted)