# raphgonda's picture
# Update app.py
# 29bc38c verified
import gradio as gr
import pandas as pd
import cv2
import torch
import tempfile
import os
import librosa
from fer import FER
from transformers import AutoModelForAudioClassification, pipeline
from moviepy.editor import VideoFileClip, AudioFileClip
import numpy as np
from torch.nn.functional import softmax
import whisper_timestamped as whisper
from translate import Translator
# Load pre-trained models (done once, at import time)
AUDIO_MODEL_ID = "3loi/SER-Odyssey-Baseline-WavLM-Categorical-Attributes"
TEXT_MODEL_ID = "j-hartmann/emotion-english-distilroberta-base"

# Speech emotion classifier; the checkpoint is loaded with trust_remote_code=True.
audio_model = AutoModelForAudioClassification.from_pretrained(
    AUDIO_MODEL_ID,
    trust_remote_code=True,
)
# Facial emotion detector, created with the mtcnn option enabled.
face_detector = FER(mtcnn=True)
# Text emotion classifier; called with top_k=None so the per-label scores can
# be read from its output by the text analysis below.
classifier = pipeline(
    "text-classification",
    model=TEXT_MODEL_ID,
    top_k=None,
)

# Set mean and std for audio model (read from the checkpoint's config and used
# to normalise the waveform before inference)
mean, std = audio_model.config.mean, audio_model.config.std
# Function to extract audio from video for audio emotion analysis
def extract_audio_from_video(video_path):
    """Write the audio track of a video to a temporary WAV file.

    Args:
        video_path: Path of the video file to read.

    Returns:
        Path of the newly created ``.wav`` file. The file is created with
        ``delete=False``; the caller is responsible for removing it.

    Raises:
        ValueError: If the video has no audio track.
    """
    # Reserve a path and close our handle straight away, so the audio writer
    # below is the only thing that has the file open.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
        temp_audio_path = temp_audio_file.name

    video_clip = None
    try:
        video_clip = VideoFileClip(video_path)
        audio_clip = video_clip.audio
        if audio_clip is None:
            raise ValueError("The video does not contain an audio track.")
        audio_clip.write_audiofile(temp_audio_path, codec="pcm_s16le")
    except Exception:
        # Nothing useful was produced: do not leave the reserved file behind.
        if os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
        raise
    finally:
        # Release the reader resources held by the clip.
        if video_clip is not None:
            video_clip.close()
    return temp_audio_path
# Function to perform audio emotion detection per second
def process_audio_and_detect_emotions(audio_clip):
    """Run the audio emotion model on one waveform chunk.

    Args:
        audio_clip: 1-D sequence of (already normalised) audio samples.

    Returns:
        A one-row ``pandas.DataFrame`` with one probability column per
        emotion ('Angry', 'Sad', 'Happy', 'Surprise', 'Fear', 'Disgust',
        'Neutral'). The probabilities are renormalised so the row sums to 1.
    """
    # Model output index -> emotion name. Output index 6 is not reported.
    labels = {0: 'Angry', 1: 'Sad', 2: 'Happy', 3: 'Surprise', 4: 'Fear', 5: 'Disgust', 7: 'Neutral'}
    # Derive the kept indices from the mapping so the two can never disagree.
    kept_indices = list(labels)

    # Pin the dtype: a float64 array or a plain list would otherwise become a
    # double tensor.
    audio_np = np.asarray(audio_clip, dtype=np.float32)
    mask = torch.ones(1, len(audio_np))
    wavs = torch.tensor(audio_np).unsqueeze(0)  # add the batch dimension

    with torch.no_grad():
        pred = audio_model(wavs, mask)
    # The model may return an object exposing .logits or a plain tuple.
    logits = pred.logits if hasattr(pred, 'logits') else pred[0]

    probabilities = softmax(logits, dim=-1).squeeze(0)[kept_indices]
    # Renormalise after dropping the unreported class.
    probabilities = probabilities / probabilities.sum()
    return pd.DataFrame([probabilities.numpy()], columns=list(labels.values()))
# Function to analyze audio emotions
def analyze_audio_emotions(video_path):
    """Compute per-second audio emotion probabilities for a video.

    The audio track is extracted, resampled to the model's sampling rate,
    normalised with the model's mean/std and classified in consecutive
    one-second windows.

    Args:
        video_path: Path of the video file to analyse.

    Returns:
        A tuple ``(status_message, emotions_df, xlsx_path)``. On failure the
        message describes the error and the other two items are ``None``.
    """
    temp_audio_path = None
    emotions_xlsx_path = None
    try:
        sampling_rate = audio_model.config.sampling_rate
        temp_audio_path = extract_audio_from_video(video_path)
        raw_wav, _ = librosa.load(temp_audio_path, sr=sampling_rate)
        if len(raw_wav) == 0:
            raise ValueError("The audio track contains no samples.")
        norm_wav = (raw_wav - mean) / (std + 0.000001)  # epsilon avoids division by zero

        times = []
        emotions_dfs = []
        # One window per second of audio; the last window may be shorter.
        # NOTE(review): a very short trailing window may be rejected by the
        # model — confirm its minimum input length.
        for start_sample in range(0, len(norm_wav), sampling_rate):
            audio_segment = norm_wav[start_sample:start_sample + sampling_rate]
            emotions_dfs.append(process_audio_and_detect_emotions(audio_segment))
            times.append(start_sample / sampling_rate)

        emotions_df = pd.concat(emotions_dfs, ignore_index=True)
        emotions_df.insert(0, "Time(s)", times)
        emotion_rename_map = {'Angry': 'anger', 'Sad': 'sadness', 'Happy': 'happy', 'Surprise': 'surprise', 'Fear': 'fear', 'Disgust': 'disgust', 'Neutral': 'neutral'}
        emotions_df.rename(columns=emotion_rename_map, inplace=True)

        # Reserve the output path and close the handle before pandas writes to it.
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as xlsx_file:
            emotions_xlsx_path = xlsx_file.name
        emotions_df.to_excel(emotions_xlsx_path, index=False)
        return "Audio emotion detection completed successfully.", emotions_df, emotions_xlsx_path
    except Exception as e:
        # The UI expects a status string rather than an exception.
        if emotions_xlsx_path and os.path.exists(emotions_xlsx_path):
            os.remove(emotions_xlsx_path)
        return f"Error during audio emotion detection: {str(e)}", None, None
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
def _reserve_temp_path(suffix):
    """Create an empty temporary file and return its path, with the handle closed."""
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as handle:
        return handle.name


def _draw_face_overlay(frame, bounding_box, emotions):
    """Draw the face rectangle and one text line per emotion score onto ``frame`` in place."""
    left, top = bounding_box[0], bounding_box[1]
    width, height = bounding_box[2], bounding_box[3]
    cv2.rectangle(frame, (left, top), (left + width, top + height), (0, 155, 255), 2)
    for index, (emotion_name, score) in enumerate(emotions.items()):
        # Near-zero scores are drawn in light grey, the rest in the highlight colour.
        color = (211, 211, 211) if score < 0.01 else (255, 0, 0)
        emotion_score = "{}: {:.2f}".format(emotion_name, score)
        cv2.putText(frame, emotion_score, (left, top + height + 30 + index * 15),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)


# Function to detect facial emotions
def detect_faces_and_emotions(video_path):
    """Detect faces frame by frame, annotate the video and average emotions per second.

    Args:
        video_path: Path of the video file to analyse.

    Returns:
        A tuple ``(status_message, emotions_df, xlsx_path, video_path)``.
        ``emotions_df`` has one row per second from 0 to the last second in
        which a face was seen; seconds without a face are filled with 0.
        On failure the message describes the error and the other three items
        are ``None``.
    """
    temp_video_path = None
    temp_audio_path = None
    output_xlsx_path = None
    output_video_path = None
    cap = None
    out = None
    clips = []  # moviepy clips that must be closed before temp files are removed
    succeeded = False
    emotions_data = []
    try:
        temp_video_path = _reserve_temp_path(".mp4")

        # Save the original audio so it can be re-attached to the annotated video.
        original_video = VideoFileClip(video_path)
        clips.append(original_video)
        has_audio = original_video.audio is not None
        if has_audio:
            temp_audio_path = _reserve_temp_path(".mp3")
            original_video.audio.write_audiofile(temp_audio_path)

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise Exception("Error: Could not open video file.")
        # Keep the real (possibly fractional) frame rate: truncating e.g. 29.97
        # to 29 skews the timestamps and the length of the written video.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            raise ValueError("Could not determine the video frame rate.")
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(temp_video_path, fourcc, fps, (frame_width, frame_height))

        frame_number = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame is None:
                continue
            time_seconds = round(frame_number / fps)
            for face in face_detector.detect_emotions(frame):
                emotions = face["emotions"]
                # Store the timestamp in a copy so that "Time(s)" is not drawn
                # on the frame as if it were an emotion score.
                emotions_data.append({**emotions, "Time(s)": time_seconds})
                _draw_face_overlay(frame, face["box"], emotions)
            out.write(frame)
            frame_number += 1

        # The writer must be finalised before the file is read back below.
        cap.release()
        out.release()
        cap = None
        out = None

        if not emotions_data:
            raise ValueError("No faces were detected in the video.")

        emotions_df = pd.DataFrame(emotions_data)
        max_time = int(emotions_df['Time(s)'].max())
        all_times = pd.DataFrame({'Time(s)': range(max_time + 1)})
        avg_scores = emotions_df.groupby("Time(s)").mean().reset_index()
        # Left-merge onto the full range so seconds without a face still get a row.
        df_merged = pd.merge(all_times, avg_scores, on='Time(s)', how='left')
        df_merged = df_merged.fillna(0)
        df_merged['Time(s)'] = df_merged['Time(s)'].astype(str) + " sec"

        output_xlsx_path = _reserve_temp_path(".xlsx")
        df_merged.to_excel(output_xlsx_path, index=False)

        processed_video = VideoFileClip(temp_video_path)
        clips.append(processed_video)
        final_video = processed_video
        if has_audio:
            audio = AudioFileClip(temp_audio_path)
            clips.append(audio)
            final_video = processed_video.set_audio(audio)
        output_video_path = _reserve_temp_path(".mp4")
        final_video.write_videofile(output_video_path, codec='libx264')

        succeeded = True
        return "Face and emotion detection completed successfully.", df_merged, output_xlsx_path, output_video_path
    except Exception as e:
        # The UI expects a status string rather than an exception.
        return f"Error during processing: {str(e)}", None, None, None
    finally:
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()
        for clip in clips:
            clip.close()
        stale_paths = [temp_video_path, temp_audio_path]
        if not succeeded:
            # Outputs are only handed to the caller on success.
            stale_paths += [output_xlsx_path, output_video_path]
        for path in stale_paths:
            if path and os.path.exists(path):
                os.remove(path)
# Function to analyze text emotions
def process_video_text(video_path):
    """Transcribe a video, translate the segments and score their emotions.

    The audio track is transcribed with word-level timestamps, each segment
    is translated from Korean to English and scored by the text classifier.
    Words are then grouped by the second in which they start (rounded up),
    and every non-empty second inherits the scores of the first segment that
    fully covers its words.

    Args:
        video_path: Path of the video file to analyse.

    Returns:
        A tuple ``(words_grouped, sentiment_df, words_xlsx_path,
        segments_xlsx_path, status_message)``. On failure the first four
        items are ``None`` and the message describes the error.
    """
    temp_audio_path = None
    video_clip = None
    output_paths = []  # removed again if anything fails
    try:
        video_clip = VideoFileClip(video_path)
        if video_clip.audio is None:
            raise ValueError("The video does not contain an audio track.")
        # Reserve the path and close the handle before the writer opens the file.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
            temp_audio_path = temp_audio_file.name
        video_clip.audio.write_audiofile(temp_audio_path)

        audio = whisper.load_audio(temp_audio_path)
        model = whisper.load_model("medium", device="cpu")
        result = whisper.transcribe(model, audio)
        segments = result['segments']
        if not segments:
            raise ValueError("No speech was detected in the video.")

        # Create segments DataFrame
        segments_df = pd.DataFrame([
            {'text': seg['text'], 'start': seg['start'], 'end': seg['end'], 'confidence': seg['confidence']}
            for seg in segments
        ])
        # Translate from Korean to English
        translator = Translator(from_lang='ko', to_lang='en')
        segments_df['Translated_Text'] = segments_df['text'].apply(lambda x: translator.translate(x))
        # Apply the sentiment analysis model to the translated text
        segments_df['Sentiment_Scores'] = segments_df['Translated_Text'].apply(
            lambda x: {entry['label']: entry['score'] for entry in classifier(x)[0]}
        )
        # Split the sentiment scores into individual columns
        sentiment_df = pd.concat([segments_df, segments_df['Sentiment_Scores'].apply(pd.Series)], axis=1)

        # Create words DataFrame (word-level data with timestamps)
        words_df = pd.DataFrame(
            [
                {'text': word['text'], 'start': word['start'], 'end': word['end'], 'confidence': word['confidence']}
                for segment in segments
                for word in segment['words']
            ],
            columns=['text', 'start', 'end', 'confidence'],
        )
        # Round up the start time to the next second
        words_df['second'] = words_df['start'].apply(lambda x: int(np.ceil(x)))
        # Group words by second, concatenating words that belong to the same second
        words_grouped = words_df.groupby('second').agg({
            'text': lambda x: ' '.join(x),
            'start': 'min',
            'end': 'max',
            'confidence': 'mean'
        }).reset_index()

        # Fill in missing seconds. Because start times are rounded up, the last
        # word can land one second past int(duration); extend the range so the
        # left merge below does not drop it.
        max_second = int(video_clip.duration)
        if not words_grouped.empty:
            max_second = max(max_second, int(words_grouped['second'].max()))
        all_seconds = pd.DataFrame({'second': np.arange(0, max_second + 1)})
        words_grouped = all_seconds.merge(words_grouped, on='second', how='left')

        # Fill missing values with blanks or zeros. Assign the result back:
        # an in-place fillna on a selected column is not guaranteed to update
        # the parent frame.
        words_grouped['text'] = words_grouped['text'].fillna('')
        words_grouped = words_grouped.fillna(0)

        # Initialize emotion columns with NaN values
        emotion_columns = sentiment_df.columns.difference(['text', 'start', 'end', 'confidence', 'Translated_Text', 'Sentiment_Scores'])
        for col in emotion_columns:
            words_grouped[col] = np.nan

        # For each second, find the corresponding segment and copy its emotion scores
        for i, row in words_grouped.iterrows():
            if not row['text']:
                # Seconds without words carry start=end=0 after the fill above
                # and would otherwise match any segment starting at 0.0.
                continue
            matching_segment = sentiment_df[(sentiment_df['start'] <= row['start']) & (sentiment_df['end'] >= row['end'])]
            if not matching_segment.empty:
                for emotion in emotion_columns:
                    words_grouped.at[i, emotion] = matching_segment.iloc[0][emotion]
        # Replace any NaN values in emotion columns with 0
        words_grouped[emotion_columns] = words_grouped[emotion_columns].fillna(0)

        # Save DataFrames to XLSX files
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as segments_file:
            segments_xlsx_path = segments_file.name
        output_paths.append(segments_xlsx_path)
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as words_file:
            words_xlsx_path = words_file.name
        output_paths.append(words_xlsx_path)
        sentiment_df.to_excel(segments_xlsx_path, index=False)
        words_grouped.to_excel(words_xlsx_path, index=False)
        return words_grouped, sentiment_df, words_xlsx_path, segments_xlsx_path, "Text emotion processing completed successfully!"
    except Exception as e:
        # The UI expects a status string rather than an exception.
        for path in output_paths:
            if os.path.exists(path):
                os.remove(path)
        return None, None, None, None, f"Error during text emotion processing: {str(e)}"
    finally:
        if video_clip is not None:
            video_clip.close()
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
# Gradio App
def gradio_app():
    """Build the Gradio interface, connect the three analysis buttons and launch it.

    The page has a video upload, one button per analysis (audio, facial,
    text) and one result column per analysis holding a status box, result
    tables and download links.
    """
    with gr.Blocks() as interface:
        gr.Markdown("## I-MEQ: Emotion Monitoring System")
        video_input = gr.Video(label="Upload your video for analysis", height=600)

        # Action buttons, side by side
        with gr.Row():
            audio_btn = gr.Button("Analyze Audio Emotions")
            fer_btn = gr.Button("Analyze Facial Emotions")
            text_btn = gr.Button("Transcribe & Analyze Textual Emotions")

        # One result column per analysis
        with gr.Row():
            with gr.Column():
                audio_status = gr.Textbox(label="Audio Emotion Analysis Status")
                audio_table = gr.Dataframe(label="Audio Emotions DataFrame", interactive=False)
                audio_xlsx = gr.File(label="Download Audio Emotions XLSX")
            with gr.Column():
                fer_status = gr.Textbox(label="Facial Emotion Analysis Status")
                fer_table = gr.Dataframe(label="Facial Emotions DataFrame", interactive=False)
                fer_xlsx = gr.File(label="Download Facial Emotions XLSX")
                fer_video = gr.File(label="Download Processed Video")
            with gr.Column():
                text_status = gr.Textbox(label="Text Sentiment Analysis Status")
                words_table = gr.Dataframe(label="Words DataFrame", interactive=False)
                segments_table = gr.Dataframe(label="Segments DataFrame", interactive=False)
                words_xlsx = gr.File(label="Download Words XLSX")
                segments_xlsx = gr.File(label="Download Segments XLSX")

        # Output order must match the tuple order returned by each handler.
        audio_btn.click(
            analyze_audio_emotions,
            inputs=video_input,
            outputs=[audio_status, audio_table, audio_xlsx],
        )
        fer_btn.click(
            detect_faces_and_emotions,
            inputs=video_input,
            outputs=[fer_status, fer_table, fer_xlsx, fer_video],
        )
        # The text handler returns its status message last.
        text_btn.click(
            process_video_text,
            inputs=video_input,
            outputs=[words_table, segments_table, words_xlsx, segments_xlsx, text_status],
        )

    interface.launch()
# Start the Gradio app only when this file is executed as a script, so that
# importing the module (e.g. to reuse the analysis functions) does not launch
# the web server.
if __name__ == "__main__":
    gradio_app()