Spaces:
Runtime error
Runtime error
File size: 8,709 Bytes
ed4d993 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 |
from __future__ import annotations
import logging
from typing import Any, Callable, Dict, List, Optional
from langchain_core._api.deprecation import deprecated
from langchain_core.callbacks import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models.llms import LLM
from langchain_core.load.serializable import Serializable
from langchain_core.pydantic_v1 import Extra, Field, SecretStr, root_validator
from langchain_core.utils import convert_to_secret_str, get_from_dict_or_env
from tenacity import (
before_sleep_log,
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential,
)
from langchain_community.llms.utils import enforce_stop_tokens
logger = logging.getLogger(__name__)
def _create_retry_decorator(max_retries: int) -> Callable[[Any], Any]:
    """Build the tenacity ``retry`` decorator used for Cohere API calls.

    Args:
        max_retries: Value handed to ``stop_after_attempt``; once that many
            attempts have been made the last exception is re-raised.

    Returns:
        A decorator that retries the wrapped callable with exponential
        back-off and logs a warning before each sleep.
    """
    import cohere

    # Support both SDK generations (v4 and v5): when the package exposes an
    # ``error`` module, only ``CohereError`` is retried; otherwise any
    # ``Exception`` is.
    if hasattr(cohere, "error"):
        retryable = cohere.error.CohereError
    else:
        retryable = Exception

    # Exponential back-off (2^x * 1 second), clamped to the 4..10 second range.
    backoff = wait_exponential(multiplier=1, min=4, max=10)

    return retry(
        reraise=True,
        stop=stop_after_attempt(max_retries),
        wait=backoff,
        retry=retry_if_exception_type(retryable),
        before_sleep=before_sleep_log(logger, logging.WARNING),
    )
def completion_with_retry(llm: Cohere, **kwargs: Any) -> Any:
    """Run ``llm.client.generate`` under the tenacity retry policy.

    Args:
        llm: Model wrapper supplying ``client`` and ``max_retries``.
        **kwargs: Forwarded unchanged to ``llm.client.generate``.

    Returns:
        Whatever ``llm.client.generate`` returns.
    """
    with_retries = _create_retry_decorator(llm.max_retries)

    # The inner function keeps its original name: retry logging may report
    # the name of the wrapped callable.
    def _completion_with_retry(**call_kwargs: Any) -> Any:
        return llm.client.generate(**call_kwargs)

    guarded_call = with_retries(_completion_with_retry)
    return guarded_call(**kwargs)
def acompletion_with_retry(llm: Cohere, **kwargs: Any) -> Any:
    """Run ``llm.async_client.generate`` under the tenacity retry policy.

    This is a regular function: it returns the result of calling the
    retry-wrapped coroutine function, which the caller is expected to await.

    Args:
        llm: Model wrapper supplying ``async_client`` and ``max_retries``.
        **kwargs: Forwarded unchanged to ``llm.async_client.generate``.

    Returns:
        An awaitable resolving to the result of
        ``llm.async_client.generate``.
    """
    with_retries = _create_retry_decorator(llm.max_retries)

    # The inner function keeps its original name: retry logging may report
    # the name of the wrapped callable.
    async def _completion_with_retry(**call_kwargs: Any) -> Any:
        return await llm.async_client.generate(**call_kwargs)

    guarded_call = with_retries(_completion_with_retry)
    return guarded_call(**kwargs)
@deprecated(
    since="0.0.30", removal="0.3.0", alternative_import="langchain_cohere.BaseCohere"
)
class BaseCohere(Serializable):
    """Base class for Cohere models.

    Holds the connection-level settings and, during validation, builds the
    synchronous and asynchronous Cohere SDK clients from them.
    """

    client: Any  #: :meta private:
    async_client: Any  #: :meta private:
    model: Optional[str] = Field(default=None)
    """Model name to use."""

    temperature: float = 0.75
    """A non-negative float that tunes the degree of randomness in generation."""

    cohere_api_key: Optional[SecretStr] = None
    """Cohere API key. If not provided, will be read from the environment variable."""

    stop: Optional[List[str]] = None
    """Default stop sequences for generation."""

    streaming: bool = Field(default=False)
    """Whether to stream the results."""

    user_agent: str = "langchain"
    """Identifier for the application making the request."""

    @root_validator()
    def validate_environment(cls, values: Dict) -> Dict:
        """Validate that api key and python package exists in environment.

        Args:
            values: Field values collected for the model so far.

        Returns:
            ``values`` with ``cohere_api_key`` normalised to a secret string
            and ``client`` / ``async_client`` populated.

        Raises:
            ImportError: If the ``cohere`` package cannot be imported. The
                original import failure is attached as the cause.
        """
        # Keep the ``try`` limited to the import so unrelated failures in
        # client construction are never mistaken for a missing package.
        try:
            import cohere
        except ImportError as exc:
            raise ImportError(
                "Could not import cohere python package. "
                "Please install it with `pip install cohere`."
            ) from exc

        values["cohere_api_key"] = convert_to_secret_str(
            get_from_dict_or_env(values, "cohere_api_key", "COHERE_API_KEY")
        )
        api_key = values["cohere_api_key"].get_secret_value()
        client_name = values["user_agent"]

        # Sync client first, then async, both with the same credentials.
        values["client"] = cohere.Client(
            api_key=api_key,
            client_name=client_name,
        )
        values["async_client"] = cohere.AsyncClient(
            api_key=api_key,
            client_name=client_name,
        )
        return values
@deprecated(
    since="0.1.14", removal="0.3.0", alternative_import="langchain_cohere.Cohere"
)
class Cohere(LLM, BaseCohere):
    """Cohere large language models.

    To use, you should have the ``cohere`` python package installed, and the
    environment variable ``COHERE_API_KEY`` set with your API key, or pass
    it as a named parameter to the constructor.

    Example:
        .. code-block:: python

            from langchain_community.llms import Cohere

            cohere = Cohere(model="gptd-instruct-tft", cohere_api_key="my-api-key")
    """

    max_tokens: int = 256
    """Denotes the number of tokens to predict per generation."""

    k: int = 0
    """Number of most likely tokens to consider at each step."""

    # Declared as ``float``: ``p`` is a probability mass, and an ``int`` field
    # would make validation truncate fractional values such as 0.75 down to 0.
    # The default value itself is left as ``1`` so default requests are unchanged.
    p: float = 1
    """Total probability mass of tokens to consider at each step."""

    frequency_penalty: float = 0.0
    """Penalizes repeated tokens according to frequency. Between 0 and 1."""

    presence_penalty: float = 0.0
    """Penalizes repeated tokens. Between 0 and 1."""

    truncate: Optional[str] = None
    """Specify how the client handles inputs longer than the maximum token
    length: Truncate from START, END or NONE"""

    max_retries: int = 10
    """Maximum number of retries to make when generating."""

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid

    @property
    def _default_params(self) -> Dict[str, Any]:
        """Get the default parameters for calling Cohere API.

        A new dict is built on every access, so callers may mutate the result.
        """
        return {
            "max_tokens": self.max_tokens,
            "temperature": self.temperature,
            "k": self.k,
            "p": self.p,
            "frequency_penalty": self.frequency_penalty,
            "presence_penalty": self.presence_penalty,
            "truncate": self.truncate,
        }

    @property
    def lc_secrets(self) -> Dict[str, str]:
        """Map secret field names to the environment variables holding them."""
        return {"cohere_api_key": "COHERE_API_KEY"}

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Get the identifying parameters."""
        return {**{"model": self.model}, **self._default_params}

    @property
    def _llm_type(self) -> str:
        """Return type of llm."""
        return "cohere"

    def _invocation_params(self, stop: Optional[List[str]], **kwargs: Any) -> dict:
        """Assemble the keyword arguments for a single generate request.

        Args:
            stop: Stop sequences supplied for this call, if any.
            **kwargs: Per-call overrides; these win over the defaults.

        Returns:
            The default parameters plus ``stop_sequences``, updated with
            ``kwargs``.

        Raises:
            ValueError: If stop sequences are set both on the instance and
                for this call.
        """
        params = self._default_params
        if self.stop is not None and stop is not None:
            raise ValueError("`stop` found in both the input and default params.")
        # Instance-level stop sequences take effect only when the call gave none.
        params["stop_sequences"] = self.stop if self.stop is not None else stop
        return {**params, **kwargs}

    def _process_response(self, response: Any, stop: Optional[List[str]]) -> str:
        """Extract the text of the first generation from ``response``.

        Args:
            response: Object returned by the Cohere client's ``generate``.
            stop: Stop sequences that were in effect for the request.

        Returns:
            The generated text, cut at the first stop sequence when any
            were provided.
        """
        text = response.generations[0].text
        # If stop tokens are provided, Cohere's endpoint returns them.
        # In order to make this consistent with other endpoints, we strip them.
        if stop:
            text = enforce_stop_tokens(text, stop)
        return text

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Call out to Cohere's generate endpoint.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.
            run_manager: Accepted for interface compatibility; not used here.
            **kwargs: Extra request parameters merged over the defaults.

        Returns:
            The string generated by the model.

        Example:
            .. code-block:: python

                response = cohere("Tell me a joke.")
        """
        params = self._invocation_params(stop, **kwargs)
        response = completion_with_retry(
            self, model=self.model, prompt=prompt, **params
        )
        _stop = params.get("stop_sequences")
        return self._process_response(response, _stop)

    async def _acall(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Async call out to Cohere's generate endpoint.

        Args:
            prompt: The prompt to pass into the model.
            stop: Optional list of stop words to use when generating.
            run_manager: Accepted for interface compatibility; not used here.
            **kwargs: Extra request parameters merged over the defaults.

        Returns:
            The string generated by the model.

        Example:
            .. code-block:: python

                response = await cohere.ainvoke("Tell me a joke.")
        """
        params = self._invocation_params(stop, **kwargs)
        response = await acompletion_with_retry(
            self, model=self.model, prompt=prompt, **params
        )
        _stop = params.get("stop_sequences")
        return self._process_response(response, _stop)
|