Spaces:
Runtime error
Runtime error
import os | |
import re | |
import requests | |
import json | |
import gradio as gr | |
from langchain.chat_models import ChatOpenAI | |
from langchain import LLMChain, PromptTemplate | |
from langchain.memory import ConversationBufferMemory | |
# Credentials and voice selection are read from the environment; each value
# is None when the corresponding variable is not set.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
PLAY_HT_API_KEY = os.environ.get('PLAY_HT_API_KEY')
PLAY_HT_USER_ID = os.environ.get('PLAY_HT_USER_ID')
PLAY_HT_VOICE_ID = os.environ.get('PLAY_HT_VOICE_ID')

# Play.ht text-to-speech endpoint that get_generated_audio() posts to.
play_ht_api_get_audio_url = "https://play.ht/api/v2/tts"
# Prompt layout: the persona description, then the running conversation
# ({chat_history}), then the newest user turn ({user_message}).
template = """You are a tech-savvy computer science student who spends countless hours coding, building apps, and keeping up with the latest tech trends. You enjoy discussing programming languages, AI, and gadgets and are always ready to troubleshoot tech-related problems.
{chat_history}
User: {user_message}
Chatbot:"""

prompt = PromptTemplate(
    input_variables=["chat_history", "user_message"],
    template=template,
)

# memory_key is the same name as the {chat_history} placeholder in the template.
memory = ConversationBufferMemory(memory_key="chat_history")

llm_chain = LLMChain(
    # temperature is a numeric sampling parameter; pass a float rather than
    # the string '0.5' so no implicit type coercion is relied upon.
    llm=ChatOpenAI(temperature=0.5, model_name="gpt-3.5-turbo"),
    prompt=prompt,
    verbose=True,
    memory=memory,
)
# HTTP headers sent with every Play.ht request.
# PLAY_HT_API_KEY is None when the environment variable is unset, and
# "Bearer " + None raises TypeError while the module is still loading. Fall
# back to an empty token so the app can start; the API call made later then
# fails with an authentication error that get_generated_audio() reports.
headers = {
    "accept": "text/event-stream",
    "content-type": "application/json",
    "AUTHORIZATION": f"Bearer {PLAY_HT_API_KEY or ''}",
    "X-USER-ID": PLAY_HT_USER_ID,
}
def get_payload(text):
    """Build the JSON body for a Play.ht text-to-speech request.

    Args:
        text: The text to be spoken.

    Returns:
        A dict holding ``text``, the configured voice id and fixed
        quality / format / speed / sample-rate settings; ``seed`` and
        ``temperature`` are left as None.
    """
    payload = dict(
        text=text,
        voice=PLAY_HT_VOICE_ID,
        quality="medium",
        output_format="mp3",
        speed=1,
        sample_rate=24000,
        seed=None,
        temperature=None,
    )
    return payload
def _describe_request_failure(response, error):
    """Return the most specific error text available for a failed API call."""
    # Compare against None explicitly: a requests.Response is falsy for
    # 4xx/5xx statuses, which is exactly the case being described here.
    if response is None:
        # The request never produced a response (connection error, bad URL...).
        return str(error)
    try:
        error_body = json.loads(response.text)
    except ValueError:
        # Body is not JSON; fall back to the raw text.
        return response.text
    if isinstance(error_body, dict) and error_body.get('error_message'):
        return error_body['error_message']
    return response.text


def get_generated_audio(text):
    """Request speech synthesis for *text* from the Play.ht endpoint.

    Args:
        text: The text to convert to audio.

    Returns:
        A dict with two keys:
        ``type`` -- ``'SUCCESS'`` or ``'ERROR'``;
        ``response`` -- on success the raw response text; on error the
        ``error_message`` field of a JSON error body when present, otherwise
        the raw response text, or the exception text when no response was
        received at all.
    """
    payload = get_payload(text)
    generated_response = {}
    # Pre-bind so the handlers below can tell "no response received" apart
    # from "error response received" instead of hitting an unbound name.
    response = None
    try:
        response = requests.post(play_ht_api_get_audio_url, json=payload, headers=headers)
        response.raise_for_status()
        generated_response["type"] = 'SUCCESS'
        generated_response["response"] = response.text
    except requests.exceptions.RequestException as e:
        generated_response["type"] = 'ERROR'
        generated_response["response"] = _describe_request_failure(response, e)
    except Exception as e:
        generated_response["type"] = 'ERROR'
        generated_response["response"] = response.text if response is not None else str(e)
    return generated_response
def extract_urls(text):
    """Return every http/https URL found in *text*, in order of appearance.

    A match covers the scheme, the host part and any following run of
    ``/``, word characters, ``.`` and ``-``; it stops at the first other
    character (so a ``?query`` suffix is not included).
    """
    url_regex = re.compile(r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[/\w\.-]*')
    return [match.group(0) for match in url_regex.finditer(text)]
def get_audio_reply_for_question(text):
    """Generate audio for *text* and pick the audio URL out of the result.

    get_generated_audio() hands back the events as one string; the last URL
    found in that string is taken as the audio link.

    Returns:
        A dict with ``audio_url`` (the link, or '' when unavailable) and
        ``message`` (an error description, or '' on success).
    """
    audio_event = get_generated_audio(text)
    reply = {"audio_url": '', "message": ''}

    # Failed generation: pass the reported error text straight through.
    if audio_event["type"] != 'SUCCESS':
        reply['message'] = audio_event['response']
        return reply

    found_urls = extract_urls(audio_event["response"])
    if found_urls:
        reply['audio_url'] = found_urls[-1]
    else:
        reply['message'] = "No audio file link found in generated event"
    return reply
def download_url(url):
    """Fetch *url* with an HTTP GET and report the body or a failure reason.

    Returns:
        A dict with ``content`` (the response body when the status code is
        200, otherwise '') and ``error`` (a failure description, otherwise '').
    """
    result = {'content': '', 'error': ''}
    try:
        reply = requests.get(url)
        # Only a plain 200 counts as success.
        if reply.status_code != 200:
            result['error'] = f"Failed to download the URL. Status code: {reply.status_code}"
        else:
            result['content'] = reply.content
    except Exception as exc:
        result['error'] = f"Failed to download the URL. Error: {exc}"
    return result
def get_filename_from_url(url):
    """Return the file name (last path segment) of *url*.

    Only the path component is considered, so a query string or fragment
    (for example the signature parameters of a pre-signed download link) does
    not end up in the file name. Input without a query or fragment yields
    the same result as ``os.path.basename(url)``.

    Args:
        url: A URL or plain path string.

    Returns:
        The last path segment, or '' when the path ends with a slash.
    """
    from urllib.parse import urlsplit

    # urlsplit (unlike urlparse) leaves ';' inside the path untouched.
    url_path = urlsplit(url).path
    return os.path.basename(url_path)
def get_text_response(user_message):
    """Return the LLM chain's reply to *user_message*."""
    return llm_chain.predict(user_message=user_message)
def get_text_response_and_audio_response(user_message):
    """Answer *user_message*, synthesize the answer and save the audio file.

    The text reply is obtained first, then converted to audio; the audio is
    downloaded and written to a file named after the last segment of its URL
    (relative to the current working directory).

    Returns:
        A dict with ``output_file_path`` (path of the written audio file, or
        '' on failure) and ``message`` (the failure description, or '').
    """
    result = {'output_file_path': '', 'message': ''}

    reply_text = get_text_response(user_message)  # reply from OpenAI
    audio_reply = get_audio_reply_for_question(reply_text)

    audio_url = audio_reply['audio_url']
    if not audio_url:
        # Audio generation failed or yielded no link.
        result['message'] = audio_reply['message']
        return result

    target_path = get_filename_from_url(audio_url)
    download = download_url(audio_url)
    audio_bytes = download['content']
    if not audio_bytes:
        result['message'] = download['error']
        return result

    with open(target_path, "wb") as audio_file:
        audio_file.write(audio_bytes)
    result['output_file_path'] = target_path
    return result
def chat_bot_response(message, history):
    """Chat callback: reply with an audio file, or with an error message.

    Args:
        message: The user's latest message.
        history: Accepted for the chat-interface callback signature; not
            used here.

    Returns:
        A one-element tuple holding the audio file path when audio was
        produced, otherwise the error message string.
    """
    result = get_text_response_and_audio_response(message)
    audio_path = result['output_file_path']
    return (audio_path,) if audio_path else result['message']
# Chat UI wired to chat_bot_response, with a few example prompts.
demo = gr.ChatInterface(
    chat_bot_response,
    examples=[
        "How are you doing?",
        "What are your interests?",
        "Which places do you like to visit?",
    ],
)

if __name__ == "__main__":
    # To create a public link, set `share=True` in `launch()`.
    # To enable errors and logs, set `debug=True` in `launch()`.
    demo.launch()