from __future__ import annotations

import logging
import os
import warnings
from typing import (
    Any,
    Dict,
    Iterable,
    List,
    Literal,
    Mapping,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
    cast,
)

import openai
import tiktoken
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import (
    BaseModel,
    Extra,
    Field,
    SecretStr,
    root_validator,
)
from langchain_core.utils import (
    convert_to_secret_str,
    get_from_dict_or_env,
    get_pydantic_field_names,
)

logger = logging.getLogger(__name__)

def _process_batched_chunked_embeddings(
    num_texts: int,
    tokens: List[Union[List[int], str]],
    batched_embeddings: List[List[float]],
    indices: List[int],
    skip_empty: bool,
) -> List[Optional[List[float]]]:
    # For each text, this is the list of embeddings (list of lists of floats)
    # corresponding to the chunks of that text.
    results: List[List[List[float]]] = [[] for _ in range(num_texts)]

    # For each text, this is the token length of each chunk.
    # For transformers tokenization, this is the string length;
    # for tiktoken, this is the number of tokens.
    num_tokens_in_batch: List[List[int]] = [[] for _ in range(num_texts)]

    for i in range(len(indices)):
        if skip_empty and len(batched_embeddings[i]) == 1:
            continue
        results[indices[i]].append(batched_embeddings[i])
        num_tokens_in_batch[indices[i]].append(len(tokens[i]))

    # For each text, this is the final embedding.
    embeddings: List[Optional[List[float]]] = []
    for i in range(num_texts):
        # An embedding for each chunk.
        _result: List[List[float]] = results[i]
        if len(_result) == 0:
            # This will be populated with the embedding of an empty string
            # in the sync or async code calling this.
            embeddings.append(None)
            continue
        elif len(_result) == 1:
            # If only one embedding was produced, use it.
            embeddings.append(_result[0])
            continue
        else:
            # Otherwise, take a token-weighted average of the chunk embeddings.
            # Should be the same as:
            # average = np.average(_result, axis=0, weights=num_tokens_in_batch[i])
            total_weight = sum(num_tokens_in_batch[i])
            average = [
                sum(
                    val * weight
                    for val, weight in zip(embedding, num_tokens_in_batch[i])
                )
                / total_weight
                for embedding in zip(*_result)
            ]
            # Then normalize to unit length. Should be the same as:
            # embeddings.append((average / np.linalg.norm(average)).tolist())
            magnitude = sum(val**2 for val in average) ** 0.5
            embeddings.append([val / magnitude for val in average])
    return embeddings
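

# Illustrative sketch of the averaging above (not part of the library): for a
# text split into two chunks of 5 and 3 tokens whose chunk embeddings are
# [1.0, 0.0] and [0.0, 1.0], the token-weighted average per dimension is
# [5/8, 3/8], which is then L2-normalized:
#
#     average = [(1.0 * 5 + 0.0 * 3) / 8, (0.0 * 5 + 1.0 * 3) / 8]  # [0.625, 0.375]
#     magnitude = (0.625**2 + 0.375**2) ** 0.5                      # ~0.729
#     unit = [0.625 / 0.729, 0.375 / 0.729]                         # ~[0.857, 0.514]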


class OpenAIEmbeddings(BaseModel, Embeddings):
    """OpenAI embedding models.

    To use, you should have the environment variable ``OPENAI_API_KEY``
    set with your API key, or pass it as a named parameter to the
    constructor.

    In order to use the library with Microsoft Azure endpoints, use
    ``AzureOpenAIEmbeddings``.

    Example:
        .. code-block:: python

            from langchain_openai import OpenAIEmbeddings

            model = OpenAIEmbeddings(model="text-embedding-3-large")
    """
    client: Any = Field(default=None, exclude=True)  #: :meta private:
    async_client: Any = Field(default=None, exclude=True)  #: :meta private:
    model: str = "text-embedding-ada-002"
    dimensions: Optional[int] = None
    """The number of dimensions the resulting output embeddings should have.

    Only supported in `text-embedding-3` and later models.
    """
    # To support Azure OpenAI Service custom deployment names.
    deployment: Optional[str] = model
    # TODO: Move to AzureOpenAIEmbeddings.
    openai_api_version: Optional[str] = Field(default=None, alias="api_version")
    """Automatically inferred from env var `OPENAI_API_VERSION` if not provided."""
    # To support Azure OpenAI Service custom endpoints.
    openai_api_base: Optional[str] = Field(default=None, alias="base_url")
    """Base URL path for API requests. Leave blank if not using a proxy or
    service emulator."""
    # To support Azure OpenAI Service custom endpoints.
    openai_api_type: Optional[str] = None
    # To support an explicit proxy for OpenAI.
    openai_proxy: Optional[str] = None
    embedding_ctx_length: int = 8191
    """The maximum number of tokens to embed at once."""
    openai_api_key: Optional[SecretStr] = Field(default=None, alias="api_key")
    """Automatically inferred from env var `OPENAI_API_KEY` if not provided."""
    openai_organization: Optional[str] = Field(default=None, alias="organization")
    """Automatically inferred from env var `OPENAI_ORG_ID` if not provided."""
    allowed_special: Union[Literal["all"], Set[str], None] = None
    disallowed_special: Union[Literal["all"], Set[str], Sequence[str], None] = None
    chunk_size: int = 1000
    """Maximum number of texts to embed in each batch."""
    max_retries: int = 2
    """Maximum number of retries to make when generating."""
    request_timeout: Optional[Union[float, Tuple[float, float], Any]] = Field(
        default=None, alias="timeout"
    )
    """Timeout for requests to the OpenAI embedding API. Can be a float,
    an httpx.Timeout, or None."""
    headers: Any = None
    tiktoken_enabled: bool = True
    """Set this to False for non-OpenAI implementations of the embeddings API,
    e.g. the `--extensions openai` extension for `text-generation-webui`."""
    tiktoken_model_name: Optional[str] = None
    """The model name to pass to tiktoken when using this class.

    Tiktoken is used to count the number of tokens in documents to constrain
    them to be under a certain limit. By default, when set to None, this will
    be the same as the embedding model name. However, there are some cases
    where you may want to use this Embedding class with a model name not
    supported by tiktoken. This can include when using Azure embeddings or
    when using one of the many model providers that expose an OpenAI-like
    API but with different models. In those cases, in order to avoid erroring
    when tiktoken is called, you can specify a model name to use here."""
    show_progress_bar: bool = False
    """Whether to show a progress bar when embedding."""
    model_kwargs: Dict[str, Any] = Field(default_factory=dict)
    """Holds any model parameters valid for the `create` call that are not
    explicitly specified."""
    skip_empty: bool = False
    """Whether to skip empty strings when embedding, rather than raising an
    error. Defaults to not skipping."""
    default_headers: Union[Mapping[str, str], None] = None
    default_query: Union[Mapping[str, object], None] = None
    retry_min_seconds: int = 4
    """Min number of seconds to wait between retries."""
    retry_max_seconds: int = 20
    """Max number of seconds to wait between retries."""
    # To configure a custom httpx client, see the
    # [httpx documentation](https://www.python-httpx.org/api/#client).
    http_client: Union[Any, None] = None
    """Optional httpx.Client. Only used for sync invocations. Must specify
    http_async_client as well if you'd like a custom client for async
    invocations."""
    http_async_client: Union[Any, None] = None
    """Optional httpx.AsyncClient. Only used for async invocations. Must specify
    http_client as well if you'd like a custom client for sync invocations."""
    check_embedding_ctx_length: bool = True
    """Whether to check the token length of inputs and automatically split inputs
    longer than embedding_ctx_length."""

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid
        allow_population_by_field_name = True

    @root_validator(pre=True)
    def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Build extra kwargs from additional params that were passed in."""
        all_required_field_names = get_pydantic_field_names(cls)
        extra = values.get("model_kwargs", {})
        for field_name in list(values):
            if field_name in extra:
                raise ValueError(f"Found {field_name} supplied twice.")
            if field_name not in all_required_field_names:
                warnings.warn(
                    f"""WARNING! {field_name} is not a default parameter.
                    {field_name} was transferred to model_kwargs.
                    Please confirm that {field_name} is what you intended."""
                )
                extra[field_name] = values.pop(field_name)
        invalid_model_kwargs = all_required_field_names.intersection(extra.keys())
        if invalid_model_kwargs:
            raise ValueError(
                f"Parameters {invalid_model_kwargs} should be specified explicitly. "
                f"Instead they were passed in as part of `model_kwargs` parameter."
            )
        values["model_kwargs"] = extra
        return values

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that the API key and python package exist in the environment."""
        openai_api_key = get_from_dict_or_env(
            values, "openai_api_key", "OPENAI_API_KEY"
        )
        values["openai_api_key"] = (
            convert_to_secret_str(openai_api_key) if openai_api_key else None
        )
        values["openai_api_base"] = values["openai_api_base"] or os.getenv(
            "OPENAI_API_BASE"
        )
        values["openai_api_type"] = get_from_dict_or_env(
            values,
            "openai_api_type",
            "OPENAI_API_TYPE",
            default="",
        )
        values["openai_proxy"] = get_from_dict_or_env(
            values,
            "openai_proxy",
            "OPENAI_PROXY",
            default="",
        )
        if values["openai_api_type"] in ("azure", "azure_ad", "azuread"):
            default_api_version = "2023-05-15"
            # Azure OpenAI embedding models allow a maximum of 16 texts
            # at a time in each batch.
            # See: https://learn.microsoft.com/en-us/azure/ai-services/openai/reference#embeddings
            values["chunk_size"] = min(values["chunk_size"], 16)
        else:
            default_api_version = ""
        values["openai_api_version"] = get_from_dict_or_env(
            values,
            "openai_api_version",
            "OPENAI_API_VERSION",
            default=default_api_version,
        )
        # Check OPENAI_ORGANIZATION for backwards compatibility.
        values["openai_organization"] = (
            values["openai_organization"]
            or os.getenv("OPENAI_ORG_ID")
            or os.getenv("OPENAI_ORGANIZATION")
        )
        if values["openai_api_type"] in ("azure", "azure_ad", "azuread"):
            raise ValueError(
                "If you are using Azure, "
                "please use the `AzureOpenAIEmbeddings` class."
            )
        client_params = {
            "api_key": (
                values["openai_api_key"].get_secret_value()
                if values["openai_api_key"]
                else None
            ),
            "organization": values["openai_organization"],
            "base_url": values["openai_api_base"],
            "timeout": values["request_timeout"],
            "max_retries": values["max_retries"],
            "default_headers": values["default_headers"],
            "default_query": values["default_query"],
        }
        if not values.get("client"):
            sync_specific = {"http_client": values["http_client"]}
            values["client"] = openai.OpenAI(
                **client_params, **sync_specific
            ).embeddings
        if not values.get("async_client"):
            async_specific = {"http_client": values["http_async_client"]}
            values["async_client"] = openai.AsyncOpenAI(
                **client_params, **async_specific
            ).embeddings
        return values

    @property
    def _invocation_params(self) -> Dict[str, Any]:
        params: Dict = {"model": self.model, **self.model_kwargs}
        if self.dimensions is not None:
            params["dimensions"] = self.dimensions
        return params
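
    # For example (an illustrative sketch, not output captured from the
    # library): with model="text-embedding-3-large" and dimensions=256, this
    # property yields {"model": "text-embedding-3-large", "dimensions": 256},
    # which is splatted into each `create(...)` call below.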

    def _tokenize(
        self, texts: List[str], chunk_size: int
    ) -> Tuple[Iterable[int], List[Union[List[int], str]], List[int]]:
        """
        Take the input `texts` and `chunk_size` and return 3 iterables as a tuple.

        We have `batches`, where batches are sets of individual texts
        we want responses from the openai api. The length of a single batch is
        `chunk_size` texts.

        Each individual text is also split into multiple texts based on the
        `embedding_ctx_length` parameter (based on number of tokens).

        This function returns a 3-tuple of the following:

        _iter: An iterable of the starting index in `tokens` for each *batch*.
        tokens: A list of tokenized texts, where each text has already been split
            into sub-texts based on the `embedding_ctx_length` parameter. In the
            case of tiktoken, this is a list of token arrays. In the case of
            HuggingFace transformers, this is a list of strings.
        indices: A list of the same length as `tokens` that maps each token
            array to the index of the original text in `texts`.
        """
        tokens: List[Union[List[int], str]] = []
        indices: List[int] = []
        model_name = self.tiktoken_model_name or self.model

        # If the tiktoken flag is set to False, fall back to HuggingFace
        # transformers for tokenization.
        if not self.tiktoken_enabled:
            try:
                from transformers import AutoTokenizer
            except ImportError:
                raise ValueError(
                    "Could not import transformers python package. "
                    "This is needed for OpenAIEmbeddings to work without "
                    "`tiktoken`. Please install it with `pip install transformers`."
                )

            tokenizer = AutoTokenizer.from_pretrained(
                pretrained_model_name_or_path=model_name
            )
            for i, text in enumerate(texts):
                # Tokenize the text using HuggingFace transformers.
                tokenized: List[int] = tokenizer.encode(text, add_special_tokens=False)

                # Split tokens into chunks respecting the embedding_ctx_length.
                for j in range(0, len(tokenized), self.embedding_ctx_length):
                    token_chunk: List[int] = tokenized[
                        j : j + self.embedding_ctx_length
                    ]

                    # Convert token IDs back to a string.
                    chunk_text: str = tokenizer.decode(token_chunk)
                    tokens.append(chunk_text)
                    indices.append(i)
        else:
            try:
                encoding = tiktoken.encoding_for_model(model_name)
            except KeyError:
                encoding = tiktoken.get_encoding("cl100k_base")
            encoder_kwargs: Dict[str, Any] = {
                k: v
                for k, v in {
                    "allowed_special": self.allowed_special,
                    "disallowed_special": self.disallowed_special,
                }.items()
                if v is not None
            }
            for i, text in enumerate(texts):
                if self.model.endswith("001"):
                    # See: https://github.com/openai/openai-python/
                    #      issues/418#issuecomment-1525939500
                    # Replace newlines, which can negatively affect performance.
                    text = text.replace("\n", " ")
                if encoder_kwargs:
                    token = encoding.encode(text, **encoder_kwargs)
                else:
                    token = encoding.encode_ordinary(text)

                # Split tokens into chunks respecting the embedding_ctx_length.
                for j in range(0, len(token), self.embedding_ctx_length):
                    tokens.append(token[j : j + self.embedding_ctx_length])
                    indices.append(i)

        if self.show_progress_bar:
            try:
                from tqdm.auto import tqdm

                _iter: Iterable = tqdm(range(0, len(tokens), chunk_size))
            except ImportError:
                _iter = range(0, len(tokens), chunk_size)
        else:
            _iter = range(0, len(tokens), chunk_size)
        return _iter, tokens, indices
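
    # Shape sketch (hypothetical numbers, not captured from a real run): with
    # embedding_ctx_length=8191, a 10,000-token text at position 0 in `texts`
    # becomes two entries in `tokens` (8191 and 1809 tokens) with
    # indices == [0, 0], and `_iter` yields batch start offsets 0, chunk_size,
    # 2 * chunk_size, ... into `tokens`.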

    # Please refer to
    # https://github.com/openai/openai-cookbook/blob/main/examples/Embedding_long_inputs.ipynb
    def _get_len_safe_embeddings(
        self, texts: List[str], *, engine: str, chunk_size: Optional[int] = None
    ) -> List[List[float]]:
        """
        Generate length-safe embeddings for a list of texts.

        This method handles tokenization and embedding generation, respecting
        the set embedding context length and chunk size. It supports both
        tiktoken and HuggingFace tokenizers, based on the tiktoken_enabled flag.

        Args:
            texts (List[str]): A list of texts to embed.
            engine (str): The engine or model to use for embeddings.
            chunk_size (Optional[int]): The size of chunks for processing embeddings.

        Returns:
            List[List[float]]: A list of embeddings for each input text.
        """
        _chunk_size = chunk_size or self.chunk_size
        _iter, tokens, indices = self._tokenize(texts, _chunk_size)
        batched_embeddings: List[List[float]] = []
        for i in _iter:
            response = self.client.create(
                input=tokens[i : i + _chunk_size], **self._invocation_params
            )
            if not isinstance(response, dict):
                response = response.model_dump()
            batched_embeddings.extend(r["embedding"] for r in response["data"])

        embeddings = _process_batched_chunked_embeddings(
            len(texts), tokens, batched_embeddings, indices, self.skip_empty
        )
        _cached_empty_embedding: Optional[List[float]] = None

        def empty_embedding() -> List[float]:
            # Lazily fetch (and cache) the embedding of an empty string, used
            # as the fallback for texts that produced no chunk embeddings.
            nonlocal _cached_empty_embedding
            if _cached_empty_embedding is None:
                average_embedded = self.client.create(
                    input="", **self._invocation_params
                )
                if not isinstance(average_embedded, dict):
                    average_embedded = average_embedded.model_dump()
                _cached_empty_embedding = average_embedded["data"][0]["embedding"]
            return _cached_empty_embedding

        return [e if e is not None else empty_embedding() for e in embeddings]

    # Please refer to
    # https://github.com/openai/openai-cookbook/blob/main/examples/Embedding_long_inputs.ipynb
    async def _aget_len_safe_embeddings(
        self, texts: List[str], *, engine: str, chunk_size: Optional[int] = None
    ) -> List[List[float]]:
        """
        Asynchronously generate length-safe embeddings for a list of texts.

        This method handles tokenization and asynchronous embedding generation,
        respecting the set embedding context length and chunk size. It supports
        both `tiktoken` and HuggingFace tokenizers, based on the
        tiktoken_enabled flag.

        Args:
            texts (List[str]): A list of texts to embed.
            engine (str): The engine or model to use for embeddings.
            chunk_size (Optional[int]): The size of chunks for processing embeddings.

        Returns:
            List[List[float]]: A list of embeddings for each input text.
        """
        _chunk_size = chunk_size or self.chunk_size
        _iter, tokens, indices = self._tokenize(texts, _chunk_size)
        batched_embeddings: List[List[float]] = []
        for i in range(0, len(tokens), _chunk_size):
            response = await self.async_client.create(
                input=tokens[i : i + _chunk_size], **self._invocation_params
            )
            if not isinstance(response, dict):
                response = response.model_dump()
            batched_embeddings.extend(r["embedding"] for r in response["data"])

        embeddings = _process_batched_chunked_embeddings(
            len(texts), tokens, batched_embeddings, indices, self.skip_empty
        )
        _cached_empty_embedding: Optional[List[float]] = None

        async def empty_embedding() -> List[float]:
            # Lazily fetch (and cache) the embedding of an empty string, used
            # as the fallback for texts that produced no chunk embeddings.
            nonlocal _cached_empty_embedding
            if _cached_empty_embedding is None:
                average_embedded = await self.async_client.create(
                    input="", **self._invocation_params
                )
                if not isinstance(average_embedded, dict):
                    average_embedded = average_embedded.model_dump()
                _cached_empty_embedding = average_embedded["data"][0]["embedding"]
            return _cached_empty_embedding

        return [e if e is not None else await empty_embedding() for e in embeddings]

    def embed_documents(
        self, texts: List[str], chunk_size: Optional[int] = 0
    ) -> List[List[float]]:
        """Call out to OpenAI's embedding endpoint for embedding search docs.

        Args:
            texts: The list of texts to embed.
            chunk_size: The chunk size of embeddings. If None, will use the
                chunk size specified by the class.

        Returns:
            List of embeddings, one for each text.
        """
        if not self.check_embedding_ctx_length:
            embeddings: List[List[float]] = []
            for text in texts:
                response = self.client.create(
                    input=text,
                    **self._invocation_params,
                )
                if not isinstance(response, dict):
                    response = response.model_dump()
                embeddings.extend(r["embedding"] for r in response["data"])
            return embeddings

        # NOTE: to keep things simple, we assume the list may contain texts
        # longer than the maximum context and use the length-safe embedding
        # function.
        engine = cast(str, self.deployment)
        return self._get_len_safe_embeddings(
            texts, engine=engine, chunk_size=chunk_size
        )

    async def aembed_documents(
        self, texts: List[str], chunk_size: Optional[int] = 0
    ) -> List[List[float]]:
        """Call out to OpenAI's embedding endpoint async for embedding search docs.

        Args:
            texts: The list of texts to embed.
            chunk_size: The chunk size of embeddings. If None, will use the
                chunk size specified by the class.

        Returns:
            List of embeddings, one for each text.
        """
        if not self.check_embedding_ctx_length:
            embeddings: List[List[float]] = []
            for text in texts:
                response = await self.async_client.create(
                    input=text,
                    **self._invocation_params,
                )
                if not isinstance(response, dict):
                    response = response.model_dump()
                embeddings.extend(r["embedding"] for r in response["data"])
            return embeddings

        # NOTE: to keep things simple, we assume the list may contain texts
        # longer than the maximum context and use the length-safe embedding
        # function.
        engine = cast(str, self.deployment)
        return await self._aget_len_safe_embeddings(
            texts, engine=engine, chunk_size=chunk_size
        )

    def embed_query(self, text: str) -> List[float]:
        """Call out to OpenAI's embedding endpoint for embedding query text.

        Args:
            text: The text to embed.

        Returns:
            Embedding for the text.
        """
        return self.embed_documents([text])[0]

    async def aembed_query(self, text: str) -> List[float]:
        """Call out to OpenAI's embedding endpoint async for embedding query text.

        Args:
            text: The text to embed.

        Returns:
            Embedding for the text.
        """
        embeddings = await self.aembed_documents([text])
        return embeddings[0]
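

# Usage sketch (assumes the OPENAI_API_KEY environment variable is set and the
# langchain-openai package is installed; not part of this module):
#
#     from langchain_openai import OpenAIEmbeddings
#
#     embeddings = OpenAIEmbeddings(model="text-embedding-3-large", dimensions=256)
#     doc_vectors = embeddings.embed_documents(["first doc", "second doc"])
#     query_vector = embeddings.embed_query("a search query")
#     assert len(doc_vectors) == 2 and len(query_vector) == 256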