Spaces:
Runtime error
Runtime error
File size: 3,047 Bytes
ed4d993 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
from __future__ import annotations
from typing import Any, Dict, Iterator, List
from urllib.parse import urlparse
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseModel, PrivateAttr
def _chunk(texts: List[str], size: int) -> Iterator[List[str]]:
for i in range(0, len(texts), size):
yield texts[i : i + size]
class MlflowEmbeddings(Embeddings, BaseModel):
    """Embedding LLMs in MLflow.

    To use, you should have the `mlflow[genai]` python package installed.
    For more information, see https://mlflow.org/docs/latest/llms/deployments.

    Example:
        .. code-block:: python

            from langchain_community.embeddings import MlflowEmbeddings

            embeddings = MlflowEmbeddings(
                target_uri="http://localhost:5000",
                endpoint="embeddings",
            )
    """

    endpoint: str
    """The endpoint to use."""
    target_uri: str
    """The target URI to use."""
    # Deployment client built in ``__init__`` from ``target_uri``.
    _client: Any = PrivateAttr()
    query_params: Dict[str, str] = {}
    """The parameters to use for queries."""
    documents_params: Dict[str, str] = {}
    """The parameters to use for documents."""

    def __init__(self, **kwargs: Any):
        """Validate ``target_uri`` and create the MLflow deployment client.

        Args:
            **kwargs: Field values forwarded to the model constructor.

        Raises:
            ValueError: If ``target_uri`` has an unsupported scheme.
            ImportError: If an ``ImportError`` occurs while importing
                ``mlflow.deployments`` or while creating the client.
        """
        super().__init__(**kwargs)
        self._validate_uri()
        try:
            # Imported lazily so the module can be loaded without mlflow.
            from mlflow.deployments import get_deploy_client

            self._client = get_deploy_client(self.target_uri)
        except ImportError as e:
            raise ImportError(
                "Failed to create the client. "
                f"Please run `pip install mlflow{self._mlflow_extras}` to install "
                "required dependencies."
            ) from e

    @property
    def _mlflow_extras(self) -> str:
        """Return the pip extras suffix shown in the installation hint."""
        return "[genai]"

    def _validate_uri(self) -> None:
        """Check that ``target_uri`` is ``"databricks"`` or uses an allowed scheme.

        Raises:
            ValueError: If the URI scheme is not http, https or databricks.
        """
        if self.target_uri == "databricks":
            return
        allowed = ["http", "https", "databricks"]
        if urlparse(self.target_uri).scheme not in allowed:
            raise ValueError(
                f"Invalid target URI: {self.target_uri}. "
                f"The scheme must be one of {allowed}."
            )

    def embed(self, texts: List[str], params: Dict[str, str]) -> List[List[float]]:
        """Embed ``texts`` by querying the endpoint in batches.

        Args:
            texts: The strings to embed.
            params: Extra entries merged into the ``inputs`` payload of every
                request, next to the ``"input"`` key.

        Returns:
            One embedding per input text, in input order.
        """
        batch_size = 20  # number of texts sent per ``predict`` call
        embeddings: List[List[float]] = []
        for txt in _chunk(texts, batch_size):
            resp = self._client.predict(
                endpoint=self.endpoint,
                inputs={"input": txt, **params},  # type: ignore[arg-type]
            )
            embeddings.extend(r["embedding"] for r in resp["data"])
        return embeddings

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Embed a list of documents using ``documents_params``."""
        return self.embed(texts, params=self.documents_params)

    def embed_query(self, text: str) -> List[float]:
        """Embed a single query string using ``query_params``."""
        return self.embed([text], params=self.query_params)[0]
class MlflowCohereEmbeddings(MlflowEmbeddings):
    """Cohere embedding LLMs in MLflow.

    Differs from ``MlflowEmbeddings`` only in its default request parameters,
    which set ``input_type`` separately for queries and for documents.
    """

    # Default extra parameters sent when embedding a query.
    query_params: Dict[str, str] = {
        "input_type": "search_query",
    }
    # Default extra parameters sent when embedding documents.
    documents_params: Dict[str, str] = {
        "input_type": "search_document",
    }
|