LLMChat / modules /models /OpenAI.py
JohnSmith9982's picture
Upload 114 files
7bab2f2
from __future__ import annotations
import json
import logging
import traceback
import colorama
import requests
from .. import shared
from ..config import retrieve_proxy, sensitive_id, usage_limit
from ..index_func import *
from ..presets import *
from ..utils import *
from .base_model import BaseLLMModel
class OpenAIClient(BaseLLMModel):
def __init__(
self,
model_name,
api_key,
system_prompt=INITIAL_SYSTEM_PROMPT,
temperature=1.0,
top_p=1.0,
user_name=""
) -> None:
super().__init__(
model_name=model_name,
temperature=temperature,
top_p=top_p,
system_prompt=system_prompt,
user=user_name
)
self.api_key = api_key
self.need_api_key = True
self._refresh_header()
def get_answer_stream_iter(self):
response = self._get_response(stream=True)
if response is not None:
iter = self._decode_chat_response(response)
partial_text = ""
for i in iter:
partial_text += i
yield partial_text
else:
yield STANDARD_ERROR_MSG + GENERAL_ERROR_MSG
def get_answer_at_once(self):
response = self._get_response()
response = json.loads(response.text)
content = response["choices"][0]["message"]["content"]
total_token_count = response["usage"]["total_tokens"]
return content, total_token_count
def count_token(self, user_input):
input_token_count = count_token(construct_user(user_input))
if self.system_prompt is not None and len(self.all_token_counts) == 0:
system_prompt_token_count = count_token(
construct_system(self.system_prompt)
)
return input_token_count + system_prompt_token_count
return input_token_count
def billing_info(self):
try:
curr_time = datetime.datetime.now()
last_day_of_month = get_last_day_of_month(
curr_time).strftime("%Y-%m-%d")
first_day_of_month = curr_time.replace(day=1).strftime("%Y-%m-%d")
usage_url = f"{shared.state.usage_api_url}?start_date={first_day_of_month}&end_date={last_day_of_month}"
try:
usage_data = self._get_billing_data(usage_url)
except Exception as e:
# logging.error(f"获取API使用情况失败: " + str(e))
if "Invalid authorization header" in str(e):
return i18n("**获取API使用情况失败**,需在填写`config.json`中正确填写sensitive_id")
elif "Incorrect API key provided: sess" in str(e):
return i18n("**获取API使用情况失败**,sensitive_id错误或已过期")
return i18n("**获取API使用情况失败**")
# rounded_usage = "{:.5f}".format(usage_data["total_usage"] / 100)
rounded_usage = round(usage_data["total_usage"] / 100, 5)
usage_percent = round(usage_data["total_usage"] / usage_limit, 2)
from ..webui import get_html
# return i18n("**本月使用金额** ") + f"\u3000 ${rounded_usage}"
return get_html("billing_info.html").format(
label = i18n("本月使用金额"),
usage_percent = usage_percent,
rounded_usage = rounded_usage,
usage_limit = usage_limit
)
except requests.exceptions.ConnectTimeout:
status_text = (
STANDARD_ERROR_MSG + CONNECTION_TIMEOUT_MSG + ERROR_RETRIEVE_MSG
)
return status_text
except requests.exceptions.ReadTimeout:
status_text = STANDARD_ERROR_MSG + READ_TIMEOUT_MSG + ERROR_RETRIEVE_MSG
return status_text
except Exception as e:
import traceback
traceback.print_exc()
logging.error(i18n("获取API使用情况失败:") + str(e))
return STANDARD_ERROR_MSG + ERROR_RETRIEVE_MSG
def set_token_upper_limit(self, new_upper_limit):
pass
@shared.state.switching_api_key # 在不开启多账号模式的时候,这个装饰器不会起作用
def _get_response(self, stream=False):
openai_api_key = self.api_key
system_prompt = self.system_prompt
history = self.history
logging.debug(colorama.Fore.YELLOW +
f"{history}" + colorama.Fore.RESET)
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {openai_api_key}",
}
if system_prompt is not None:
history = [construct_system(system_prompt), *history]
payload = {
"model": self.model_name,
"messages": history,
"temperature": self.temperature,
"top_p": self.top_p,
"n": self.n_choices,
"stream": stream,
"presence_penalty": self.presence_penalty,
"frequency_penalty": self.frequency_penalty,
}
if self.max_generation_token is not None:
payload["max_tokens"] = self.max_generation_token
if self.stop_sequence is not None:
payload["stop"] = self.stop_sequence
if self.logit_bias is not None:
payload["logit_bias"] = self.logit_bias
if self.user_identifier:
payload["user"] = self.user_identifier
if stream:
timeout = TIMEOUT_STREAMING
else:
timeout = TIMEOUT_ALL
# 如果有自定义的api-host,使用自定义host发送请求,否则使用默认设置发送请求
if shared.state.chat_completion_url != CHAT_COMPLETION_URL:
logging.debug(f"使用自定义API URL: {shared.state.chat_completion_url}")
with retrieve_proxy():
try:
response = requests.post(
shared.state.chat_completion_url,
headers=headers,
json=payload,
stream=stream,
timeout=timeout,
)
except:
traceback.print_exc()
return None
return response
def _refresh_header(self):
self.headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {sensitive_id}",
}
def _get_billing_data(self, billing_url):
with retrieve_proxy():
response = requests.get(
billing_url,
headers=self.headers,
timeout=TIMEOUT_ALL,
)
if response.status_code == 200:
data = response.json()
return data
else:
raise Exception(
f"API request failed with status code {response.status_code}: {response.text}"
)
def _decode_chat_response(self, response):
error_msg = ""
for chunk in response.iter_lines():
if chunk:
chunk = chunk.decode()
chunk_length = len(chunk)
try:
chunk = json.loads(chunk[6:])
except:
print(i18n("JSON解析错误,收到的内容: ") + f"{chunk}")
error_msg += chunk
continue
try:
if chunk_length > 6 and "delta" in chunk["choices"][0]:
if "finish_reason" in chunk["choices"][0]:
finish_reason = chunk["choices"][0]["finish_reason"]
else:
finish_reason = chunk["finish_reason"]
if finish_reason == "stop":
break
try:
yield chunk["choices"][0]["delta"]["content"]
except Exception as e:
# logging.error(f"Error: {e}")
continue
except:
print(f"ERROR: {chunk}")
continue
if error_msg and not error_msg=="data: [DONE]":
raise Exception(error_msg)
def set_key(self, new_access_key):
ret = super().set_key(new_access_key)
self._refresh_header()
return ret
def _single_query_at_once(self, history, temperature=1.0):
timeout = TIMEOUT_ALL
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
"temperature": f"{temperature}",
}
payload = {
"model": self.model_name,
"messages": history,
}
# 如果有自定义的api-host,使用自定义host发送请求,否则使用默认设置发送请求
if shared.state.chat_completion_url != CHAT_COMPLETION_URL:
logging.debug(f"使用自定义API URL: {shared.state.chat_completion_url}")
with retrieve_proxy():
response = requests.post(
shared.state.chat_completion_url,
headers=headers,
json=payload,
stream=False,
timeout=timeout,
)
return response
def auto_name_chat_history(self, name_chat_method, user_question, chatbot, user_name, single_turn_checkbox):
if len(self.history) == 2 and not single_turn_checkbox and not hide_history_when_not_logged_in:
user_question = self.history[0]["content"]
if name_chat_method == i18n("模型自动总结(消耗tokens)"):
ai_answer = self.history[1]["content"]
try:
history = [
{ "role": "system", "content": SUMMARY_CHAT_SYSTEM_PROMPT},
{ "role": "user", "content": f"Please write a title based on the following conversation:\n---\nUser: {user_question}\nAssistant: {ai_answer}"}
]
response = self._single_query_at_once(history, temperature=0.0)
response = json.loads(response.text)
content = response["choices"][0]["message"]["content"]
filename = replace_special_symbols(content) + ".json"
except Exception as e:
logging.info(f"自动命名失败。{e}")
filename = replace_special_symbols(user_question)[:16] + ".json"
return self.rename_chat_history(filename, chatbot, user_name)
elif name_chat_method == i18n("第一条提问"):
filename = replace_special_symbols(user_question)[:16] + ".json"
return self.rename_chat_history(filename, chatbot, user_name)
else:
return gr.update()
else:
return gr.update()