Spaces:
Sleeping
Sleeping
File size: 5,524 Bytes
f5c3f87 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
import os
import re
import requests
import json
import gradio as gr
from langchain.chat_models import ChatOpenAI
from langchain import LLMChain, PromptTemplate
from langchain.memory import ConversationBufferMemory
OPENAI_API_KEY=os.getenv('OPENAI_API_KEY')
PLAY_HT_API_KEY=os.getenv('PLAY_HT_API_KEY')
PLAY_HT_USER_ID=os.getenv('PLAY_HT_USER_ID')
PLAY_HT_VOICE_ID=os.getenv('PLAY_HT_VOICE_ID')
play_ht_api_get_audio_url = "https://play.ht/api/v2/tts"
template = """You are a helpful assistant to answer user queries.
{chat_history}
User: {user_message}
Chatbot:"""
prompt = PromptTemplate(
input_variables=["chat_history", "user_message"], template=template
)
memory = ConversationBufferMemory(memory_key="chat_history")
llm_chain = LLMChain(
llm=ChatOpenAI(temperature='0.5', model_name="gpt-3.5-turbo"),
prompt=prompt,
verbose=True,
memory=memory,
)
headers = {
"accept": "text/event-stream",
"content-type": "application/json",
"AUTHORIZATION": "Bearer "+ PLAY_HT_API_KEY,
"X-USER-ID": PLAY_HT_USER_ID
}
def get_payload(text):
return {
"text": text,
"voice": PLAY_HT_VOICE_ID,
"quality": "medium",
"output_format": "mp3",
"speed": 1,
"sample_rate": 24000,
"seed": None,
"temperature": None
}
def get_generated_audio(text):
payload = get_payload(text)
generated_response = {}
try:
response = requests.post(play_ht_api_get_audio_url, json=payload, headers=headers)
response.raise_for_status()
generated_response["type"]= 'SUCCESS'
generated_response["response"] = response.text
except requests.exceptions.RequestException as e:
generated_response["type"]= 'ERROR'
try:
response_text = json.loads(response.text)
if response_text['error_message']:
generated_response["response"] = response_text['error_message']
else:
generated_response["response"] = response.text
except Exception as e:
generated_response["response"] = response.text
except Exception as e:
generated_response["type"]= 'ERROR'
generated_response["response"] = response.text
return generated_response
def extract_urls(text):
# Define the regex pattern for URLs
url_pattern = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[/\w\.-]*'
# Find all occurrences of URLs in the text
urls = re.findall(url_pattern, text)
return urls
def get_audio_reply_for_question(text):
generated_audio_event = get_generated_audio(text)
#From get_generated_audio, you will get events in a string format, from that we need to extract the url
final_response = {
"audio_url": '',
"message": ''
}
if generated_audio_event["type"] == 'SUCCESS':
audio_urls = extract_urls(generated_audio_event["response"])
if len(audio_urls) == 0:
final_response['message'] = "No audio file link found in generated event"
else:
final_response['audio_url'] = audio_urls[-1]
else:
final_response['message'] = generated_audio_event['response']
return final_response
def download_url(url):
try:
# Send a GET request to the URL to fetch the content
final_response = {
'content':'',
'error':''
}
response = requests.get(url)
# Check if the request was successful (status code 200)
if response.status_code == 200:
final_response['content'] = response.content
else:
final_response['error'] = f"Failed to download the URL. Status code: {response.status_code}"
except Exception as e:
final_response['error'] = f"Failed to download the URL. Error: {e}"
return final_response
def get_filename_from_url(url):
# Use os.path.basename() to extract the file name from the URL
file_name = os.path.basename(url)
return file_name
def get_text_response(user_message):
response = llm_chain.predict(user_message = user_message)
return response
def get_text_response_and_audio_response(user_message):
response = get_text_response(user_message) # Getting the reply from Open AI
audio_reply_for_question_response = get_audio_reply_for_question(response)
final_response = {
'output_file_path': '',
'message':''
}
audio_url = audio_reply_for_question_response['audio_url']
if audio_url:
output_file_path=get_filename_from_url(audio_url)
download_url_response = download_url(audio_url)
audio_content = download_url_response['content']
if audio_content:
with open(output_file_path, "wb") as audio_file:
audio_file.write(audio_content)
final_response['output_file_path'] = output_file_path
else:
final_response['message'] = download_url_response['error']
else:
final_response['message'] = audio_reply_for_question_response['message']
return final_response
def chat_bot_response(message, history):
text_and_audio_response = get_text_response_and_audio_response(message)
output_file_path = text_and_audio_response['output_file_path']
if output_file_path:
return (text_and_audio_response['output_file_path'],)
else:
return text_and_audio_response['message']
demo = gr.ChatInterface(chat_bot_response,examples=["How are you doing?","What are your interests?","Which places do you like to visit?"])
if __name__ == "__main__":
demo.launch() #To create a public link, set `share=True` in `launch()`. To enable errors and logs, set `debug=True` in `launch()`.
|