"""Voice question-answering demo.

Pipeline: microphone audio -> Whisper transcription -> Neo4j hybrid retrieval
-> OpenAI chat completion -> ElevenLabs text-to-speech, served with Gradio.

Required environment variables: OPENAI_API_KEY, ELEVENLABS_API, NEO4J_PASSWORD.
Optional: NEO4J_URI, NEO4J_USERNAME.
"""

import io
import os
import re
import tempfile
import threading
import time
from dataclasses import dataclass, field

import gradio as gr
import numpy as np
import requests
import torch
from langchain_community.graphs import Neo4jGraph
from langchain_community.vectorstores import Neo4jVector
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from pydub import AudioSegment
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline


# Define AppState dataclass for managing the application's state
@dataclass
class AppState:
    """Per-session state carried between Gradio callbacks."""

    # Audio accumulated so far (mono float32 once processed), or None.
    stream: np.ndarray | None = None
    # Sampling rate of the most recently received audio.
    sampling_rate: int = 0
    pause_detected: bool = False
    stopped: bool = False
    # A bare ``[]`` default is rejected by @dataclass (ValueError at class
    # creation), so each instance gets its own list via default_factory.
    conversation: list = field(default_factory=list)


# Neo4j setup
# SECURITY: the password used to be hard-coded here. It is now read from the
# environment; the previously committed credential should be rotated.
graph = Neo4jGraph(
    url=os.environ.get("NEO4J_URI", "neo4j+s://c62d0d35.databases.neo4j.io"),
    username=os.environ.get("NEO4J_USERNAME", "neo4j"),
    password=os.environ["NEO4J_PASSWORD"],
)

# Initialize the vector index with Neo4j
vector_index = Neo4jVector.from_existing_graph(
    OpenAIEmbeddings(api_key=os.environ['OPENAI_API_KEY']),
    graph=graph,
    search_type="hybrid",
    node_label="Document",
    text_node_properties=["text"],
    embedding_node_property="embedding",
)

# Define the ASR model with Whisper
model_id = 'openai/whisper-large-v3'
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
model = AutoModelForSpeechSeq2Seq.from_pretrained(model_id, torch_dtype=torch_dtype).to(device)
processor = AutoProcessor.from_pretrained(model_id)
pipe_asr = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    max_new_tokens=128,
    chunk_length_s=15,
    batch_size=16,
    torch_dtype=torch_dtype,
    device=device,
    return_timestamps=True,
)

# Prompt used by generate_response_with_prompt. The original file referenced
# ``prompt`` without defining it (NameError on first query).
# NOTE(review): the wording below is a placeholder -- adjust to the product's tone.
prompt = ChatPromptTemplate.from_template(
    "Answer the question using only the context below. "
    "If the context does not contain the answer, say that you do not know.\n\n"
    "Context:\n{context}\n\n"
    "Question: {question}"
)

# Characters with special meaning in Lucene query syntax.
_LUCENE_SPECIAL_CHARS = re.compile(r'[+\-&|!(){}\[\]^"~*?:\\/]')


# Function to turn free text into a fuzzy Lucene full-text query
def generate_full_text_query(input_text: str) -> str:
    """Build a Lucene query requiring every word, each with edit distance 2.

    The original file called this helper without defining it.

    Args:
        input_text: Free-form user text.

    Returns:
        A string such as ``"hello~2 AND world~2"``, or ``""`` when the input
        contains no searchable words.
    """
    # Replace Lucene operators with spaces so user text cannot break the query.
    words = _LUCENE_SPECIAL_CHARS.sub(" ", input_text).split()
    return " AND ".join(f"{word}~2" for word in words)


# Function that waits 2 seconds and builds a fresh state
def auto_reset_state():
    """Sleep for two seconds and return a new, empty AppState.

    Kept for backward compatibility. Running this as a thread target discards
    the return value, so it cannot reset session state that way; the reset is
    performed by process_audio_query instead.
    """
    time.sleep(2)
    return AppState()  # Reset the state


# Function to process audio input and transcribe it
def transcribe_function(state: AppState, new_chunk):
    """Append an audio chunk to the session stream and transcribe the stream.

    Args:
        state: Session state; ``state.stream`` is extended in place.
        new_chunk: ``(sampling_rate, samples)`` as indexed below, or None.

    Returns:
        ``(state, text)`` where ``text`` is the transcription of the whole
        accumulated stream, or ``""`` when there is no usable audio.
    """
    try:
        sr, y = new_chunk[0], new_chunk[1]
    except (TypeError, IndexError):
        print(f"Error chunk structure: {type(new_chunk)}, content: {new_chunk}")
        return state, ""

    if y is None or len(y) == 0:
        return state, ""

    y = y.astype(np.float32)
    # Multi-channel input arrives as (samples, channels); the ASR pipeline
    # expects a 1-D waveform, so average the channels down to mono.
    if y.ndim > 1:
        y = y.mean(axis=1)

    # Peak-normalise the chunk; skip all-zero chunks to avoid dividing by zero.
    max_abs_y = np.max(np.abs(y))
    if max_abs_y > 0:
        y = y / max_abs_y

    if state.stream is not None and len(state.stream) > 0:
        state.stream = np.concatenate([state.stream, y])
    else:
        state.stream = y
    state.sampling_rate = sr

    result = pipe_asr({"array": state.stream, "sampling_rate": sr}, return_timestamps=False)
    full_text = result.get("text", "")

    # The original started a thread on auto_reset_state here; its return value
    # was discarded, so it had no effect and has been removed.
    return state, full_text


# Function to generate a response using the prompt and the context
def generate_response_with_prompt(context, question):
    """Ask the chat model to answer ``question`` given ``context``.

    Returns:
        The model's reply text with surrounding whitespace stripped.
    """
    messages = prompt.format_messages(context=context, question=question)
    llm = ChatOpenAI(temperature=0, api_key=os.environ['OPENAI_API_KEY'])
    # ``invoke`` replaces the deprecated direct call ``llm(...)``.
    response = llm.invoke(messages)
    return response.content.strip()


# Function to generate audio with Eleven Labs TTS
def generate_audio_elevenlabs(text):
    """Synthesize ``text`` with ElevenLabs and save it to a temporary MP3.

    Returns:
        Path of the written ``.mp3`` file (not deleted automatically), or
        None when ``text`` is empty or the API responds with an error.

    Raises:
        KeyError: If the ELEVENLABS_API environment variable is not set.
        requests.RequestException: On connection failure or timeout.
    """
    if not text:
        return None

    XI_API_KEY = os.environ['ELEVENLABS_API']
    VOICE_ID = 'ehbJzYLQFpwbJmGkqbnW'
    tts_url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}/stream"
    headers = {"Accept": "application/json", "xi-api-key": XI_API_KEY}
    data = {"text": text, "model_id": "eleven_multilingual_v2", "voice_settings": {"stability": 1.0}}

    # The timeout keeps a stalled API call from hanging the UI forever; the
    # ``with`` block releases the streamed connection when done.
    with requests.post(tts_url, headers=headers, json=data, stream=True, timeout=60) as response:
        if not response.ok:
            print(f"Error generating audio: {response.text}")
            return None
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as f:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)
        return f.name


# Define the function to retrieve information using Neo4j and the vector store
def retriever(question: str):
    """Answer ``question`` from full-text graph hits plus vector-search hits.

    Returns:
        The LLM answer generated from the combined context.
    """
    structured_query = """
    CALL db.index.fulltext.queryNodes('entity', $query, {limit: 2})
    YIELD node, score
    RETURN node.id AS entity, node.text AS context, score
    ORDER BY score DESC
    LIMIT 2
    """
    full_text_query = generate_full_text_query(question)
    # Skip the full-text lookup when no searchable words remain, rather than
    # sending an empty query string to the index.
    structured_data = (
        graph.query(structured_query, {"query": full_text_query}) if full_text_query else []
    )
    structured_response = "\n".join(
        f"{record['entity']}: {record['context']}" for record in structured_data
    )

    unstructured_data = [el.page_content for el in vector_index.similarity_search(question)]
    unstructured_response = "\n".join(unstructured_data)

    combined_context = f"Structured data:\n{structured_response}\n\nUnstructured data:\n{unstructured_response}"
    return generate_response_with_prompt(combined_context, question)


# Function to handle the entire audio query and response process
def process_audio_query(state: AppState, audio_input):
    """Transcribe the audio, answer the question, and synthesize the reply.

    Returns:
        ``(audio_path, state)``. ``audio_path`` is None when nothing was
        transcribed or TTS failed. After a question has been answered a fresh
        AppState is returned so the next query does not re-transcribe old audio.
    """
    state, transcription = transcribe_function(state, audio_input)
    if not transcription.strip():
        # Nothing to ask: avoid a pointless retrieval/LLM/TTS round trip.
        return None, state

    response_text = retriever(transcription)
    audio_path = generate_audio_elevenlabs(response_text)
    return audio_path, AppState()


# Create Gradio interface for audio input and output
with gr.Blocks() as interface:
    audio_input = gr.Audio(sources=["microphone"], type="numpy", streaming=True, every=0.1)
    submit_button = gr.Button("Submit")
    audio_output = gr.Audio(type="filepath", autoplay=True)
    state = gr.State(AppState())

    submit_button.click(fn=process_audio_query, inputs=[state, audio_input], outputs=[audio_output, state])

# Launch the Gradio app (only when run as a script, not when imported)
if __name__ == "__main__":
    interface.launch()