Skip to content

Interfaces (ABCs)

Medha is built around three abstract base classes. Implement these to add new backends, embedders, or L1 cache implementations.


BaseEmbedder

BaseEmbedder defines the contract for any embedding provider. Implement the abstract aembed() and aembed_batch() methods plus the dimension and model_name properties; the synchronous embed() and embed_batch() wrappers are provided by the base class.

from medha.interfaces.embedder import BaseEmbedder

class MyEmbedder(BaseEmbedder):
    """Example embedder: implement the abstract members, not the sync wrappers."""

    @property
    def dimension(self) -> int:
        return 384

    @property
    def model_name(self) -> str:
        return "my-embedder"

    async def aembed(self, text: str) -> list[float]:
        ...

    async def aembed_batch(self, texts: list[str], **kwargs) -> list[list[float]]:
        # Naive default: one call per text. Override with a true batch call
        # to the provider for efficiency.
        return [await self.aembed(t) for t in texts]

Bases: ABC

Abstract base class for all embedding providers.

Source code in src/medha/interfaces/embedder.py
class BaseEmbedder(ABC):
    """Abstract base class for all embedding providers."""

    @property
    @abstractmethod
    def dimension(self) -> int:
        """Return the vector dimension (e.g. 384, 768, 1536)."""
        ...

    @property
    @abstractmethod
    def model_name(self) -> str:
        """Return the model identifier for logging/debugging."""
        ...

    @abstractmethod
    async def aembed(self, text: str) -> list[float]:
        """Generate an embedding for a single text string.

        Args:
            text: Input text to embed. Must be non-empty.

        Returns:
            A list of floats with length == self.dimension.

        Raises:
            EmbeddingError: If the embedding generation fails.
        """
        ...

    @abstractmethod
    async def aembed_batch(self, texts: list[str], **kwargs: Any) -> list[list[float]]:
        """Generate embeddings for multiple texts.

        Args:
            texts: List of non-empty strings.

        Returns:
            A list of embeddings, one per input text, each with length == self.dimension.
            Order is preserved: result[i] corresponds to texts[i].

        Raises:
            EmbeddingError: If any embedding generation fails.
        """
        ...

    # --- Sync convenience wrappers ---
    # Subclasses implement only the async methods above; these wrappers give
    # callers a blocking API for free.

    def embed(self, text: str) -> list[float]:
        """Synchronous wrapper for aembed."""
        return self._run_sync(self.aembed(text))

    # NOTE: **kwargs are forwarded unchanged to aembed_batch.
    def embed_batch(self, texts: list[str], **kwargs: Any) -> list[list[float]]:
        """Synchronous wrapper for aembed_batch."""
        return self._run_sync(self.aembed_batch(texts, **kwargs))

    @staticmethod
    def _run_sync(coro: Coroutine[Any, Any, _T]) -> _T:
        """Run an async coroutine synchronously.

        Handles the case where an event loop is already running
        (e.g., inside Jupyter notebooks).
        """
        try:
            # Raises RuntimeError when no event loop is running in this thread.
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: safe to drive the coroutine right here.
            return asyncio.run(coro)
        else:
            # If a loop is running (Jupyter, etc.), asyncio.run() would raise,
            # so run the coroutine on a fresh loop in a worker thread instead.
            import concurrent.futures
            with concurrent.futures.ThreadPoolExecutor() as pool:
                return pool.submit(asyncio.run, coro).result()

dimension abstractmethod property

Return the vector dimension (e.g. 384, 768, 1536).

model_name abstractmethod property

Return the model identifier for logging/debugging.

aembed(text) abstractmethod async

Generate an embedding for a single text string.

Parameters:

Name Type Description Default
text str

Input text to embed. Must be non-empty.

required

Returns:

Type Description
list[float]

A list of floats with length == self.dimension.

Raises:

Type Description
EmbeddingError

If the embedding generation fails.

Source code in src/medha/interfaces/embedder.py
@abstractmethod
async def aembed(self, text: str) -> list[float]:
    """Generate an embedding for a single text string.

    Args:
        text: Input text to embed. Must be non-empty.

    Returns:
        A list of floats with length == self.dimension.

    Raises:
        EmbeddingError: If the embedding generation fails.
    """
    ...

aembed_batch(texts, **kwargs) abstractmethod async

Generate embeddings for multiple texts.

Parameters:

Name Type Description Default
texts list[str]

List of non-empty strings.

required

Returns:

Type Description
list[list[float]]

A list of embeddings, one per input text, each with length == self.dimension.

list[list[float]]

Order is preserved: result[i] corresponds to texts[i].

Raises:

Type Description
EmbeddingError

If any embedding generation fails.

Source code in src/medha/interfaces/embedder.py
@abstractmethod
async def aembed_batch(self, texts: list[str], **kwargs: Any) -> list[list[float]]:
    """Generate embeddings for multiple texts.

    Args:
        texts: List of non-empty strings.

    Returns:
        A list of embeddings, one per input text, each with length == self.dimension.
        Order is preserved: result[i] corresponds to texts[i].

    Raises:
        EmbeddingError: If any embedding generation fails.
    """
    ...

embed(text)

Synchronous wrapper for aembed.

Source code in src/medha/interfaces/embedder.py
def embed(self, text: str) -> list[float]:
    """Synchronous wrapper for aembed."""
    return self._run_sync(self.aembed(text))

embed_batch(texts, **kwargs)

Synchronous wrapper for aembed_batch.

Source code in src/medha/interfaces/embedder.py
def embed_batch(self, texts: list[str], **kwargs: Any) -> list[list[float]]:
    """Synchronous wrapper for aembed_batch."""
    return self._run_sync(self.aembed_batch(texts, **kwargs))

VectorStorageBackend

VectorStorageBackend defines the contract for any vector store. Implement all abstract methods, then register your class in medha/backends/__init__.py.

from medha.interfaces.storage import VectorStorageBackend

class MyBackend(VectorStorageBackend):
    """Example backend skeleton matching the actual abstract interface."""

    async def initialize(self, collection_name: str, dimension: int, **kwargs) -> None: ...
    async def search(self, collection_name: str, vector: list[float], limit: int = 5,
                     score_threshold: float = 0.0) -> list[CacheResult]: ...
    async def upsert(self, collection_name: str, entries: list[CacheEntry]) -> None: ...
    async def scroll(self, collection_name: str, limit: int = 100, offset: str | None = None,
                     with_vectors: bool = False) -> tuple[list[CacheResult], str | None]: ...
    async def count(self, collection_name: str) -> int: ...
    async def delete(self, collection_name: str, ids: list[str]) -> None: ...
    async def find_expired(self, collection_name: str) -> list[str]: ...
    async def search_by_normalized_question(self, collection_name: str,
                                            normalized_question: str) -> CacheResult | None: ...
    async def find_by_query_hash(self, collection_name: str, query_hash: str) -> list[str]: ...
    async def find_by_template_id(self, collection_name: str, template_id: str) -> list[str]: ...
    async def drop_collection(self, collection_name: str) -> None: ...
    async def close(self) -> None: ...

Bases: ABC

Abstract base class for vector storage backends.

Source code in src/medha/interfaces/storage.py
class VectorStorageBackend(ABC):
    """Abstract base class for vector storage backends."""

    @abstractmethod
    async def initialize(self, collection_name: str, dimension: int, **kwargs: Any) -> None:
        """Set up the storage backend (create collection, indexes, quantization).

        This method is idempotent: calling it twice with the same arguments
        must not raise or duplicate data.

        Args:
            collection_name: Name of the vector collection.
            dimension: Vector dimensionality (must match the embedder).
            **kwargs: Backend-specific configuration (quantization, HNSW params, etc.).

        Raises:
            StorageInitializationError: If setup fails.
        """
        ...

    @abstractmethod
    async def search(
        self,
        collection_name: str,
        vector: list[float],
        limit: int = 5,
        score_threshold: float = 0.0,
    ) -> list[CacheResult]:
        """Search for similar vectors.

        Args:
            collection_name: Collection to search.
            vector: Query vector.
            limit: Max number of results.
            score_threshold: Minimum similarity score (0.0 - 1.0).

        Returns:
            List of CacheResult, sorted by descending score.

        Raises:
            StorageError: If the search fails.
        """
        ...

    @abstractmethod
    async def upsert(self, collection_name: str, entries: list[CacheEntry]) -> None:
        """Insert or update cache entries.

        Args:
            collection_name: Target collection.
            entries: List of CacheEntry objects to upsert.

        Raises:
            StorageError: If the upsert fails.
        """
        ...

    @abstractmethod
    async def scroll(
        self,
        collection_name: str,
        limit: int = 100,
        offset: str | None = None,
        with_vectors: bool = False,
    ) -> tuple[list[CacheResult], str | None]:
        """Iterate over all points in a collection.

        Used by fuzzy search (Tier 4) and admin operations.

        Args:
            collection_name: Collection to scroll.
            limit: Batch size per scroll.
            offset: Pagination token from a previous scroll.
            with_vectors: Whether to include vectors in results.

        Returns:
            Tuple of (results, next_offset). next_offset is None when done.

        Raises:
            StorageError: If the scroll fails.
        """
        ...

    @abstractmethod
    async def count(self, collection_name: str) -> int:
        """Return the number of points in a collection.

        Raises:
            StorageError: If the count fails.
        """
        ...

    @abstractmethod
    async def delete(self, collection_name: str, ids: list[str]) -> None:
        """Delete points by ID.

        Args:
            collection_name: Target collection.
            ids: List of point IDs to delete.

        Raises:
            StorageError: If the delete fails.
        """
        ...

    @abstractmethod
    async def find_expired(self, collection_name: str) -> list[str]:
        """Return IDs of entries with expires_at < now(UTC).

        Raises:
            StorageError: If the query fails.
        """
        ...

    @abstractmethod
    async def search_by_normalized_question(
        self, collection_name: str, normalized_question: str
    ) -> CacheResult | None:
        """Find a single entry by exact normalized_question match.

        Returns:
            CacheResult if found, None otherwise.
        """
        ...

    @abstractmethod
    async def find_by_query_hash(
        self, collection_name: str, query_hash: str
    ) -> list[str]:
        """Return all point IDs whose payload.query_hash matches *query_hash*.

        Returns:
            List of string IDs (may be empty).
        """
        ...

    @abstractmethod
    async def find_by_template_id(
        self, collection_name: str, template_id: str
    ) -> list[str]:
        """Return all point IDs whose payload.template_id matches *template_id*.

        Returns:
            List of string IDs (may be empty).
        """
        ...

    @abstractmethod
    async def drop_collection(self, collection_name: str) -> None:
        """Permanently delete the entire collection and all its data.

        Raises:
            StorageError: If the drop fails.
        """
        ...

    @abstractmethod
    async def close(self) -> None:
        """Release resources (close connections, etc.)."""
        ...

    async def connect(self) -> None:
        """Establish connection. No-op for backends that don't require it."""
        return

    # --- Context manager support ---
    # Enables ``async with backend: ...`` so close() always runs.

    async def __aenter__(self) -> VectorStorageBackend:
        """Return self on entering an ``async with`` block."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: object,
    ) -> bool:
        """Close the backend on exit; returns False so exceptions propagate."""
        await self.close()
        return False

initialize(collection_name, dimension, **kwargs) abstractmethod async

Set up the storage backend (create collection, indexes, quantization).

This method is idempotent: calling it twice with the same arguments must not raise or duplicate data.

Parameters:

Name Type Description Default
collection_name str

Name of the vector collection.

required
dimension int

Vector dimensionality (must match the embedder).

required
**kwargs Any

Backend-specific configuration (quantization, HNSW params, etc.).

{}

Raises:

Type Description
StorageInitializationError

If setup fails.

Source code in src/medha/interfaces/storage.py
@abstractmethod
async def initialize(self, collection_name: str, dimension: int, **kwargs: Any) -> None:
    """Set up the storage backend (create collection, indexes, quantization).

    This method is idempotent: calling it twice with the same arguments
    must not raise or duplicate data.

    Args:
        collection_name: Name of the vector collection.
        dimension: Vector dimensionality (must match the embedder).
        **kwargs: Backend-specific configuration (quantization, HNSW params, etc.).

    Raises:
        StorageInitializationError: If setup fails.
    """
    ...

search(collection_name, vector, limit=5, score_threshold=0.0) abstractmethod async

Search for similar vectors.

Parameters:

Name Type Description Default
collection_name str

Collection to search.

required
vector list[float]

Query vector.

required
limit int

Max number of results.

5
score_threshold float

Minimum similarity score (0.0 - 1.0).

0.0

Returns:

Type Description
list[CacheResult]

List of CacheResult, sorted by descending score.

Raises:

Type Description
StorageError

If the search fails.

Source code in src/medha/interfaces/storage.py
@abstractmethod
async def search(
    self,
    collection_name: str,
    vector: list[float],
    limit: int = 5,
    score_threshold: float = 0.0,
) -> list[CacheResult]:
    """Search for similar vectors.

    Args:
        collection_name: Collection to search.
        vector: Query vector.
        limit: Max number of results.
        score_threshold: Minimum similarity score (0.0 - 1.0).

    Returns:
        List of CacheResult, sorted by descending score.

    Raises:
        StorageError: If the search fails.
    """
    ...

upsert(collection_name, entries) abstractmethod async

Insert or update cache entries.

Parameters:

Name Type Description Default
collection_name str

Target collection.

required
entries list[CacheEntry]

List of CacheEntry objects to upsert.

required

Raises:

Type Description
StorageError

If the upsert fails.

Source code in src/medha/interfaces/storage.py
@abstractmethod
async def upsert(self, collection_name: str, entries: list[CacheEntry]) -> None:
    """Insert or update cache entries.

    Args:
        collection_name: Target collection.
        entries: List of CacheEntry objects to upsert.

    Raises:
        StorageError: If the upsert fails.
    """
    ...

scroll(collection_name, limit=100, offset=None, with_vectors=False) abstractmethod async

Iterate over all points in a collection.

Used by fuzzy search (Tier 4) and admin operations.

Parameters:

Name Type Description Default
collection_name str

Collection to scroll.

required
limit int

Batch size per scroll.

100
offset str | None

Pagination token from a previous scroll.

None
with_vectors bool

Whether to include vectors in results.

False

Returns:

Type Description
tuple[list[CacheResult], str | None]

Tuple of (results, next_offset). next_offset is None when done.

Raises:

Type Description
StorageError

If the scroll fails.

Source code in src/medha/interfaces/storage.py
@abstractmethod
async def scroll(
    self,
    collection_name: str,
    limit: int = 100,
    offset: str | None = None,
    with_vectors: bool = False,
) -> tuple[list[CacheResult], str | None]:
    """Iterate over all points in a collection.

    Used by fuzzy search (Tier 4) and admin operations.

    Args:
        collection_name: Collection to scroll.
        limit: Batch size per scroll.
        offset: Pagination token from a previous scroll.
        with_vectors: Whether to include vectors in results.

    Returns:
        Tuple of (results, next_offset). next_offset is None when done.

    Raises:
        StorageError: If the scroll fails.
    """
    ...

count(collection_name) abstractmethod async

Return the number of points in a collection.

Raises:

Type Description
StorageError

If the count fails.

Source code in src/medha/interfaces/storage.py
@abstractmethod
async def count(self, collection_name: str) -> int:
    """Return the number of points in a collection.

    Raises:
        StorageError: If the count fails.
    """
    ...

delete(collection_name, ids) abstractmethod async

Delete points by ID.

Parameters:

Name Type Description Default
collection_name str

Target collection.

required
ids list[str]

List of point IDs to delete.

required

Raises:

Type Description
StorageError

If the delete fails.

Source code in src/medha/interfaces/storage.py
@abstractmethod
async def delete(self, collection_name: str, ids: list[str]) -> None:
    """Delete points by ID.

    Args:
        collection_name: Target collection.
        ids: List of point IDs to delete.

    Raises:
        StorageError: If the delete fails.
    """
    ...

find_expired(collection_name) abstractmethod async

Return IDs of entries with expires_at < now(UTC).

Raises:

Type Description
StorageError

If the query fails.

Source code in src/medha/interfaces/storage.py
@abstractmethod
async def find_expired(self, collection_name: str) -> list[str]:
    """Return IDs of entries with expires_at < now(UTC).

    Raises:
        StorageError: If the query fails.
    """
    ...

search_by_normalized_question(collection_name, normalized_question) abstractmethod async

Find a single entry by exact normalized_question match.

Returns:

Type Description
CacheResult | None

CacheResult if found, None otherwise.

Source code in src/medha/interfaces/storage.py
@abstractmethod
async def search_by_normalized_question(
    self, collection_name: str, normalized_question: str
) -> CacheResult | None:
    """Find a single entry by exact normalized_question match.

    Returns:
        CacheResult if found, None otherwise.
    """
    ...

find_by_query_hash(collection_name, query_hash) abstractmethod async

Return all point IDs whose payload.query_hash matches query_hash.

Returns:

Type Description
list[str]

List of string IDs (may be empty).

Source code in src/medha/interfaces/storage.py
@abstractmethod
async def find_by_query_hash(
    self, collection_name: str, query_hash: str
) -> list[str]:
    """Return all point IDs whose payload.query_hash matches *query_hash*.

    Returns:
        List of string IDs (may be empty).
    """
    ...

find_by_template_id(collection_name, template_id) abstractmethod async

Return all point IDs whose payload.template_id matches template_id.

Returns:

Type Description
list[str]

List of string IDs (may be empty).

Source code in src/medha/interfaces/storage.py
@abstractmethod
async def find_by_template_id(
    self, collection_name: str, template_id: str
) -> list[str]:
    """Return all point IDs whose payload.template_id matches *template_id*.

    Returns:
        List of string IDs (may be empty).
    """
    ...

drop_collection(collection_name) abstractmethod async

Permanently delete the entire collection and all its data.

Raises:

Type Description
StorageError

If the drop fails.

Source code in src/medha/interfaces/storage.py
@abstractmethod
async def drop_collection(self, collection_name: str) -> None:
    """Permanently delete the entire collection and all its data.

    Raises:
        StorageError: If the drop fails.
    """
    ...

close() abstractmethod async

Release resources (close connections, etc.).

Source code in src/medha/interfaces/storage.py
@abstractmethod
async def close(self) -> None:
    """Release resources (close connections, etc.)."""
    ...

connect() async

Establish connection. No-op for backends that don't require it.

Source code in src/medha/interfaces/storage.py
async def connect(self) -> None:
    """Establish connection. No-op for backends that don't require it."""
    return

L1CacheBackend

L1CacheBackend defines the contract for the fast in-process cache layer (Tier 0 of the waterfall).

from medha.interfaces.l1_cache import L1CacheBackend

class MyL1Cache(L1CacheBackend):
    """Example L1 cache skeleton matching the actual abstract interface."""

    async def get(self, key: str) -> CacheHit | None: ...
    async def set(self, key: str, value: CacheHit) -> None: ...
    async def invalidate(self, key: str) -> None: ...
    async def clear(self) -> None: ...

    @property
    def size(self) -> int: ...

Bases: ABC

Interface for L1 (fast-lookup) cache backends.

L1 cache sits in front of the vector backend and provides sub-millisecond responses for recently seen questions. The default implementation is in-memory (InMemoryL1Cache); a Redis-backed implementation (RedisL1Cache) enables sharing the cache across multiple service instances in a horizontally-scaled deployment.

Source code in src/medha/interfaces/l1_cache.py
class L1CacheBackend(ABC):
    """Interface for L1 (fast-lookup) cache backends.

    L1 cache sits in front of the vector backend and provides sub-millisecond
    responses for recently seen questions.  The default implementation is
    in-memory (``InMemoryL1Cache``); a Redis-backed implementation
    (``RedisL1Cache``) enables sharing the cache across multiple service
    instances in a horizontally-scaled deployment.
    """

    # Only get/set/clear/invalidate and the ``size`` property are abstract;
    # ``invalidate_all`` and ``close`` ship usable default implementations.

    @abstractmethod
    async def get(self, key: str) -> CacheHit | None:
        """Return the cached hit for *key*, or ``None`` on a miss."""
        ...

    @abstractmethod
    async def set(self, key: str, value: CacheHit) -> None:
        """Store *value* under *key*.  Implementations handle eviction internally."""
        ...

    @abstractmethod
    async def clear(self) -> None:
        """Remove all entries from the cache."""
        ...

    @abstractmethod
    async def invalidate(self, key: str) -> None:
        """Remove a single entry by *key*. No-op if key is absent."""
        ...

    async def invalidate_all(self) -> None:
        """Remove all entries. Delegates to :meth:`clear` by default."""
        await self.clear()

    @property
    @abstractmethod
    def size(self) -> int:
        """Current number of entries.  May be approximate for distributed backends."""
        ...

    async def close(self) -> None:
        """Release any resources held by this backend. No-op by default."""
        return
size abstractmethod property

Current number of entries. May be approximate for distributed backends.

get(key) abstractmethod async

Return the cached hit for key, or None on a miss.

Source code in src/medha/interfaces/l1_cache.py
@abstractmethod
async def get(self, key: str) -> CacheHit | None:
    """Return the cached hit for *key*, or ``None`` on a miss."""
    ...

set(key, value) abstractmethod async

Store value under key. Implementations handle eviction internally.

Source code in src/medha/interfaces/l1_cache.py
@abstractmethod
async def set(self, key: str, value: CacheHit) -> None:
    """Store *value* under *key*.  Implementations handle eviction internally."""
    ...

clear() abstractmethod async

Remove all entries from the cache.

Source code in src/medha/interfaces/l1_cache.py
@abstractmethod
async def clear(self) -> None:
    """Remove all entries from the cache."""
    ...

invalidate(key) abstractmethod async

Remove a single entry by key. No-op if key is absent.

Source code in src/medha/interfaces/l1_cache.py
@abstractmethod
async def invalidate(self, key: str) -> None:
    """Remove a single entry by *key*. No-op if key is absent."""
    ...

invalidate_all() async

Remove all entries. Delegates to :meth:clear by default.

Source code in src/medha/interfaces/l1_cache.py
async def invalidate_all(self) -> None:
    """Remove all entries. Delegates to :meth:`clear` by default."""
    await self.clear()

close() async

Release any resources held by this backend. No-op by default.

Source code in src/medha/interfaces/l1_cache.py
async def close(self) -> None:
    """Release any resources held by this backend. No-op by default."""
    return