Spaces:
Runtime error
Runtime error
import os | |
import whisper | |
from io import BytesIO | |
import base64 | |
import boto3 | |
from pydub import AudioSegment | |
from pydub.playback import play | |
import logging | |
from langchain import OpenAI | |
from langchain.chains import RetrievalQA | |
from langchain.vectorstores import Chroma | |
from langchain.document_loaders import DirectoryLoader | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
from langchain.text_splitter import CharacterTextSplitter | |
# Service credentials are looked up in the process environment; each value is
# None when the corresponding variable is not set.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')

# AWS region is fixed in code rather than read from the environment.
AWS_REGION_NAME = 'ap-south-1'
# Root logger setup: records at INFO and above are appended to
# 'conversations.log', each prefixed with a time-of-day stamp (no date).
logging.basicConfig(
    filename='conversations.log',
    filemode='a',  # append, so earlier content of the file is kept
    level="INFO",
    format='%(asctime)s %(message)s',
    datefmt='%H:%M:%S',
)
def buzz_user():
    """Play the audio clip stored at 'assets/timeout_audio.mp3'.

    The file is decoded with pydub's ``AudioSegment.from_mp3`` and handed
    to ``pydub.playback.play``. Nothing is returned.
    """
    play(AudioSegment.from_mp3('assets/timeout_audio.mp3'))
def initialize_knowledge_base():
    """Build the question-answering chain and load the speech model.

    Steps, in order:
      1. Load every ``*.txt`` file found (recursively) under ``profiles``.
      2. Split the documents with ``CharacterTextSplitter``
         (chunk_size=1000, chunk_overlap=0).
      3. Embed the chunks with ``OpenAIEmbeddings`` into a Chroma store.
      4. Wrap the store's retriever (``search_kwargs={"k": 1}``) in a
         ``RetrievalQA`` chain of type "stuff" backed by ``OpenAI()``.
      5. Load the Whisper "tiny" model.

    Returns:
        A ``(conv_model, voice_model)`` tuple: the RetrievalQA chain and the
        loaded Whisper model.
    """
    # Read the profile documents from disk.
    profile_docs = DirectoryLoader('profiles', glob='**/*.txt').load()

    # Break them into chunks before embedding.
    splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    chunks = splitter.split_documents(profile_docs)

    # Embed the chunks and index them in a Chroma vector store.
    embedder = OpenAIEmbeddings()
    vector_store = Chroma.from_documents(chunks, embedder)

    # Assemble the retrieval QA chain; the LLM is created before the
    # retriever, matching the argument order of the chain constructor call.
    llm = OpenAI()
    retriever = vector_store.as_retriever(search_kwargs={"k": 1})
    conv_model = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
    )

    # The speech-to-text model is loaded last.
    voice_model = whisper.load_model("tiny")

    return conv_model, voice_model
def text_to_speech_gen(answer):
    """Synthesize ``answer`` with Amazon Polly and return an HTML audio tag.

    A Polly client is created with the module-level AWS credentials and
    region, the text is synthesized as MP3 using the 'Matthew' voice on the
    neural engine, and the resulting bytes are passed to ``audio_to_html``.

    Args:
        answer: Text to be spoken; sent to Polly as the ``Text`` parameter.

    Returns:
        The HTML string produced by ``audio_to_html`` for the synthesized
        audio.

    Raises:
        Whatever boto3 raises for client creation or the synthesize call;
        no exceptions are caught here.
    """
    polly = boto3.client(
        'polly',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION_NAME,
    )
    response = polly.synthesize_speech(
        Text=answer,
        VoiceId='Matthew',
        OutputFormat='mp3',
        Engine="neural",
    )

    # 'AudioStream' is a streaming body object; close it once the bytes have
    # been read (even if read() fails) so the stream is not left open.
    stream = response['AudioStream']
    try:
        audio_stream = stream.read()
    finally:
        stream.close()

    return audio_to_html(audio_stream)
def audio_to_html(audio_bytes):
    """Wrap raw audio bytes in an autoplaying HTML ``<audio>`` element.

    The bytes are base64-encoded and embedded as a ``data:audio/mpeg`` URI,
    so the returned markup carries the audio inline.

    Args:
        audio_bytes: Bytes-like audio payload (anything ``BytesIO`` accepts).

    Returns:
        A string of the form
        ``<audio src="data:audio/mpeg;base64,..." controls autoplay></audio>``.
    """
    # Going through BytesIO keeps the accepted input types unchanged;
    # getvalue() returns the whole buffer regardless of stream position.
    payload = BytesIO(audio_bytes).getvalue()
    encoded = base64.b64encode(payload).decode("utf-8")
    return f'<audio src="data:audio/mpeg;base64,{encoded}" controls autoplay></audio>'
def get_chat_history(user_message, history):
    """Append the user's message to the chat history as an unanswered turn.

    Args:
        user_message: The text the user just submitted.
        history: List of ``[user, bot]`` pairs accumulated so far. It is not
            modified; a new list is built.

    Returns:
        A 2-tuple ``("", new_history)`` where ``new_history`` is ``history``
        followed by ``[user_message, None]``.
    """
    pending_turn = [user_message, None]  # None marks the reply slot as empty
    extended_history = history + [pending_turn]
    return "", extended_history