from fastapi import FastAPI, status
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from pydantic import BaseModel
import requests
import json
import openai
import time


class Text(BaseModel):
    content: str = ""


app = FastAPI()

# OpenAI API key shared by all endpoints (the /chatgpt endpoint lets each
# request supply its own key).
key = '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'
openai.api_key = key
headers = {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer ' + key
}
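

# The app is typically served with uvicorn; "main" below is an assumed module
# name -- replace it with this file's actual name:
#
#   uvicorn main:app --host 0.0.0.0 --port 8000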


@app.get("/")
def home():
    html_content = open('index.html').read()
    return HTMLResponse(content=html_content, status_code=200)


@app.post("/qa_maker")
def qa_maker_ep(content: Text = None):
    # Generate question/answer pairs from the submitted article text.
    url = 'https://api.openai.com/v1/chat/completions'
    prompt = '根据下面的文章,生成的“问题和回答”QA对,大于5个,以一行一个json格式({"question":"xxx","answer":"xxx"})生成:\n'
    messages = [{"role": "user", "content": prompt + content.content}]
    data = {
        "model": "gpt-3.5-turbo",
        "messages": messages
    }
    print("messages = \n", messages)
    result = requests.post(url=url, data=json.dumps(data), headers=headers)
    res = str(result.json()['choices'][0]['message']['content']).strip()
    print('res:', res)
    res = {'content': res}
    return JSONResponse(content=res)
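

# Sketch of a client call for /qa_maker (not used by the app itself; the local
# base URL is an assumption for testing with uvicorn). The article text goes
# straight into the "content" field of the Text model.
def _example_qa_maker_request(article: str):
    resp = requests.post("http://127.0.0.1:8000/qa_maker", json={"content": article})
    return resp.json()["content"]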


@app.post("/chatpdf")
def chat_pdf_ep(content: Text = None):
    # Answer a question grounded in a user-supplied document, with chat history.
    url = 'https://api.openai.com/v1/chat/completions'
    messages = [
        {
            "role": "system",
            "content": "你是一个有用的助手,可以使用文章内容准确地回答问题。使用提供的文章来生成你的答案,但避免逐字复制文章。尽可能使用自己的话。准确、有用、简洁、清晰。"
        }
    ]
    # content.content is a JSON string carrying 'doc', 'history' and 'question'.
    obj = json.loads(content.content)
    messages.append({"role": "system", "content": "文章内容:\n" + obj['doc']})
    history = obj['history']
    for his in history:
        messages.append({"role": "user", "content": his[0]})
        messages.append({"role": "assistant", "content": his[1]})
    messages.append({"role": "user", "content": obj['question']})
    data = {
        "model": "gpt-3.5-turbo",
        "messages": messages
    }
    print("messages = \n", messages)
    result = requests.post(url=url, data=json.dumps(data), headers=headers)
    res = str(result.json()['choices'][0]['message']['content']).strip()
    content = {'content': res}
    print('content:', content)
    return JSONResponse(content=content)
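

# Sketch of the payload /chatpdf (and /sale below) expects: the Text "content"
# field wraps a JSON string carrying the document, the chat history and the new
# question. The helper name and base URL are assumptions for local testing.
def _example_chatpdf_request(doc: str, question: str, history=None):
    payload = json.dumps({
        "doc": doc,
        "history": history or [],  # list of [user_turn, assistant_turn] pairs
        "question": question,
    }, ensure_ascii=False)
    resp = requests.post("http://127.0.0.1:8000/chatpdf", json={"content": payload})
    return resp.json()["content"]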


@app.post("/sale")
def sale_ep(content: Text = None):
    url = 'https://api.openai.com/v1/chat/completions'
    messages = [
        {
            "role": "system",
            "content": "你是小鹏汽车资深的销售,可以参考文章内容来生成你的答案,但避免逐字复制文章,尽可能使用自己的话,突出小鹏汽车的优点、说服用户来店试驾或购买小鹏汽车。准确、有用、有销售技巧、口语化、有亲和力。"
        }
    ]
    obj = json.loads(content.content)
    messages.append({"role": "system", "content": "文章内容:\n" + obj['doc']})
    history = obj['history']
    for his in history:
        messages.append({"role": "user", "content": his[0]})
        messages.append({"role": "assistant", "content": his[1]})
    messages.append({"role": "user", "content": obj['question']})
    data = {
        "model": "gpt-3.5-turbo",
        "messages": messages
    }
    print("messages = \n", messages)
    result = requests.post(url=url, data=json.dumps(data), headers=headers)
    res = str(result.json()['choices'][0]['message']['content']).strip()
    content = {'content': res}
    print('content:', content)
    return JSONResponse(content=content)


@app.post("/chatgpt")
def chat_gpt_ep(content: Text = None):
    # Generic chat relay: the caller supplies its own OpenAI key and a complete
    # messages list inside the JSON-encoded "content" field.
    url = 'https://api.openai.com/v1/chat/completions'
    obj = json.loads(content.content)
    data = {
        "model": "gpt-3.5-turbo",
        "messages": obj['messages']
    }
    print("data = \n", data)
    key = obj['key']
    openai.api_key = key
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + key
    }
    result = requests.post(url=url, data=json.dumps(data), headers=headers)
    res = str(result.json()['choices'][0]['message']['content']).strip()
    content = {'content': res}
    print('content:', content)
    return JSONResponse(content=content)
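

# Sketch of the payload /chatgpt expects: a JSON string carrying the caller's
# own OpenAI key and a ready-made messages list (values are placeholders; the
# base URL is an assumption for local testing):
#
#   payload = json.dumps({
#       "key": "sk-...",
#       "messages": [{"role": "user", "content": "你好"}],
#   }, ensure_ascii=False)
#   requests.post("http://127.0.0.1:8000/chatgpt", json={"content": payload})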


async def chat_gpt_stream_fun(content: Text = None):
    # Stream the completion back as newline-delimited JSON; each line carries
    # the full reply accumulated so far.
    start_time = time.time()
    obj = json.loads(content.content)
    response = openai.ChatCompletion.create(
        model='gpt-3.5-turbo',
        messages=obj['messages'],
        stream=True,
    )

    collected_chunks = []
    collected_messages = []

    for chunk in response:
        chunk_time = time.time() - start_time
        collected_chunks.append(chunk)
        chunk_message = chunk['choices'][0]['delta']
        collected_messages.append(chunk_message)
        print(f"Message received {chunk_time:.2f} seconds after request: {chunk_message}")
        full_reply_content = ''.join([m.get('content', '') for m in collected_messages])
        print(f"Full conversation received: {full_reply_content}")
        content = {'content': full_reply_content}
        print('content:', content)
        yield json.dumps(content) + '\n'


@app.post("/chatgptstream", status_code=status.HTTP_200_OK)
async def chat_gpt_stream_ep(content: Text = None):
    return StreamingResponse(chat_gpt_stream_fun(content), media_type='application/json')
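

# Sketch of how a client might consume /chatgptstream: each response line is a
# JSON object holding the reply accumulated so far (base URL and sample payload
# are assumptions):
#
#   payload = json.dumps({"messages": [{"role": "user", "content": "你好"}]})
#   with requests.post("http://127.0.0.1:8000/chatgptstream",
#                      json={"content": payload}, stream=True) as r:
#       for line in r.iter_lines():
#           if line:
#               print(json.loads(line)["content"])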


@app.post("/embeddings")
def embeddings_ep(content: Text = None):
    url = 'https://api.openai.com/v1/embeddings'
    data = {
        "model": "text-embedding-ada-002",
        "input": content.content
    }
    result = requests.post(url=url, data=json.dumps(data), headers=headers)
    return JSONResponse(content=result.json())


@app.post("/create_image")
def create_image_ep(content: Text = None):
    # content.content is a JSON string carrying 'prompt', 'n' and 'size'.
    url = 'https://api.openai.com/v1/images/generations'
    obj = json.loads(content.content)
    data = {
        "prompt": obj["prompt"],
        "n": obj["n"],
        "size": obj["size"]
    }
    print("data = \n", data)
    result = requests.post(url=url, data=json.dumps(data), headers=headers)
    return JSONResponse(content=result.json())
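

# Sketch of the payload /create_image expects; "size" must be a value the
# Images API accepts (e.g. "256x256", "512x512" or "1024x1024"). Base URL and
# sample values are assumptions:
#
#   payload = json.dumps({"prompt": "a red fox in the snow", "n": 1, "size": "512x512"})
#   requests.post("http://127.0.0.1:8000/create_image", json={"content": payload})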


# WeChat official-account webhook: server verification plus a simple
# text-message auto-reply backed by chat_gpt_response().
from fastapi import Request, Response
from fastapi.responses import PlainTextResponse
from hashlib import sha1
from xml.etree import ElementTree
from xml.etree.ElementTree import Element, tostring


def chat_gpt_response(prompt):
    # Placeholder reply; hook this up to the chat-completion logic above as needed.
    return "你好呀,小哥哥"


@app.get('/wechat')
def verify_server_address(signature: str, timestamp: str, nonce: str, echostr: str):
    # WeChat server-address verification: echo `echostr` back only when the
    # signature checks out.
    token = 'zsj'
    if check_signature(token, signature, timestamp, nonce):
        return PlainTextResponse(echostr)
    return PlainTextResponse('')


@app.post('/wechat')
async def process_message(request: Request):
    xml_data = await request.body()
    xml_tree = ElementTree.fromstring(xml_data)

    msg_type = xml_tree.find('MsgType').text
    if msg_type == 'text':
        content = xml_tree.find('Content').text
        user_open_id = xml_tree.find('FromUserName').text
        public_account_id = xml_tree.find('ToUserName').text

        reply_content = chat_gpt_response(content)

        # Build the reply XML: swap To/From so the answer goes back to the user.
        reply = Element('xml')
        to_user_name = Element('ToUserName')
        to_user_name.text = user_open_id
        reply.append(to_user_name)

        from_user_name = Element('FromUserName')
        from_user_name.text = public_account_id
        reply.append(from_user_name)

        create_time = Element('CreateTime')
        create_time.text = str(int(time.time()))
        reply.append(create_time)

        msg_type = Element('MsgType')
        msg_type.text = 'text'
        reply.append(msg_type)

        content = Element('Content')
        content.text = reply_content
        reply.append(content)

        response_xml = tostring(reply, encoding='utf-8')
        return Response(content=response_xml, media_type='application/xml')

    # Non-text messages: acknowledge without replying.
    return Response(content='success', media_type='text/plain')


def check_signature(token, signature, timestamp, nonce):
    # WeChat signature: sha1 over the lexicographically sorted token, timestamp
    # and nonce, compared against the signature query parameter.
    tmp_list = [token, timestamp, nonce]
    tmp_list.sort()
    tmp_str = ''.join(tmp_list)
    tmp_str = sha1(tmp_str.encode('utf-8')).hexdigest()

    return tmp_str == signature
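

# Worked example of the signature check (made-up values): WeChat sends
# signature, timestamp and nonce; the server sorts [token, timestamp, nonce],
# joins and sha1-hashes them, then compares with the signature.
#
#   expected = sha1(''.join(sorted(['zsj', '1690000000', '42'])).encode('utf-8')).hexdigest()
#   check_signature('zsj', expected, '1690000000', '42')   # -> True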