Skip to content

Embedders

Embedder classes implement the BaseEmbedder ABC. Pass any embedder to Medha at construction time.

See the Embedders guide for comparison, install instructions, and usage examples.


FastEmbedAdapter

Bases: BaseEmbedder

Embedding adapter using ONNX Runtime via FastEmbed.

Supports any model available in the fastembed registry or custom HuggingFace models via ONNX export.

Parameters:

Name Type Description Default
model_name str

Model identifier. Defaults to "BAAI/bge-small-en-v1.5".

'BAAI/bge-small-en-v1.5'
max_length int

Maximum token length. Defaults to 512.

512
cache_dir str | None

Optional directory for model cache.

None

Raises:

Type Description
EmbeddingError

If the model cannot be loaded.

Source code in src/medha/embeddings/fastembed_adapter.py
class FastEmbedAdapter(BaseEmbedder):
    """Embedding adapter using ONNX Runtime via FastEmbed.

    Supports any model available in the fastembed registry or
    custom HuggingFace models via ONNX export.

    Args:
        model_name: Model identifier. Defaults to "BAAI/bge-small-en-v1.5".
        max_length: Maximum token length. Defaults to 512.
        cache_dir: Optional directory for model cache.

    Raises:
        ImportError: If the optional fastembed dependency is not installed.
        EmbeddingError: If the model cannot be loaded.
    """

    def __init__(
        self,
        model_name: str = "BAAI/bge-small-en-v1.5",
        max_length: int = 512,
        cache_dir: str | None = None,
    ):
        # Fail fast with install instructions if the optional extra is absent.
        if not _FASTEMBED_AVAILABLE:
            raise ImportError(
                "FastEmbed is required for FastEmbedAdapter. "
                "Install it with: pip install medha[fastembed]"
            )
        self._model_name = model_name
        self._max_length = max_length
        # Filled in by _load_model's dimension probe; None only if load failed.
        self._dimension: int | None = None
        self._model: TextEmbedding | None = None

        self._load_model(model_name, max_length, cache_dir)

    @property
    def dimension(self) -> int:
        """Vector dimension of the loaded model.

        Raises:
            EmbeddingError: If the model did not load and probe correctly.
        """
        if self._dimension is None:
            raise EmbeddingError(
                "Model dimension not available. The model may not have loaded correctly."
            )
        return self._dimension

    @property
    def model_name(self) -> str:
        """Identifier of the underlying embedding model."""
        return self._model_name

    async def aembed(self, text: str) -> list[float]:
        """Generate embedding for a single text.

        FastEmbed is synchronous under the hood; we run it in a thread
        to avoid blocking the event loop.

        Raises:
            EmbeddingError: If embedding fails.
        """
        try:
            logger.debug("FastEmbed aembed: text_len=%d", len(text))
            result = await asyncio.to_thread(self._embed_sync, text)
            logger.debug("FastEmbed aembed: done, dim=%d", len(result))
            return result
        except EmbeddingError:
            # Already a domain error; propagate unchanged.
            raise
        except Exception as e:
            logger.error("FastEmbed aembed failed: %s", e)
            raise EmbeddingError(
                f"Failed to embed text with model '{self._model_name}': {e}"
            ) from e

    async def aembed_batch(self, texts: list[str], **kwargs: Any) -> list[list[float]]:
        """Generate embeddings for multiple texts.

        Uses fastembed's native batching for efficiency.

        Raises:
            EmbeddingError: If embedding fails.
        """
        try:
            logger.debug("FastEmbed aembed_batch: %d texts", len(texts))
            results = await asyncio.to_thread(self._embed_batch_sync, texts)
            logger.debug("FastEmbed aembed_batch: done, %d vectors", len(results))
            return results
        except EmbeddingError:
            raise
        except Exception as e:
            logger.error("FastEmbed aembed_batch failed: %s", e)
            raise EmbeddingError(
                f"Failed to embed batch with model '{self._model_name}': {e}"
            ) from e

    def _embed_sync(self, text: str) -> list[float]:
        """Synchronous single-text embedding."""
        if self._model is None:
            raise EmbeddingError("Embedding model is not initialized.")
        embeddings = list(self._model.embed([text]))
        vector = embeddings[0]
        # fastembed may yield array-like objects; normalize to a plain list.
        return vector.tolist() if hasattr(vector, "tolist") else list(vector)

    def _embed_batch_sync(self, texts: list[str]) -> list[list[float]]:
        """Synchronous batch embedding using native fastembed batching."""
        if self._model is None:
            raise EmbeddingError("Embedding model is not initialized.")
        embeddings = list(self._model.embed(texts))
        return [
            vec.tolist() if hasattr(vec, "tolist") else list(vec)
            for vec in embeddings
        ]

    def _load_model(
        self, model_name: str, max_length: int, cache_dir: str | None
    ) -> None:
        """Load or register the embedding model.

        Logic:
            1. Check if model_name is in fastembed's supported models list.
            2. If yes, load directly.
            3. If no, attempt to register as a custom HuggingFace ONNX model.
            4. Probe dimension by embedding a test string.

        Raises:
            EmbeddingError: If model loading fails.
        """
        try:
            supported_models = [
                m["model"] for m in TextEmbedding.list_supported_models()
            ]

            if model_name in supported_models:
                logger.info("Using registered model: %s", model_name)
                self._model = TextEmbedding(
                    model_name=model_name,
                    max_length=max_length,
                    cache_dir=cache_dir,
                )
            else:
                # Not in the registry: register it as a custom HuggingFace
                # ONNX model, then load it through the same constructor.
                logger.info("Registering custom model: %s", model_name)
                TextEmbedding.add_custom_model(
                    model=model_name,
                    pooling=PoolingType.MEAN,
                    normalization=True,
                    sources=ModelSource(hf=model_name),
                    model_file="onnx/model.onnx",
                )
                self._model = TextEmbedding(
                    model_name=model_name,
                    max_length=max_length,
                    cache_dir=cache_dir,
                )

            # Probe dimension by embedding a test string
            probe = list(self._model.embed(["dimension probe"]))
            self._dimension = len(probe[0])
            logger.info(
                "Model '%s' loaded, dimension=%d", model_name, self._dimension
            )

        except Exception as e:
            raise EmbeddingError(
                f"Failed to load embedding model '{model_name}': {e}"
            ) from e

aembed(text) async

Generate embedding for a single text.

FastEmbed is synchronous under the hood; we run it in a thread to avoid blocking the event loop.

Source code in src/medha/embeddings/fastembed_adapter.py
async def aembed(self, text: str) -> list[float]:
    """Embed a single text string asynchronously.

    The underlying FastEmbed call is blocking, so it is dispatched to a
    worker thread with ``asyncio.to_thread`` to keep the event loop free.
    """
    try:
        logger.debug("FastEmbed aembed: text_len=%d", len(text))
        vector = await asyncio.to_thread(self._embed_sync, text)
        logger.debug("FastEmbed aembed: done, dim=%d", len(vector))
        return vector
    except EmbeddingError:
        # Domain errors pass through untouched.
        raise
    except Exception as exc:
        logger.error("FastEmbed aembed failed: %s", exc)
        raise EmbeddingError(
            f"Failed to embed text with model '{self._model_name}': {exc}"
        ) from exc

aembed_batch(texts, **kwargs) async

Generate embeddings for multiple texts.

Uses fastembed's native batching for efficiency.

Source code in src/medha/embeddings/fastembed_adapter.py
async def aembed_batch(self, texts: list[str], **kwargs: Any) -> list[list[float]]:
    """Embed several texts at once.

    Delegates to fastembed's native batch path, run in a worker thread.
    """
    try:
        logger.debug("FastEmbed aembed_batch: %d texts", len(texts))
        vectors = await asyncio.to_thread(self._embed_batch_sync, texts)
        logger.debug("FastEmbed aembed_batch: done, %d vectors", len(vectors))
        return vectors
    except EmbeddingError:
        # Domain errors pass through untouched.
        raise
    except Exception as exc:
        logger.error("FastEmbed aembed_batch failed: %s", exc)
        raise EmbeddingError(
            f"Failed to embed batch with model '{self._model_name}': {exc}"
        ) from exc

OpenAIAdapter

Bases: BaseEmbedder

Embedding adapter using the OpenAI Embeddings API.

Parameters:

Name Type Description Default
model_name str

OpenAI model identifier. Defaults to "text-embedding-3-small".

'text-embedding-3-small'
api_key str | None

OpenAI API key. If None, reads from OPENAI_API_KEY env var.

None
dimensions int | None

Optional dimension override (for models that support it).

None

Raises:

Type Description
EmbeddingError

If the OpenAI client cannot be initialized.

Source code in src/medha/embeddings/openai_adapter.py
class OpenAIAdapter(BaseEmbedder):
    """Embedding adapter using the OpenAI Embeddings API.

    Args:
        model_name: OpenAI model identifier. Defaults to "text-embedding-3-small".
        api_key: OpenAI API key. If None, reads from OPENAI_API_KEY env var.
        dimensions: Optional dimension override (for models that support it).

    Raises:
        ImportError: If the optional openai dependency is not installed.
        EmbeddingError: If the OpenAI client cannot be initialized.
    """

    # Documented output dimensions for known models; consulted when no
    # explicit 'dimensions' override was given.
    _KNOWN_DIMENSIONS = {
        "text-embedding-3-small": 1536,
        "text-embedding-3-large": 3072,
        "text-embedding-ada-002": 1536,
    }

    def __init__(
        self,
        model_name: str = "text-embedding-3-small",
        api_key: str | None = None,
        dimensions: int | None = None,
    ):
        # Fail fast with install instructions if the optional extra is absent.
        if not _OPENAI_AVAILABLE:
            raise ImportError(
                "OpenAI is required for OpenAIAdapter. "
                "Install it with: pip install medha[openai]"
            )
        self._model_name = model_name
        self._dimensions = dimensions
        self._client: AsyncOpenAI | None = None

        self._initialize_client(api_key)

    @property
    def dimension(self) -> int:
        """Embedding dimension: explicit override first, then the known table.

        Raises:
            EmbeddingError: If the model is unknown and no override was given.
        """
        if self._dimensions:
            return self._dimensions
        dim = self._KNOWN_DIMENSIONS.get(self._model_name)
        if dim is None:
            raise EmbeddingError(
                f"Unknown dimension for model '{self._model_name}'. "
                "Pass 'dimensions' explicitly."
            )
        return dim

    @property
    def model_name(self) -> str:
        """Identifier of the OpenAI embedding model."""
        return self._model_name

    async def aembed(self, text: str) -> list[float]:
        """Generate embedding via OpenAI API.

        Uses the async client for non-blocking operation.

        Raises:
            EmbeddingError: On authentication, rate-limit, connection, or
                any other API failure.
        """
        if self._client is None:
            raise EmbeddingError(
                "OpenAI client is not initialized. Call _initialize_client() first."
            )
        try:
            logger.debug("OpenAI aembed: text_len=%d, model='%s'", len(text), self._model_name)
            # Named call_kwargs for consistency with aembed_batch, where a
            # local named 'kwargs' would shadow the **kwargs parameter.
            call_kwargs: dict[str, Any] = {"input": text, "model": self._model_name}
            if self._dimensions is not None:
                call_kwargs["dimensions"] = self._dimensions
            response = await self._client.embeddings.create(**call_kwargs)
            logger.debug("OpenAI aembed: done, dim=%d", len(response.data[0].embedding))
            return list(response.data[0].embedding)
        except AuthenticationError as e:
            logger.error("OpenAI authentication failed: %s", e)
            raise EmbeddingError(
                f"OpenAI authentication failed: {e}"
            ) from e
        except RateLimitError as e:
            logger.warning("OpenAI rate limit exceeded: %s", e)
            raise EmbeddingError(
                f"OpenAI rate limit exceeded: {e}"
            ) from e
        except APIConnectionError as e:
            logger.error("OpenAI API connection error: %s", e)
            raise EmbeddingError(
                f"OpenAI API connection error: {e}"
            ) from e
        except Exception as e:
            logger.error("OpenAI aembed failed: %s", e)
            raise EmbeddingError(
                f"Failed to embed text with OpenAI model '{self._model_name}': {e}"
            ) from e

    async def aembed_batch(self, texts: list[str], **kwargs: Any) -> list[list[float]]:
        """Generate embeddings for a batch via OpenAI API.

        OpenAI's API natively supports batched input.
        Respects rate limits via the client's built-in retry logic.

        Args:
            texts: List of texts to embed.
            **kwargs: Currently unused; accepted for interface compatibility.

        Raises:
            EmbeddingError: On authentication, rate-limit, connection, or
                any other API failure.
        """
        if self._client is None:
            raise EmbeddingError(
                "OpenAI client is not initialized. Call _initialize_client() first."
            )
        try:
            logger.debug("OpenAI aembed_batch: %d texts, model='%s'", len(texts), self._model_name)
            # Distinct name so the **kwargs parameter is not shadowed (and
            # silently discarded) by the request payload.
            call_kwargs: dict[str, Any] = {"input": texts, "model": self._model_name}
            if self._dimensions is not None:
                call_kwargs["dimensions"] = self._dimensions
            response = await self._client.embeddings.create(**call_kwargs)
            # Sort by index to preserve input order
            sorted_data = sorted(response.data, key=lambda x: x.index)
            logger.debug("OpenAI aembed_batch: done, %d vectors", len(sorted_data))
            return [item.embedding for item in sorted_data]
        except AuthenticationError as e:
            logger.error("OpenAI authentication failed: %s", e)
            raise EmbeddingError(
                f"OpenAI authentication failed: {e}"
            ) from e
        except RateLimitError as e:
            logger.warning("OpenAI rate limit exceeded: %s", e)
            raise EmbeddingError(
                f"OpenAI rate limit exceeded: {e}"
            ) from e
        except APIConnectionError as e:
            logger.error("OpenAI API connection error: %s", e)
            raise EmbeddingError(
                f"OpenAI API connection error: {e}"
            ) from e
        except Exception as e:
            logger.error("OpenAI aembed_batch failed: %s", e)
            raise EmbeddingError(
                f"Failed to embed batch with OpenAI model '{self._model_name}': {e}"
            ) from e

    def _initialize_client(self, api_key: str | None) -> None:
        """Initialize the async OpenAI client.

        Raises:
            EmbeddingError: If initialization fails.
        """
        try:
            # AsyncOpenAI falls back to the OPENAI_API_KEY env var when
            # api_key is None.
            self._client = AsyncOpenAI(api_key=api_key)
            logger.info("OpenAI client initialized for model '%s'", self._model_name)
        except Exception as e:
            raise EmbeddingError(
                f"Failed to initialize OpenAI client: {e}"
            ) from e

aembed(text) async

Generate embedding via OpenAI API.

Uses the async client for non-blocking operation.

Source code in src/medha/embeddings/openai_adapter.py
async def aembed(self, text: str) -> list[float]:
    """Embed a single text through the OpenAI Embeddings endpoint.

    Non-blocking: the asynchronous client is awaited directly.
    """
    if self._client is None:
        raise EmbeddingError(
            "OpenAI client is not initialized. Call _initialize_client() first."
        )
    try:
        logger.debug("OpenAI aembed: text_len=%d, model='%s'", len(text), self._model_name)
        request: dict[str, Any] = {"input": text, "model": self._model_name}
        if self._dimensions is not None:
            request["dimensions"] = self._dimensions
        response = await self._client.embeddings.create(**request)
        embedding = response.data[0].embedding
        logger.debug("OpenAI aembed: done, dim=%d", len(embedding))
        return list(embedding)
    except AuthenticationError as err:
        logger.error("OpenAI authentication failed: %s", err)
        raise EmbeddingError(f"OpenAI authentication failed: {err}") from err
    except RateLimitError as err:
        logger.warning("OpenAI rate limit exceeded: %s", err)
        raise EmbeddingError(f"OpenAI rate limit exceeded: {err}") from err
    except APIConnectionError as err:
        logger.error("OpenAI API connection error: %s", err)
        raise EmbeddingError(f"OpenAI API connection error: {err}") from err
    except Exception as err:
        logger.error("OpenAI aembed failed: %s", err)
        raise EmbeddingError(
            f"Failed to embed text with OpenAI model '{self._model_name}': {err}"
        ) from err

aembed_batch(texts, **kwargs) async

Generate embeddings for a batch via OpenAI API.

OpenAI's API natively supports batched input. Respects rate limits via the client's built-in retry logic.

Source code in src/medha/embeddings/openai_adapter.py
async def aembed_batch(self, texts: list[str], **kwargs: Any) -> list[list[float]]:
    """Generate embeddings for a batch via OpenAI API.

    OpenAI's API natively supports batched input.
    Respects rate limits via the client's built-in retry logic.

    Args:
        texts: List of texts to embed.
        **kwargs: Currently unused; accepted for interface compatibility.

    Raises:
        EmbeddingError: On authentication, rate-limit, connection, or any
            other API failure.
    """
    if self._client is None:
        raise EmbeddingError(
            "OpenAI client is not initialized. Call _initialize_client() first."
        )
    try:
        logger.debug("OpenAI aembed_batch: %d texts, model='%s'", len(texts), self._model_name)
        # Distinct name so the **kwargs parameter is not shadowed (and
        # silently discarded) by the request payload.
        call_kwargs: dict[str, Any] = {"input": texts, "model": self._model_name}
        if self._dimensions is not None:
            call_kwargs["dimensions"] = self._dimensions
        response = await self._client.embeddings.create(**call_kwargs)
        # Sort by index to preserve input order
        sorted_data = sorted(response.data, key=lambda x: x.index)
        logger.debug("OpenAI aembed_batch: done, %d vectors", len(sorted_data))
        return [item.embedding for item in sorted_data]
    except AuthenticationError as e:
        logger.error("OpenAI authentication failed: %s", e)
        raise EmbeddingError(
            f"OpenAI authentication failed: {e}"
        ) from e
    except RateLimitError as e:
        logger.warning("OpenAI rate limit exceeded: %s", e)
        raise EmbeddingError(
            f"OpenAI rate limit exceeded: {e}"
        ) from e
    except APIConnectionError as e:
        logger.error("OpenAI API connection error: %s", e)
        raise EmbeddingError(
            f"OpenAI API connection error: {e}"
        ) from e
    except Exception as e:
        logger.error("OpenAI aembed_batch failed: %s", e)
        raise EmbeddingError(
            f"Failed to embed batch with OpenAI model '{self._model_name}': {e}"
        ) from e

CohereAdapter

Bases: BaseEmbedder

Embedding adapter using the Cohere Embed API v2.

Parameters:

Name Type Description Default
api_key str

Cohere API key.

required
model str

Cohere embedding model. Defaults to "embed-multilingual-v3.0".

'embed-multilingual-v3.0'
input_type_query str

Input type used for query embeddings.

'search_query'
input_type_document str

Input type used for document embeddings.

'search_document'
embedding_types list[str] | None

Optional list of embedding types to request.

None

Raises:

Type Description
ConfigurationError

If the cohere package is not installed.

Source code in src/medha/embeddings/cohere_adapter.py
class CohereAdapter(BaseEmbedder):
    """Embedding adapter using the Cohere Embed API v2.

    Args:
        api_key: Cohere API key.
        model: Cohere embedding model. Defaults to "embed-multilingual-v3.0".
        input_type_query: Input type used for query embeddings.
        input_type_document: Input type used for document embeddings.
        embedding_types: Optional list of embedding types to request.

    Raises:
        ConfigurationError: If the cohere package is not installed.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "embed-multilingual-v3.0",
        input_type_query: str = "search_query",
        input_type_document: str = "search_document",
        embedding_types: list[str] | None = None,
    ) -> None:
        # Fail fast with install instructions if the optional extra is absent.
        if not HAS_COHERE:
            raise ConfigurationError(
                "Cohere is required for CohereAdapter. "
                "Install it with: pip install medha[cohere]"
            )
        self._model = model
        self._input_type_query = input_type_query
        self._input_type_document = input_type_document
        self._embedding_types = embedding_types
        # Discovered lazily from the first embedding response.
        self._dimension: int | None = None
        self._client: cohere.AsyncClientV2 = cohere.AsyncClientV2(api_key=api_key)
        logger.info("CohereAdapter initialized with model '%s'", model)

    @property
    def dimension(self) -> int:
        """Embedding dimension, known only after the first embed call.

        Raises:
            RuntimeError: If no embedding has been generated yet.
        """
        # NOTE(review): FastEmbedAdapter/OpenAIAdapter raise EmbeddingError in
        # the equivalent situation — consider unifying the exception type.
        if self._dimension is None:
            raise RuntimeError(
                "Dimension not available. Call aembed() or aembed_batch() first."
            )
        return self._dimension

    @property
    def model_name(self) -> str:
        """Identifier of the Cohere embedding model."""
        return self._model

    async def aembed(self, text: str) -> list[float]:
        """Generate a query embedding via Cohere API.

        Always uses the query input type; use aembed_batch(is_document=True)
        for document embeddings.

        Raises:
            EmbeddingError: If the API call fails.
        """
        try:
            logger.debug("CohereAdapter aembed: text_len=%d", len(text))
            kwargs: dict[str, Any] = {
                "texts": [text],
                "model": self._model,
                "input_type": self._input_type_query,
            }
            # Only forward embedding_types when explicitly configured.
            if self._embedding_types is not None:
                kwargs["embedding_types"] = self._embedding_types
            response = await self._client.embed(**kwargs)
            vector = list(response.embeddings.float_[0])
            # Cache the dimension so the `dimension` property can serve it.
            self._dimension = len(vector)
            logger.debug("CohereAdapter aembed: done, dim=%d", self._dimension)
            return vector
        except (ConfigurationError, RuntimeError):
            raise
        except Exception as e:
            logger.error("CohereAdapter aembed failed: %s", e)
            raise EmbeddingError(f"Cohere aembed failed: {e}") from e

    async def aembed_batch(self, texts: list[str], **kwargs: Any) -> list[list[float]]:
        """Generate embeddings for multiple texts.

        Args:
            texts: List of texts to embed.
            **kwargs: Pass ``is_document=True`` to use the document input type;
                otherwise the query input type is used.

        Raises:
            EmbeddingError: If the API call fails.
        """
        is_document: bool = kwargs.get("is_document", False)
        input_type = self._input_type_document if is_document else self._input_type_query
        try:
            logger.debug(
                "CohereAdapter aembed_batch: %d texts, is_document=%s", len(texts), is_document
            )
            call_kwargs: dict[str, Any] = {
                "texts": texts,
                "model": self._model,
                "input_type": input_type,
            }
            if self._embedding_types is not None:
                call_kwargs["embedding_types"] = self._embedding_types
            response = await self._client.embed(**call_kwargs)
            vectors = [list(v) for v in response.embeddings.float_]
            # Cache the dimension from the first vector, if any were returned.
            if vectors:
                self._dimension = len(vectors[0])
            logger.debug("CohereAdapter aembed_batch: done, %d vectors", len(vectors))
            return vectors
        except (ConfigurationError, RuntimeError):
            raise
        except Exception as e:
            logger.error("CohereAdapter aembed_batch failed: %s", e)
            raise EmbeddingError(f"Cohere aembed_batch failed: {e}") from e

aembed(text) async

Generate a query embedding via Cohere API.

Source code in src/medha/embeddings/cohere_adapter.py
async def aembed(self, text: str) -> list[float]:
    """Embed a single query string with Cohere."""
    try:
        logger.debug("CohereAdapter aembed: text_len=%d", len(text))
        payload: dict[str, Any] = {
            "texts": [text],
            "model": self._model,
            "input_type": self._input_type_query,
        }
        if self._embedding_types is not None:
            payload["embedding_types"] = self._embedding_types
        response = await self._client.embed(**payload)
        values = list(response.embeddings.float_[0])
        # Cache the dimension for the `dimension` property.
        self._dimension = len(values)
        logger.debug("CohereAdapter aembed: done, dim=%d", self._dimension)
        return values
    except (ConfigurationError, RuntimeError):
        raise
    except Exception as exc:
        logger.error("CohereAdapter aembed failed: %s", exc)
        raise EmbeddingError(f"Cohere aembed failed: {exc}") from exc

aembed_batch(texts, **kwargs) async

Generate embeddings for multiple texts.

Parameters:

Name Type Description Default
texts list[str]

List of texts to embed.

required
**kwargs Any

Pass is_document=True to use the document input type.

{}
Source code in src/medha/embeddings/cohere_adapter.py
async def aembed_batch(self, texts: list[str], **kwargs: Any) -> list[list[float]]:
    """Embed a list of texts with Cohere.

    Args:
        texts: List of texts to embed.
        **kwargs: Pass ``is_document=True`` to use the document input type.
    """
    is_document: bool = kwargs.get("is_document", False)
    input_type = self._input_type_document if is_document else self._input_type_query
    try:
        logger.debug(
            "CohereAdapter aembed_batch: %d texts, is_document=%s", len(texts), is_document
        )
        payload: dict[str, Any] = {
            "texts": texts,
            "model": self._model,
            "input_type": input_type,
        }
        if self._embedding_types is not None:
            payload["embedding_types"] = self._embedding_types
        response = await self._client.embed(**payload)
        results = [list(row) for row in response.embeddings.float_]
        # Cache the dimension from the first vector, if any were returned.
        if results:
            self._dimension = len(results[0])
        logger.debug("CohereAdapter aembed_batch: done, %d vectors", len(results))
        return results
    except (ConfigurationError, RuntimeError):
        raise
    except Exception as exc:
        logger.error("CohereAdapter aembed_batch failed: %s", exc)
        raise EmbeddingError(f"Cohere aembed_batch failed: {exc}") from exc

GeminiAdapter

Bases: BaseEmbedder

Embedding adapter using the Google Gemini Embedding API.

Parameters:

Name Type Description Default
api_key str

Google AI API key.

required
model str

Gemini embedding model. Defaults to "models/text-embedding-004".

'models/text-embedding-004'
task_type_query str

Task type for query embeddings.

'RETRIEVAL_QUERY'
task_type_document str

Task type for document embeddings.

'RETRIEVAL_DOCUMENT'
output_dimensionality int | None

Optional dimension truncation.

None

Raises:

Type Description
ConfigurationError

If the google-genai package is not installed.

Source code in src/medha/embeddings/gemini_adapter.py
class GeminiAdapter(BaseEmbedder):
    """Embedding adapter using the Google Gemini Embedding API.

    Args:
        api_key: Google AI API key.
        model: Gemini embedding model. Defaults to "models/text-embedding-004".
        task_type_query: Task type for query embeddings.
        task_type_document: Task type for document embeddings.
        output_dimensionality: Optional dimension truncation.

    Raises:
        ConfigurationError: If the google-genai package is not installed.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "models/text-embedding-004",
        task_type_query: str = "RETRIEVAL_QUERY",
        task_type_document: str = "RETRIEVAL_DOCUMENT",
        output_dimensionality: int | None = None,
    ) -> None:
        # Fail fast with install instructions if the optional extra is absent.
        if not HAS_GEMINI:
            raise ConfigurationError(
                "google-genai is required for GeminiAdapter. "
                "Install it with: pip install medha[gemini]"
            )
        self._model = model
        self._task_type_query = task_type_query
        self._task_type_document = task_type_document
        self._output_dimensionality = output_dimensionality
        # Discovered lazily from the first embedding response.
        self._dimension: int | None = None
        self._client = genai.Client(api_key=api_key)
        logger.info("GeminiAdapter initialized with model '%s'", model)

    @property
    def dimension(self) -> int:
        """Embedding dimension, known only after the first embed call.

        Raises:
            RuntimeError: If no embedding has been generated yet.
        """
        # NOTE(review): FastEmbedAdapter/OpenAIAdapter raise EmbeddingError in
        # the equivalent situation — consider unifying the exception type.
        if self._dimension is None:
            raise RuntimeError(
                "Dimension not available. Call aembed() or aembed_batch() first."
            )
        return self._dimension

    @property
    def model_name(self) -> str:
        """Identifier of the Gemini embedding model."""
        return self._model

    async def aembed(self, text: str) -> list[float]:
        """Generate a query embedding via Gemini API (via thread to avoid blocking).

        Raises:
            EmbeddingError: If the API call fails.
        """
        try:
            logger.debug("GeminiAdapter aembed: text_len=%d", len(text))
            config = genai_types.EmbedContentConfig(
                task_type=self._task_type_query,
                output_dimensionality=self._output_dimensionality,
            )
            # The genai SDK call is blocking; run it in a worker thread.
            result = await asyncio.to_thread(
                self._client.models.embed_content,
                model=self._model,
                contents=text,
                config=config,
            )
            vector = list(result.embeddings[0].values)
            # Cache the dimension for the `dimension` property.
            self._dimension = len(vector)
            logger.debug("GeminiAdapter aembed: done, dim=%d", self._dimension)
            return vector
        except (ConfigurationError, RuntimeError):
            raise
        except Exception as e:
            logger.error("GeminiAdapter aembed failed: %s", e)
            raise EmbeddingError(f"Gemini aembed failed: {e}") from e

    async def aembed_batch(self, texts: list[str], **kwargs: Any) -> list[list[float]]:
        """Generate embeddings for multiple texts in chunks of 100.

        Args:
            texts: List of texts to embed.
            **kwargs: Pass ``is_document=True`` to use the document task type.

        Raises:
            EmbeddingError: If any chunk fails to embed.
        """
        is_document: bool = kwargs.get("is_document", False)
        task_type = self._task_type_document if is_document else self._task_type_query
        try:
            logger.debug(
                "GeminiAdapter aembed_batch: %d texts, is_document=%s", len(texts), is_document
            )
            # Split into fixed-size chunks. _GEMINI_CHUNK_SIZE is module-level;
            # per the docstring above it is 100.
            chunks = [
                texts[i : i + _GEMINI_CHUNK_SIZE]
                for i in range(0, len(texts), _GEMINI_CHUNK_SIZE)
            ]

            async def _embed_chunk(chunk: list[str]) -> list[list[float]]:
                # Each chunk runs the blocking SDK call in its own worker thread.
                config = genai_types.EmbedContentConfig(
                    task_type=task_type,
                    output_dimensionality=self._output_dimensionality,
                )
                result = await asyncio.to_thread(
                    self._client.models.embed_content,
                    model=self._model,
                    contents=chunk,
                    config=config,
                )
                return [list(e.values) for e in result.embeddings]

            # asyncio.gather preserves argument order, so flattening keeps the
            # vectors aligned with the input texts.
            chunk_results: list[list[list[float]]] = await asyncio.gather(
                *[_embed_chunk(chunk) for chunk in chunks]
            )
            vectors = [vec for chunk_vecs in chunk_results for vec in chunk_vecs]
            if vectors:
                self._dimension = len(vectors[0])
            logger.debug("GeminiAdapter aembed_batch: done, %d vectors", len(vectors))
            return vectors
        except (ConfigurationError, RuntimeError):
            raise
        except Exception as e:
            logger.error("GeminiAdapter aembed_batch failed: %s", e)
            raise EmbeddingError(f"Gemini aembed_batch failed: {e}") from e

aembed(text) async

Generate a query embedding via Gemini API (via thread to avoid blocking).

Source code in src/medha/embeddings/gemini_adapter.py
async def aembed(self, text: str) -> list[float]:
    """Embed one query string via Gemini, offloading the blocking SDK call
    to a worker thread."""
    try:
        logger.debug("GeminiAdapter aembed: text_len=%d", len(text))
        embed_config = genai_types.EmbedContentConfig(
            task_type=self._task_type_query,
            output_dimensionality=self._output_dimensionality,
        )
        response = await asyncio.to_thread(
            self._client.models.embed_content,
            model=self._model,
            contents=text,
            config=embed_config,
        )
        values = list(response.embeddings[0].values)
        # Cache the dimension for the `dimension` property.
        self._dimension = len(values)
        logger.debug("GeminiAdapter aembed: done, dim=%d", self._dimension)
        return values
    except (ConfigurationError, RuntimeError):
        raise
    except Exception as exc:
        logger.error("GeminiAdapter aembed failed: %s", exc)
        raise EmbeddingError(f"Gemini aembed failed: {exc}") from exc

aembed_batch(texts, **kwargs) async

Generate embeddings for multiple texts in chunks of 100.

Parameters:

Name Type Description Default
texts list[str]

List of texts to embed.

required
**kwargs Any

Pass is_document=True to use the document task type.

{}
Source code in src/medha/embeddings/gemini_adapter.py
async def aembed_batch(self, texts: list[str], **kwargs: Any) -> list[list[float]]:
    """Generate embeddings for multiple texts in chunks of 100.

    Args:
        texts: List of texts to embed.
        **kwargs: Pass ``is_document=True`` to use the document task type.

    Returns:
        One embedding vector per input text, in input order.

    Raises:
        EmbeddingError: If any chunk fails to embed.
    """
    is_document: bool = kwargs.get("is_document", False)
    task_type = self._task_type_document if is_document else self._task_type_query
    try:
        logger.debug(
            "GeminiAdapter aembed_batch: %d texts, is_document=%s", len(texts), is_document
        )
        # Split into fixed-size chunks. _GEMINI_CHUNK_SIZE is module-level;
        # per the docstring above it is 100.
        chunks = [
            texts[i : i + _GEMINI_CHUNK_SIZE]
            for i in range(0, len(texts), _GEMINI_CHUNK_SIZE)
        ]

        async def _embed_chunk(chunk: list[str]) -> list[list[float]]:
            # Each chunk runs the blocking SDK call in its own worker thread.
            config = genai_types.EmbedContentConfig(
                task_type=task_type,
                output_dimensionality=self._output_dimensionality,
            )
            result = await asyncio.to_thread(
                self._client.models.embed_content,
                model=self._model,
                contents=chunk,
                config=config,
            )
            return [list(e.values) for e in result.embeddings]

        # asyncio.gather preserves argument order, so flattening keeps the
        # vectors aligned with the input texts.
        chunk_results: list[list[list[float]]] = await asyncio.gather(
            *[_embed_chunk(chunk) for chunk in chunks]
        )
        vectors = [vec for chunk_vecs in chunk_results for vec in chunk_vecs]
        if vectors:
            self._dimension = len(vectors[0])
        logger.debug("GeminiAdapter aembed_batch: done, %d vectors", len(vectors))
        return vectors
    except (ConfigurationError, RuntimeError):
        raise
    except Exception as e:
        logger.error("GeminiAdapter aembed_batch failed: %s", e)
        raise EmbeddingError(f"Gemini aembed_batch failed: {e}") from e