my_api / app.py
souljoy's picture
Update app.py
cf22d38
raw
history blame
9 kB
import hmac
import json
import os
import time
from xml.etree import ElementTree

import openai
import requests
from fastapi import FastAPI, status
from fastapi.responses import HTMLResponse
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
class Text(BaseModel):
    """Request body used by the POST endpoints: one string field, ``content``.

    Several endpoints pass ``content`` to ``json.loads`` and therefore expect
    it to hold a JSON document encoded as a string.
    """

    # Falls back to the empty string when the field is omitted from the body.
    content: str = ""
# FastAPI application instance that every route in this module registers on.
app = FastAPI()

# SECURITY: the OpenAI API key used to be hard-coded on this line, which leaks
# the credential to anyone who can read the file. It is now read from the
# OPENAI_API_KEY environment variable (empty string when unset). The key that
# was previously committed should be treated as compromised and revoked.
key = os.environ.get('OPENAI_API_KEY', '')
openai.api_key = key

# Default headers for direct HTTP calls to the OpenAI REST API.
headers = {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer ' + key,
}
@app.get("/")
def home():
    """Serve ``index.html`` (resolved against the working directory) as HTML.

    Returns:
        An ``HTMLResponse`` with status 200 carrying the file's text.

    Raises:
        OSError: If ``index.html`` cannot be opened or read.
    """
    # The context manager closes the handle deterministically; the explicit
    # encoding avoids depending on the platform's locale default.
    with open('index.html', encoding='utf-8') as page:
        html_content = page.read()
    return HTMLResponse(content=html_content, status_code=200)
@app.post("/qa_maker")
def sentiment_analysis_ep(content: Text = None):
    """Ask the chat completions API to produce question/answer pairs for an article.

    Args:
        content: Request body whose ``content`` field holds the article text.

    Returns:
        ``{"content": <model reply>}`` on success; a 400 response when no body
        was sent; a 502 response when the OpenAI call fails or its payload
        does not contain a completion.
    """
    if content is None:
        # The body is optional in the signature, so guard before dereferencing.
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'error': 'request body is required'},
        )

    url = 'https://api.openai.com/v1/chat/completions'
    prompt = '根据下面的文章,生成的“问题和回答”QA对,大于5个,以一行一个json格式({“question”:"xxx","answer":"xxx"})生成:\n'
    messages = [{"role": "user", "content": prompt + content.content}]
    data = {
        "model": "gpt-3.5-turbo",
        "messages": messages,
    }
    print("messages = \n", messages)

    try:
        # (connect, read) timeout: without one a stalled upstream call would
        # block this worker indefinitely.
        result = requests.post(
            url=url,
            data=json.dumps(data),
            headers=headers,
            timeout=(10, 120),
        )
        # An error payload from OpenAI has no 'choices' key; that is caught
        # below instead of surfacing as an unhandled exception.
        res = str(result.json()['choices'][0]['message']['content']).strip()
    except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as err:
        print('openai request failed:', repr(err))
        return JSONResponse(
            status_code=status.HTTP_502_BAD_GATEWAY,
            content={'error': 'upstream OpenAI request failed'},
        )

    print('res:', res)
    return JSONResponse(content={'content': res})
@app.post("/chatpdf")
def chat_pdf_ep(content: Text = None):
    """Answer a question about a document, including earlier chat turns as context.

    Args:
        content: Request body whose ``content`` field is a JSON string with the
            keys ``doc`` (article text), ``history`` (a sequence of entries
            whose items 0 and 1 are the user and assistant turns) and
            ``question``.

    Returns:
        ``{"content": <model reply>}`` on success; a 400 response when the
        body is missing or malformed; a 502 response when the OpenAI call
        fails or its payload does not contain a completion.
    """
    if content is None:
        # The body is optional in the signature, so guard before dereferencing.
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'error': 'request body is required'},
        )

    url = 'https://api.openai.com/v1/chat/completions'
    messages = [
        {
            "role": "system",
            "content": "你是一个有用的助手,可以使用文章内容准确地回答问题。使用提供的文章来生成你的答案,但避免逐字复制文章。尽可能使用自己的话。准确、有用、简洁、清晰。"
        }
    ]
    try:
        obj = json.loads(content.content)
        messages.append({"role": "system", "content": "文章内容:\n" + obj['doc']})
        for his in obj['history']:
            messages.append({"role": "user", "content": his[0]})
            messages.append({"role": "assistant", "content": his[1]})
        messages.append({"role": "user", "content": obj['question']})
    except (ValueError, KeyError, IndexError, TypeError):
        # Invalid JSON, a missing key, or a wrongly shaped value is a client error.
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'error': 'content must be a JSON string with doc, history and question'},
        )

    data = {
        "model": "gpt-3.5-turbo",
        "messages": messages,
    }
    print("messages = \n", messages)

    try:
        # (connect, read) timeout: without one a stalled upstream call would
        # block this worker indefinitely.
        result = requests.post(
            url=url,
            data=json.dumps(data),
            headers=headers,
            timeout=(10, 120),
        )
        res = str(result.json()['choices'][0]['message']['content']).strip()
    except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as err:
        print('openai request failed:', repr(err))
        return JSONResponse(
            status_code=status.HTTP_502_BAD_GATEWAY,
            content={'error': 'upstream OpenAI request failed'},
        )

    payload = {'content': res}
    print('content:', payload)
    return JSONResponse(content=payload)
@app.post("/sale")
def sale_ep(content: Text = None):
    """Answer a question in a car-sales persona, using a document and chat history.

    Args:
        content: Request body whose ``content`` field is a JSON string with the
            keys ``doc`` (article text), ``history`` (a sequence of entries
            whose items 0 and 1 are the user and assistant turns) and
            ``question``.

    Returns:
        ``{"content": <model reply>}`` on success; a 400 response when the
        body is missing or malformed; a 502 response when the OpenAI call
        fails or its payload does not contain a completion.
    """
    if content is None:
        # The body is optional in the signature, so guard before dereferencing.
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'error': 'request body is required'},
        )

    url = 'https://api.openai.com/v1/chat/completions'
    messages = [
        {
            "role": "system",
            "content": "你是小鹏汽车资深的销售,可以参考文章内容来生成你的答案,但避免逐字复制文章,尽可能使用自己的话,突出小鹏汽车的优点、说服用户来店试驾或购买小鹏汽车。准确、有用、有销售技巧、口语化、有亲和力。"
        }
    ]
    try:
        obj = json.loads(content.content)
        messages.append({"role": "system", "content": "文章内容:\n" + obj['doc']})
        for his in obj['history']:
            messages.append({"role": "user", "content": his[0]})
            messages.append({"role": "assistant", "content": his[1]})
        messages.append({"role": "user", "content": obj['question']})
    except (ValueError, KeyError, IndexError, TypeError):
        # Invalid JSON, a missing key, or a wrongly shaped value is a client error.
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'error': 'content must be a JSON string with doc, history and question'},
        )

    data = {
        "model": "gpt-3.5-turbo",
        "messages": messages,
    }
    print("messages = \n", messages)

    try:
        # (connect, read) timeout: without one a stalled upstream call would
        # block this worker indefinitely.
        result = requests.post(
            url=url,
            data=json.dumps(data),
            headers=headers,
            timeout=(10, 120),
        )
        res = str(result.json()['choices'][0]['message']['content']).strip()
    except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as err:
        print('openai request failed:', repr(err))
        return JSONResponse(
            status_code=status.HTTP_502_BAD_GATEWAY,
            content={'error': 'upstream OpenAI request failed'},
        )

    payload = {'content': res}
    print('content:', payload)
    return JSONResponse(content=payload)
@app.post("/chatgpt")
def chat_gpt_ep(content: Text = None):
    """Forward a caller-supplied message list to the chat completions API.

    Args:
        content: Request body whose ``content`` field is a JSON string with the
            keys ``messages`` (passed through as the chat messages) and
            ``key`` (the OpenAI API key to authenticate with).

    Returns:
        ``{"content": <model reply>}`` on success; a 400 response when the
        body is missing or malformed; a 502 response when the OpenAI call
        fails or its payload does not contain a completion.
    """
    if content is None:
        # The body is optional in the signature, so guard before dereferencing.
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'error': 'request body is required'},
        )

    url = 'https://api.openai.com/v1/chat/completions'
    try:
        obj = json.loads(content.content)
        data = {
            "model": "gpt-3.5-turbo",
            "messages": obj['messages'],
        }
        print("data = \n", data)
        key = obj['key']
        request_headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + key,
        }
    except (ValueError, KeyError, TypeError):
        # Invalid JSON, a missing key, or a non-string API key is a client error.
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'error': 'content must be a JSON string with messages and key'},
        )

    # NOTE(review): this rebinds the process-wide OpenAI key, so every later
    # call made through the ``openai`` module (by any request) uses the key
    # supplied here. Kept for backward compatibility; confirm it is intended.
    openai.api_key = key

    try:
        # (connect, read) timeout: without one a stalled upstream call would
        # block this worker indefinitely.
        result = requests.post(
            url=url,
            data=json.dumps(data),
            headers=request_headers,
            timeout=(10, 120),
        )
        res = str(result.json()['choices'][0]['message']['content']).strip()
    except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as err:
        print('openai request failed:', repr(err))
        return JSONResponse(
            status_code=status.HTTP_502_BAD_GATEWAY,
            content={'error': 'upstream OpenAI request failed'},
        )

    payload = {'content': res}
    print('content:', payload)
    return JSONResponse(content=payload)
async def chat_gpt_stream_fun(content: Text = None):
    """Stream a chat completion as newline-delimited JSON snapshots.

    Args:
        content: Request body whose ``content`` field is a JSON string with a
            ``messages`` key, passed through as the chat messages.

    Yields:
        One line per received chunk: ``json.dumps({"content": <reply so far>})``
        followed by ``"\\n"``.
    """
    # Imported locally under an alias: a later ``from time import time`` in this
    # module rebinds the global name ``time`` to the function, so the original
    # ``time.time()`` call raised AttributeError here.
    import time as time_module

    start_time = time_module.time()
    obj = json.loads(content.content)
    response = openai.ChatCompletion.create(
        model='gpt-3.5-turbo',
        messages=obj['messages'],
        stream=True,  # ask the API to deliver the reply incrementally
    )

    # NOTE(review): the original indentation was lost; yielding once per chunk
    # is assumed from the newline-delimited framing -- confirm.
    # NOTE(review): this is a blocking iterator inside an async generator, so
    # the event loop is held while waiting for each chunk.
    full_reply_content = ''
    for chunk in response:
        chunk_time = time_module.time() - start_time  # delay since the request started
        chunk_message = chunk['choices'][0]['delta']  # incremental piece of the reply
        print(f"Message received {chunk_time:.2f} seconds after request: {chunk_message}")
        # Append incrementally instead of re-joining every earlier delta.
        full_reply_content += chunk_message.get('content', '')
        print(f"Full conversation received: {full_reply_content}")
        payload = {'content': full_reply_content}
        print('content:', payload)
        yield json.dumps(payload) + '\n'
@app.post("/chatgptstream", status_code=status.HTTP_200_OK)
async def get_random_numbers(content: Text = None):
    """Stream the output of ``chat_gpt_stream_fun`` for the posted messages.

    Args:
        content: Request body handed to ``chat_gpt_stream_fun`` unchanged.

    Returns:
        A ``StreamingResponse`` with media type ``application/json`` wrapping
        the generator, or a 400 response when no body was sent.
    """
    if content is None:
        # Reject up front: once streaming has started, a failure inside the
        # generator can no longer be turned into a proper error status.
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'error': 'request body is required'},
        )
    return StreamingResponse(chat_gpt_stream_fun(content), media_type='application/json')
@app.post("/embeddings")
def embeddings_ep(content: Text = None):
    """Request an embedding for the posted text from the OpenAI embeddings API.

    Args:
        content: Request body whose ``content`` field is the text to embed.

    Returns:
        The decoded JSON body of the OpenAI response; a 400 response when no
        body was sent; a 502 response when the request fails or the reply is
        not valid JSON.
    """
    if content is None:
        # The body is optional in the signature, so guard before dereferencing.
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'error': 'request body is required'},
        )

    url = 'https://api.openai.com/v1/embeddings'
    data = {
        "model": "text-embedding-ada-002",
        "input": content.content,
    }
    try:
        # (connect, read) timeout: without one a stalled upstream call would
        # block this worker indefinitely.
        result = requests.post(
            url=url,
            data=json.dumps(data),
            headers=headers,
            timeout=(10, 120),
        )
        body = result.json()
    except (requests.RequestException, ValueError) as err:
        print('openai request failed:', repr(err))
        return JSONResponse(
            status_code=status.HTTP_502_BAD_GATEWAY,
            content={'error': 'upstream OpenAI request failed'},
        )
    return JSONResponse(content=body)
@app.post("/create_image")
def create_image_ep(content: Text = None):
    """Request image generation from the OpenAI images API.

    Args:
        content: Request body whose ``content`` field is a JSON string with the
            keys ``prompt``, ``n`` and ``size``, forwarded as-is.

    Returns:
        The decoded JSON body of the OpenAI response; a 400 response when the
        body is missing or malformed; a 502 response when the request fails or
        the reply is not valid JSON.
    """
    if content is None:
        # The body is optional in the signature, so guard before dereferencing.
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'error': 'request body is required'},
        )

    url = 'https://api.openai.com/v1/images/generations'
    try:
        obj = json.loads(content.content)
        data = {
            "prompt": obj["prompt"],
            "n": obj["n"],
            "size": obj["size"],
        }
    except (ValueError, KeyError, TypeError):
        # Invalid JSON, a missing key, or a non-object payload is a client error.
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'error': 'content must be a JSON string with prompt, n and size'},
        )
    print("data = \n", data)

    try:
        # (connect, read) timeout: without one a stalled upstream call would
        # block this worker indefinitely.
        result = requests.post(
            url=url,
            data=json.dumps(data),
            headers=headers,
            timeout=(10, 120),
        )
        body = result.json()
    except (requests.RequestException, ValueError) as err:
        print('openai request failed:', repr(err))
        return JSONResponse(
            status_code=status.HTTP_502_BAD_GATEWAY,
            content={'error': 'upstream OpenAI request failed'},
        )
    return JSONResponse(content=body)
from fastapi import FastAPI, Request, Response
from fastapi.responses import PlainTextResponse
from hashlib import sha1
from time import time
from xml.etree.ElementTree import Element, tostring
def chat_gpt_response(prompt):
    """Produce the reply text for an incoming message.

    Placeholder implementation: ``prompt`` is ignored and a fixed greeting is
    returned.
    """
    # Replace with your GPT-3.5 implementation
    canned_reply = "你好呀,小哥哥"
    return canned_reply
@app.get('/wechat')
def verify_server_address(signature: str, timestamp: str, nonce: str, echostr: str):
    """Handle the WeChat server-address verification handshake.

    Args:
        signature: Signature supplied in the query string.
        timestamp: Timestamp supplied in the query string.
        nonce: Nonce supplied in the query string.
        echostr: Value to echo back when the signature is valid.

    Returns:
        ``echostr`` as plain text when ``check_signature`` accepts the request,
        otherwise an explicit 403 plain-text response (previously the handler
        fell through and returned ``None``, i.e. a 200 response with ``null``).
    """
    # The token can be overridden through WECHAT_TOKEN so the secret does not
    # have to live in source; the historical value stays as the fallback.
    token = os.environ.get('WECHAT_TOKEN', 'zsj')
    if check_signature(token, signature, timestamp, nonce):
        return PlainTextResponse(echostr)
    return PlainTextResponse('invalid signature', status_code=status.HTTP_403_FORBIDDEN)
@app.post('/wechat')
async def process_message(request: Request):
    """Reply to an incoming WeChat text message with an XML text message.

    The handler must be ``async`` because it awaits ``request.body()``; as a
    plain ``def`` the ``await`` was a SyntaxError that prevented the module
    from importing at all.

    Args:
        request: Incoming request whose body is the XML message.

    Returns:
        An ``application/xml`` response addressed back to the sender for text
        messages; a plain ``success`` acknowledgement for any other message
        type; a 400 plain-text response when the body is not well-formed XML.
    """
    xml_data = await request.body()
    try:
        # NOTE(security): the body is untrusted input and ``xml.etree`` is not
        # hardened against maliciously constructed XML; consider a hardened
        # parser if this endpoint is exposed publicly.
        xml_tree = ElementTree.fromstring(xml_data)
    except ElementTree.ParseError:
        return PlainTextResponse('invalid xml', status_code=status.HTTP_400_BAD_REQUEST)

    # findtext() yields None for a missing element instead of raising.
    if xml_tree.findtext('MsgType') != 'text':
        # Only text messages are handled; acknowledge everything else.
        return PlainTextResponse('success')

    incoming_text = xml_tree.findtext('Content')
    user_open_id = xml_tree.findtext('FromUserName')
    public_account_id = xml_tree.findtext('ToUserName')
    reply_content = chat_gpt_response(incoming_text)

    # Sender and recipient are swapped so the reply goes back to the user.
    reply = Element('xml')
    reply_fields = (
        ('ToUserName', user_open_id),
        ('FromUserName', public_account_id),
        ('CreateTime', str(int(time()))),
        ('MsgType', 'text'),
        ('Content', reply_content),
    )
    for tag, value in reply_fields:
        ElementTree.SubElement(reply, tag).text = value

    response_xml = tostring(reply, encoding='utf-8')
    return Response(content=response_xml, media_type='application/xml')
def check_signature(token, signature, timestamp, nonce):
    """Check a signature against the SHA-1 of the sorted, concatenated inputs.

    The expected value is the hex SHA-1 digest of ``token``, ``timestamp`` and
    ``nonce`` sorted lexicographically and joined without a separator.

    Args:
        token: Shared secret token.
        signature: Hex digest supplied by the caller.
        timestamp: Timestamp string supplied by the caller.
        nonce: Nonce string supplied by the caller.

    Returns:
        ``True`` when ``signature`` equals the expected digest, else ``False``.
        A signature that is not an ASCII string can never match and yields
        ``False``.
    """
    expected = sha1(''.join(sorted([token, timestamp, nonce])).encode('utf-8')).hexdigest()
    try:
        supplied = signature.encode('ascii')
    except (AttributeError, UnicodeEncodeError):
        # Not a string, or contains characters a hex digest cannot contain.
        return False
    # Constant-time comparison so response timing does not reveal how much of
    # the digest matched.
    return hmac.compare_digest(expected.encode('ascii'), supplied)