import json
import os
from pathlib import Path

import cv2
import gradio as gr
import moviepy.editor as mp  # moviepy<2.0; the editor module was removed in v2
from llama_index.core import Settings, SimpleDirectoryReader, StorageContext
from llama_index.core.indices.multi_modal.base import MultiModalVectorStoreIndex
from llama_index.embeddings.mistralai import MistralAIEmbedding
from llama_index.vector_stores.milvus import MilvusVectorStore
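

# mistral-embed produces 1024-dimensional text vectors; image nodes are
# embedded by the index's default CLIP model (512 dimensions), which is why
# the two Milvus collections created in create_index use different dims.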
Settings.embed_model = MistralAIEmbedding(
    "mistral-embed",
    api_key=os.getenv('MISTRAL_API_KEY')
)

# Shared state, set by the Gradio callbacks once a video has been processed.
index = None
metadata = None
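

# Pipeline driver: extract frames and the audio track, transcribe the audio,
# and return metadata describing the generated artifacts.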
def process_video(video_path, output_folder, output_audio_path):
    Path(output_folder).mkdir(parents=True, exist_ok=True)
    video_to_images(video_path, output_folder)
    video_to_audio(video_path, output_audio_path)
    text_path = os.path.join(output_folder, "output_text.txt")
    with open(text_path, "w") as file:
        file.write(audio_to_text(output_audio_path))
    os.remove(output_audio_path)  # transcript is kept; raw audio is not

    # Return structured metadata so it can be serialized with json.dumps later.
    return {
        "video_path": video_path,
        "audio_path": output_audio_path,
        "text_path": text_path,
    }
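

# Save one frame out of every `frame_interval` frames as a JPEG. At 30 fps the
# default keeps roughly one frame per second of video.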
def video_to_images(video_path, output_folder, frame_interval=30):
    cap = cv2.VideoCapture(video_path)
    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        if frame_count % frame_interval == 0:
            cv2.imwrite(f"{output_folder}/frame_{frame_count}.jpg", frame)
        frame_count += 1
    cap.release()
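

# Transcribe the extracted audio with OpenAI's Whisper API (whisper-1).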
def audio_to_text(audio_path):
    # Imported lazily so the OpenAI SDK is only required when transcribing.
    from openai import OpenAI

    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
    with open(audio_path, "rb") as audio_file:
        transcript = client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file
        )
    return transcript.text
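

# Strip the audio track out of the uploaded video with moviepy.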
def video_to_audio(video_path, output_path):
    video = mp.VideoFileClip(video_path)
    video.audio.write_audiofile(output_path)
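

# Build a multi-modal index over everything in output_folder: the extracted
# frames land in the image collection, the transcript in the text collection.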
def create_index(output_folder):
    text_store = MilvusVectorStore(
        uri="milvus_local.db",
        collection_name="text_collection",
        overwrite=True,
        dim=1024
    )
    image_store = MilvusVectorStore(
        uri="milvus_local.db",
        collection_name="image_collection",
        overwrite=True,
        dim=512
    )
    storage_context = StorageContext.from_defaults(
        vector_store=text_store,
        image_store=image_store
    )
    documents = SimpleDirectoryReader(output_folder).load_data()
    return MultiModalVectorStoreIndex.from_documents(
        documents,
        storage_context=storage_context
    )
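

# Gradio callback: run the extraction pipeline, then (re)build the index.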
def process_video_callback(video_file):
    global index, metadata

    output_folder = "output"
    output_audio_path = "output/audio.wav"
    # gr.File may return a tempfile wrapper or a plain path string depending on
    # the Gradio version; handle both.
    video_path = video_file if isinstance(video_file, str) else video_file.name

    metadata = process_video(video_path, output_folder, output_audio_path)
    index = create_index(output_folder)
    return "Video processed successfully!"
def query_video_callback(query):
    global index, metadata
    if not index:
        return "No video index found. Please upload and process a video first."

    # Retrieve the nodes most similar to the query and split them by modality.
    retrieval_result = index.as_retriever().retrieve(query)
    text_contexts = []
    image_documents = []
    for node in retrieval_result:
        if hasattr(node.node, 'image'):
            image_documents.append(node.node)
        else:
            text_contexts.append(node.node.text)

    if not text_contexts and not image_documents:
        return "No relevant context found to answer the query."

    context_str = "\n".join(text_contexts)
    metadata_str = json.dumps(metadata, indent=2)
    return (
        f"Text Context: {context_str}\n"
        f"Metadata: {metadata_str}\n"
        f"Image Documents Found: {len(image_documents)}"
    )
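

# Minimal UI: upload and process a video, then query it; both callbacks write
# to the same output textbox.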
with gr.Blocks() as demo:
    gr.Markdown("## Multi-Modal RAG with Gradio")
    video_input = gr.File(label="Upload a Video", file_types=[".mp4", ".avi"])
    process_button = gr.Button("Process Video")
    query_input = gr.Textbox(label="Ask a Question About the Video")
    query_button = gr.Button("Submit Query")
    output_text = gr.Textbox(label="Response")

    process_button.click(process_video_callback, inputs=video_input, outputs=output_text)
    query_button.click(query_video_callback, inputs=query_input, outputs=output_text)

demo.launch(debug=True)
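
# Example session (hypothetical): upload clip.mp4, click "Process Video", then
# ask "What is said at the start of the video?" to retrieve matching transcript
# chunks and frames.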