from fastapi import FastAPI, status from fastapi.responses import HTMLResponse from pydantic import BaseModel from fastapi.responses import JSONResponse, StreamingResponse import requests import json import openai import time class Text(BaseModel): content: str = "" app = FastAPI() key = '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' openai.api_key = key headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + key } @app.get("/") def home(): html_content = open('index.html').read() return HTMLResponse(content=html_content, status_code=200) @app.post("/qa_maker") def sentiment_analysis_ep(content: Text = None): url = 'https://api.openai.com/v1/chat/completions' prompt = '根据下面的文章,生成的“问题和回答”QA对,大于5个,以一行一个json格式({“question”:"xxx","answer":"xxx"})生成:\n' messages = [{"role": "user", "content": prompt + content.content}] data = { "model": "gpt-3.5-turbo", "messages": messages } print("messages = \n", messages) result = requests.post(url=url, data=json.dumps(data), headers=headers ) res = str(result.json()['choices'][0]['message']['content']).strip() print('res:', res) res = {'content': res} return JSONResponse(content=res) @app.post("/chatpdf") def chat_pdf_ep(content: Text = None): url = 'https://api.openai.com/v1/chat/completions' messages = [ { "role": "system", "content": "你是一个有用的助手,可以使用文章内容准确地回答问题。使用提供的文章来生成你的答案,但避免逐字复制文章。尽可能使用自己的话。准确、有用、简洁、清晰。" } ] obj = json.loads(content.content) messages.append({"role": "system", "content": "文章内容:\n" + obj['doc']}) history = obj['history'] for his in history: messages.append({"role": "user", "content": his[0]}) messages.append({"role": "assistant", "content": his[1]}) messages.append({"role": "user", "content": obj['question']}) data = { "model": "gpt-3.5-turbo", "messages": messages } print("messages = \n", messages) result = requests.post(url=url, data=json.dumps(data), headers=headers ) res = str(result.json()['choices'][0]['message']['content']).strip() content = {'content': res} print('content:', content) return JSONResponse(content=content) @app.post("/sale") def sale_ep(content: Text = None): url = 'https://api.openai.com/v1/chat/completions' messages = [ { "role": "system", "content": "你是小鹏汽车资深的销售,可以参考文章内容来生成你的答案,但避免逐字复制文章,尽可能使用自己的话,突出小鹏汽车的优点、说服用户来店试驾或购买小鹏汽车。准确、有用、有销售技巧、口语化、有亲和力。" } ] obj = json.loads(content.content) messages.append({"role": "system", "content": "文章内容:\n" + obj['doc']}) history = obj['history'] for his in history: messages.append({"role": "user", "content": his[0]}) messages.append({"role": "assistant", "content": his[1]}) messages.append({"role": "user", "content": obj['question']}) data = { "model": "gpt-3.5-turbo", "messages": messages } print("messages = \n", messages) result = requests.post(url=url, data=json.dumps(data), headers=headers ) res = str(result.json()['choices'][0]['message']['content']).strip() content = {'content': res} print('content:', content) return JSONResponse(content=content) @app.post("/chatgpt") def chat_gpt_ep(content: Text = None): url = 'https://api.openai.com/v1/chat/completions' obj = json.loads(content.content) data = { "model": "gpt-3.5-turbo", "messages": obj['messages'] } print("data = \n", data) key = obj['key'] openai.api_key = key headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer ' + key } result = requests.post(url=url, data=json.dumps(data), headers=headers ) res = str(result.json()['choices'][0]['message']['content']).strip() content = {'content': res} print('content:', content) return JSONResponse(content=content) async def chat_gpt_stream_fun(content: Text = None): start_time = time.time() obj = json.loads(content.content) response = openai.ChatCompletion.create( model='gpt-3.5-turbo', messages=obj['messages'], stream=True, # this time, we set stream=True ) # create variables to collect the stream of chunks collected_chunks = [] collected_messages = [] # iterate through the stream of events for chunk in response: chunk_time = time.time() - start_time # calculate the time delay of the chunk collected_chunks.append(chunk) # save the event response chunk_message = chunk['choices'][0]['delta'] # extract the message collected_messages.append(chunk_message) # save the message print(f"Message received {chunk_time:.2f} seconds after request: {chunk_message}") # print the delay and text full_reply_content = ''.join([m.get('content', '') for m in collected_messages]) print(f"Full conversation received: {full_reply_content}") content = {'content': full_reply_content} print('content:', content) yield json.dumps(content) + '\n' @app.post("/chatgptstream", status_code=status.HTTP_200_OK) async def get_random_numbers(content: Text = None): return StreamingResponse(chat_gpt_stream_fun(content), media_type='application/json') @app.post("/embeddings") def embeddings_ep(content: Text = None): url = 'https://api.openai.com/v1/embeddings' data = { "model": "text-embedding-ada-002", "input": content.content } result = requests.post(url=url, data=json.dumps(data), headers=headers ) return JSONResponse(content=result.json()) @app.post("/create_image") def create_image_ep(content: Text = None): url = 'https://api.openai.com/v1/images/generations' obj = json.loads(content.content) data = { "prompt": obj["prompt"], "n": obj["n"], "size": obj["size"] } print("data = \n", data) result = requests.post(url=url, data=json.dumps(data), headers=headers ) return JSONResponse(content=result.json()) from fastapi import FastAPI, Request, Response from fastapi.responses import PlainTextResponse from hashlib import sha1 from time import time from xml.etree.ElementTree import Element, tostring def chat_gpt_response(prompt): # Replace with your GPT-3.5 implementation return "你好呀,小哥哥" @app.get('/wechat') def verify_server_address(signature: str, timestamp: str, nonce: str, echostr: str): token = 'zsj' if check_signature(token, signature, timestamp, nonce): return PlainTextResponse(echostr) @app.post('/wechat') def process_message(request: Request): xml_data = await request.body() xml_tree = ElementTree.fromstring(xml_data) msg_type = xml_tree.find('MsgType').text if msg_type == 'text': content = xml_tree.find('Content').text user_open_id = xml_tree.find('FromUserName').text public_account_id = xml_tree.find('ToUserName').text reply_content = chat_gpt_response(content) reply = Element('xml') to_user_name = Element('ToUserName') to_user_name.text = user_open_id reply.append(to_user_name) from_user_name = Element('FromUserName') from_user_name.text = public_account_id reply.append(from_user_name) create_time = Element('CreateTime') create_time.text = str(int(time())) reply.append(create_time) msg_type = Element('MsgType') msg_type.text = 'text' reply.append(msg_type) content = Element('Content') content.text = reply_content reply.append(content) response_xml = tostring(reply, encoding='utf-8') return Response(content=response_xml, media_type='application/xml') def check_signature(token, signature, timestamp, nonce): tmp_list = [token, timestamp, nonce] tmp_list.sort() tmp_str = ''.join(tmp_list) tmp_str = sha1(tmp_str.encode('utf-8')).hexdigest() return tmp_str == signature