# Page residue captured with the source ("Spaces: Runtime error"); not part of the program.
import gradio as gr | |
import os | |
import logging | |
import requests | |
import tempfile | |
from langchain_openai import ChatOpenAI | |
from langchain_community.graphs import Neo4jGraph | |
import torch | |
import numpy as np | |
from transformers import pipeline, AutoModelForSpeechSeq2Seq, AutoProcessor | |
import threading | |
# Setup Neo4j connection.
# The password is read from the environment so it is never committed with the
# source (the same approach this file uses for the ElevenLabs key). The URL and
# username are not secrets, so they fall back to the values used before.
# NOTE(review): the password that used to be hard-coded here has been exposed
# in source and should be rotated.
graph = Neo4jGraph(
    url=os.environ.get("NEO4J_URI", "neo4j+s://6457770f.databases.neo4j.io"),
    username=os.environ.get("NEO4J_USERNAME", "neo4j"),
    password=os.environ["NEO4J_PASSWORD"],  # required: KeyError if unset
)
# Function to clean input for Neo4j full-text query
# Translation table built once at import time: every character with special
# meaning in Lucene query syntax (plus ";" and the space, which the original
# table also escaped) maps to its backslash-escaped form. "/" is included
# because Lucene treats it as a regular-expression delimiter.
_LUCENE_ESCAPES = str.maketrans(
    {char: "\\" + char for char in '\\+-&|!(){}[]^~*?:";/ '}
)


def remove_lucene_chars(input: str) -> str:
    """Return *input* with Lucene query-syntax characters backslash-escaped.

    Despite the name, nothing is removed: each special character is kept and
    prefixed with a backslash so it is treated literally. Spaces are escaped
    too, so callers that want separate terms must split the text *before*
    calling this function.

    Args:
        input: Raw text to embed in a Lucene query.

    Returns:
        The escaped text; characters without special meaning are unchanged.
    """
    return input.translate(_LUCENE_ESCAPES)
# Function to generate a full-text query
def generate_full_text_query(input: str) -> str:
    """Build a fuzzy, all-words-required Lucene query from free text.

    Each whitespace-separated word is escaped and suffixed with ``~2`` (fuzzy
    match, edit distance up to 2); the words are joined with ``AND``.

    Args:
        input: Free text, e.g. a transcribed question.

    Returns:
        The query string, such as ``"hello~2 AND world~2"``. Returns an empty
        string when *input* contains no words.
    """
    # Split BEFORE escaping. remove_lucene_chars escapes spaces, so escaping
    # the whole sentence first leaves a trailing backslash on every word but
    # the last; that backslash then escapes the "~" and turns the fuzzy
    # operator into literal text ("word\~2").
    terms = [f"{remove_lucene_chars(word)}~2" for word in input.split()]
    # join() on an empty list yields "", which avoids the IndexError that
    # indexing the last word raised for blank input.
    return " AND ".join(terms)
# Define the function to query Neo4j and get a response
def get_response(question):
    """Answer *question* with the best full-text match stored in Neo4j.

    Args:
        question: The user's question as text. May be empty or None.

    Returns:
        The ``content`` property of the highest-scoring node in the
        ``entity`` full-text index, or a fixed apology string when the
        question is blank, nothing matches, or the match has no content.
        If the query raises, a fixed error string is returned instead and the
        exception is logged.
    """
    not_found = "Sorry, I couldn't find any relevant information in the database."

    # A blank question has no search terms; skip the query builder and the
    # database round trip entirely.
    if not question or not question.strip():
        return not_found

    query = generate_full_text_query(question)
    try:
        # Query the Neo4j database using a full-text search
        response = graph.query(
            """
            CALL db.index.fulltext.queryNodes('entity', $query)
            YIELD node, score
            RETURN node.content AS content, score
            ORDER BY score DESC LIMIT 1
            """,
            {"query": query}
        )
        if not response:
            return not_found
        # Extract the content from the top response. A node lacking the
        # property yields None, which must not be handed on as an answer.
        content = response[0]['content']
        return content if content is not None else not_found
    except Exception as e:
        # Boundary handler: the UI should receive a message, not a traceback.
        logging.error("Error querying Neo4j: %s", e)
        return "An error occurred while fetching data from the database."
# Function to generate audio with Eleven Labs TTS
def generate_audio_elevenlabs(text):
    """Synthesize *text* to speech with ElevenLabs and save it as an MP3 file.

    Args:
        text: The text to speak; converted with ``str()`` before sending.

    Returns:
        Path of a temporary ``.mp3`` file holding the audio, or ``None`` when
        the request fails (non-success status, timeout, or connection error).
        The caller owns the file; it is not deleted automatically.

    Raises:
        KeyError: If the ``ELEVENLABS_API`` environment variable is not set.
    """
    XI_API_KEY = os.environ['ELEVENLABS_API']
    VOICE_ID = 'ehbJzYLQFpwbJmGkqbnW'
    tts_url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}/stream"
    headers = {
        "Accept": "application/json",
        "xi-api-key": XI_API_KEY
    }
    data = {
        "text": str(text),
        "model_id": "eleven_multilingual_v2",
        "voice_settings": {
            "stability": 1.0,
            "similarity_boost": 0.0,
            "style": 0.60,
            "use_speaker_boost": False
        }
    }
    audio_path = None
    try:
        # (connect, read) timeouts so a stalled service cannot hang the UI
        # handler forever; the `with` releases the streamed connection.
        with requests.post(
            tts_url, headers=headers, json=data, stream=True, timeout=(10, 120)
        ) as response:
            if not response.ok:
                logging.error(
                    "ElevenLabs TTS request failed with status %s",
                    response.status_code,
                )
                return None
            # delete=False: the file must outlive this function so the caller
            # can play it.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as f:
                audio_path = f.name
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:
                        f.write(chunk)
        return audio_path
    except requests.RequestException as e:
        logging.error("ElevenLabs TTS request error: %s", e)
        # Do not leave a truncated audio file behind if streaming broke midway.
        if audio_path is not None:
            try:
                os.remove(audio_path)
            except OSError:
                pass  # best-effort cleanup; the failure is already logged
        return None
# Speech-to-text setup: load the Whisper checkpoint once at import time.
# With CUDA available the model runs on the first GPU in half precision;
# otherwise it runs on the CPU in single precision.
model_id = 'openai/whisper-large-v3'
device, torch_dtype = (
    ("cuda:0", torch.float16)
    if torch.cuda.is_available()
    else ("cpu", torch.float32)
)

processor = AutoProcessor.from_pretrained(model_id)
model = AutoModelForSpeechSeq2Seq.from_pretrained(model_id, torch_dtype=torch_dtype)
model = model.to(device)

# Pipeline used by the voice handler; the processor supplies both the
# tokenizer and the feature extractor.
pipe_asr = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    device=device,
    torch_dtype=torch_dtype,
    chunk_length_s=15,
    batch_size=16,
    max_new_tokens=128,
    return_timestamps=True,
)
# Function to handle voice input, generate response from Neo4j, and return audio output
def handle_voice_to_voice(audio):
    """Turn a spoken question into a spoken answer.

    The recording is transcribed with the Whisper pipeline, the transcript is
    answered from Neo4j, and the answer is synthesized with ElevenLabs.

    Args:
        audio: ``(sample_rate, samples)`` tuple as delivered by a Gradio
            ``Audio`` component with ``type='numpy'``, or ``None`` when
            nothing was recorded.

    Returns:
        Path to the answer's audio file, or ``None`` when there is no usable
        recording, nothing was transcribed, or speech synthesis failed.
    """
    # Nothing recorded (button pressed without audio).
    if audio is None:
        return None
    sr, y = audio
    if y is None or y.size == 0:
        return None

    # Multi-channel input arrives as (samples, channels); average to mono.
    if y.ndim > 1:
        y = y.mean(axis=1)

    # Ensure that the audio is in float32 format
    y = y.astype(np.float32)
    # Normalize audio to range [-1.0, 1.0]; a silent clip has peak 0 and is
    # left as is, since dividing by zero would fill the array with NaN.
    peak = np.max(np.abs(y))
    if peak > 0:
        y = y / peak

    # Process the audio data with Whisper ASR
    result = pipe_asr({"array": y, "sampling_rate": sr}, return_timestamps=False)
    question = result.get("text", "").strip()
    # No words recognized: there is nothing to look up or to speak.
    if not question:
        return None

    # Get response using the transcribed question
    response = get_response(question)
    # Generate audio from the response
    return generate_audio_elevenlabs(response)
# Gradio UI: a microphone input, a submit button, and an autoplaying audio
# player for the spoken answer.
with gr.Blocks() as demo:
    audio_input = gr.Audio(
        label="Speak to Ask",
        sources=["microphone"],
        type='numpy',
        streaming=False,
    )
    submit_voice_btn = gr.Button("Submit Voice")
    audio_output = gr.Audio(
        label="Response Audio",
        type="filepath",
        autoplay=True,
        interactive=False,
    )

    # Pressing the button sends the recording through the voice-to-voice
    # handler and plays back the file it returns.
    submit_voice_btn.click(
        handle_voice_to_voice,
        inputs=audio_input,
        outputs=audio_output,
    )

# Start the app; handler errors are surfaced in the browser.
demo.launch(share=True, show_error=True)