Spaces:
Runtime error
Runtime error
File size: 4,997 Bytes
ed4d993 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
import asyncio
import logging
import threading
from typing import Dict, List, Optional
import requests
from langchain_core._api.deprecation import deprecated
from langchain_core.embeddings import Embeddings
from langchain_core.pydantic_v1 import BaseModel, root_validator
from langchain_core.runnables.config import run_in_executor
from langchain_core.utils import get_from_dict_or_env
logger = logging.getLogger(__name__)
@deprecated(
since="0.0.13",
alternative="langchain_community.embeddings.QianfanEmbeddingsEndpoint",
)
class ErnieEmbeddings(BaseModel, Embeddings):
"""`Ernie Embeddings V1` embedding models."""
ernie_api_base: Optional[str] = None
ernie_client_id: Optional[str] = None
ernie_client_secret: Optional[str] = None
access_token: Optional[str] = None
chunk_size: int = 16
model_name = "ErnieBot-Embedding-V1"
_lock = threading.Lock()
@root_validator()
def validate_environment(cls, values: Dict) -> Dict:
values["ernie_api_base"] = get_from_dict_or_env(
values, "ernie_api_base", "ERNIE_API_BASE", "https://aip.baidubce.com"
)
values["ernie_client_id"] = get_from_dict_or_env(
values,
"ernie_client_id",
"ERNIE_CLIENT_ID",
)
values["ernie_client_secret"] = get_from_dict_or_env(
values,
"ernie_client_secret",
"ERNIE_CLIENT_SECRET",
)
return values
def _embedding(self, json: object) -> dict:
base_url = (
f"{self.ernie_api_base}/rpc/2.0/ai_custom/v1/wenxinworkshop/embeddings"
)
resp = requests.post(
f"{base_url}/embedding-v1",
headers={
"Content-Type": "application/json",
},
params={"access_token": self.access_token},
json=json,
)
return resp.json()
def _refresh_access_token_with_lock(self) -> None:
with self._lock:
logger.debug("Refreshing access token")
base_url: str = f"{self.ernie_api_base}/oauth/2.0/token"
resp = requests.post(
base_url,
headers={
"Content-Type": "application/json",
"Accept": "application/json",
},
params={
"grant_type": "client_credentials",
"client_id": self.ernie_client_id,
"client_secret": self.ernie_client_secret,
},
)
self.access_token = str(resp.json().get("access_token"))
def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Embed search docs.
Args:
texts: The list of texts to embed
Returns:
List[List[float]]: List of embeddings, one for each text.
"""
if not self.access_token:
self._refresh_access_token_with_lock()
text_in_chunks = [
texts[i : i + self.chunk_size]
for i in range(0, len(texts), self.chunk_size)
]
lst = []
for chunk in text_in_chunks:
resp = self._embedding({"input": [text for text in chunk]})
if resp.get("error_code"):
if resp.get("error_code") == 111:
self._refresh_access_token_with_lock()
resp = self._embedding({"input": [text for text in chunk]})
else:
raise ValueError(f"Error from Ernie: {resp}")
lst.extend([i["embedding"] for i in resp["data"]])
return lst
def embed_query(self, text: str) -> List[float]:
"""Embed query text.
Args:
text: The text to embed.
Returns:
List[float]: Embeddings for the text.
"""
if not self.access_token:
self._refresh_access_token_with_lock()
resp = self._embedding({"input": [text]})
if resp.get("error_code"):
if resp.get("error_code") == 111:
self._refresh_access_token_with_lock()
resp = self._embedding({"input": [text]})
else:
raise ValueError(f"Error from Ernie: {resp}")
return resp["data"][0]["embedding"]
async def aembed_query(self, text: str) -> List[float]:
"""Asynchronous Embed query text.
Args:
text: The text to embed.
Returns:
List[float]: Embeddings for the text.
"""
return await run_in_executor(None, self.embed_query, text)
async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
"""Asynchronous Embed search docs.
Args:
texts: The list of texts to embed
Returns:
List[List[float]]: List of embeddings, one for each text.
"""
result = await asyncio.gather(*[self.aembed_query(text) for text in texts])
return list(result)
|