Source code for runic.ogm.driver.falkordb

"""FalkorDB driver, dialect, and result wrappers."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from runic.ogm.core.descriptors import FieldInfo


class FalkorDBNode:
    """Wraps a ``falkordb.Node`` to conform to ``GraphNode``."""

    __slots__ = ("_raw",)

    def __init__(self, raw: Any) -> None:
        self._raw = raw

    @property
    def element_id(self) -> Any:
        return self._raw.id

    @property
    def labels(self) -> list[str]:
        return list(self._raw.labels)

    @property
    def properties(self) -> dict[str, Any]:
        return self._raw.properties


class FalkorDBEdge:
    """Wraps a ``falkordb.Edge`` to conform to ``GraphEdge``."""

    __slots__ = ("_raw",)

    def __init__(self, raw: Any) -> None:
        self._raw = raw

    @property
    def type(self) -> str:
        return self._raw.type

    @property
    def properties(self) -> dict[str, Any]:
        return self._raw.properties


class FalkorDBResult:
    """Wraps a ``falkordb.QueryResult`` to conform to ``GraphResult``."""

    __slots__ = ("_raw",)

    def __init__(self, raw: Any) -> None:
        self._raw = raw

    @property
    def rows(self) -> list[list[Any]]:
        return self._raw.result_set

    @property
    def columns(self) -> list[str]:
        header = getattr(self._raw, "header", None) or []
        cols: list[str] = []
        for col in header:
            if isinstance(col, (list, tuple)) and len(col) >= 2:
                cols.append(str(col[1]))
            else:
                cols.append(str(col))
        return cols


[docs] class FalkorDBDialect: """Strategy implementation for FalkorDB-specific Cypher generation.""" # FalkorDB only supports directed edges; undirected MERGE/CREATE is rejected. supports_undirected_merge: bool = False def generated_id_where(self, alias: str, param: str) -> str: return f"WHERE id({alias}) = toInteger(${param})" def cypher_fn_for_field(self, fi: FieldInfo) -> str | None: if fi.field.converter is not None: fn = getattr(fi.field.converter, "cypher_fn", None) if fn: return fn if fi.field.interned: return "intern" return None def fulltext_call(self, label: str, alias: str, query_param: str) -> str: return ( f"CALL db.idx.fulltext.queryNodes('{label}', ${query_param}) " f"YIELD node AS {alias}" ) def vector_knn_start( self, alias: str, labels_str: str, type_name: str, # noqa: ARG002 field_name: str, # noqa: ARG002 ) -> str: return f"MATCH ({alias}:{labels_str})" def vector_knn_score_expr(self, alias: str, field_name: str) -> str: return f"vecf32({alias}.{field_name}) <-> vecf32($__knn_vec) AS __score" def wrap_node(self, raw: Any) -> FalkorDBNode: return FalkorDBNode(raw) def wrap_edge(self, raw: Any) -> FalkorDBEdge: return FalkorDBEdge(raw)
_DIALECT = FalkorDBDialect()
[docs] class FalkorDBDriver: """Sync driver wrapping a FalkorDB graph handle.""" def __init__(self, graph: Any, db: Any = None) -> None: self._graph = graph self._db = db
[docs] def falkordb_connection(self) -> tuple[Any, Any]: """Return (db, graph) for use by the FalkorDB migration adapter.""" db = self._db if self._db is not None else self._graph.connection return db, self._graph
@property def dialect(self) -> FalkorDBDialect: return _DIALECT def execute(self, cypher: str, params: dict[str, Any]) -> FalkorDBResult: return FalkorDBResult(self._graph.query(cypher, params)) def close(self) -> None: pass
[docs] class AsyncFalkorDBDriver: """Async driver wrapping an async FalkorDB graph handle.""" def __init__(self, graph: Any) -> None: self._graph = graph @property def dialect(self) -> FalkorDBDialect: return _DIALECT async def execute(self, cypher: str, params: dict[str, Any]) -> FalkorDBResult: return FalkorDBResult(await self._graph.query(cypher, params)) async def close(self) -> None: pass
[docs] def create_falkordb_driver(host: str, port: int, graph: str) -> FalkorDBDriver: """Create a :class:`FalkorDBDriver` from connection parameters. Parameters ---------- host: FalkorDB host name or IP address. port: FalkorDB port (default Redis port is 6379). graph: Name of the graph to select. """ from falkordb import FalkorDB db = FalkorDB(host=host, port=port) return FalkorDBDriver(db.select_graph(graph), db)