import gradio as gr
import pandas as pd
import cv2
import torch
import tempfile
import os
import librosa
from fer import FER
from transformers import AutoModelForAudioClassification, pipeline
from moviepy.editor import VideoFileClip, AudioFileClip
import numpy as np
from torch.nn.functional import softmax
import whisper_timestamped as whisper
from translate import Translator

# Load pre-trained models
audio_model = AutoModelForAudioClassification.from_pretrained("3loi/SER-Odyssey-Baseline-WavLM-Categorical-Attributes", trust_remote_code=True)
face_detector = FER(mtcnn=True)
classifier = pipeline("text-classification", model="j-hartmann/emotion-english-distilroberta-base", top_k=None)

# Normalization statistics (mean and std) taken from the audio model's config
mean = audio_model.config.mean
std = audio_model.config.std

# Function to extract audio from video for audio emotion analysis
def extract_audio_from_video(video_path):
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
        video_clip = VideoFileClip(video_path)
        audio_clip = video_clip.audio
        audio_clip.write_audiofile(temp_audio_file.name, codec="pcm_s16le")
    return temp_audio_file.name

# Function to perform audio emotion detection per second
def process_audio_and_detect_emotions(audio_clip):
    audio_np = np.array(audio_clip)
    mask = torch.ones(1, len(audio_np))
    wavs = torch.tensor(audio_np).unsqueeze(0)
    with torch.no_grad():
        pred = audio_model(wavs, mask)
        logits = pred.logits if hasattr(pred, 'logits') else pred[0]
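    # The label map below deliberately skips index 6 (a class in this checkpoint's
    # label set that the app does not report, presumably something like "Contempt");
    # the selected probabilities are renormalized below so they still sum to 1.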
    labels = {0: 'Angry', 1: 'Sad', 2: 'Happy', 3: 'Surprise', 4: 'Fear', 5: 'Disgust', 7: 'Neutral'}
    probabilities = softmax(logits, dim=-1).squeeze(0)[[0, 1, 2, 3, 4, 5, 7]]
    probabilities = probabilities / probabilities.sum()
    df = pd.DataFrame([probabilities.numpy()], columns=labels.values())
    return df

# Function to analyze audio emotions
def analyze_audio_emotions(video_path):
    temp_audio_path = None
    try:
        temp_audio_path = extract_audio_from_video(video_path)
        raw_wav, _ = librosa.load(temp_audio_path, sr=audio_model.config.sampling_rate)
        norm_wav = (raw_wav - mean) / (std + 0.000001)
        times = []
        emotions_dfs = []
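        # Split the normalized waveform into non-overlapping one-second windows
        # (sampling_rate samples each) and run the classifier on every window.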
        for start_time in range(0, len(norm_wav), audio_model.config.sampling_rate):
            audio_segment = norm_wav[start_time:start_time + audio_model.config.sampling_rate]
            df = process_audio_and_detect_emotions(audio_segment)
            times.append(start_time / audio_model.config.sampling_rate)
            emotions_dfs.append(df)
        emotions_df = pd.concat(emotions_dfs, ignore_index=True)
        emotions_df.insert(0, "Time(s)", times)
        emotion_rename_map = {'Angry': 'anger', 'Sad': 'sadness', 'Happy': 'happy', 'Surprise': 'surprise', 'Fear': 'fear', 'Disgust': 'disgust', 'Neutral': 'neutral'}
        emotions_df.rename(columns=emotion_rename_map, inplace=True)
        emotions_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
        emotions_df.to_excel(emotions_xlsx_path, index=False)
        return "Audio emotion detection completed successfully.", emotions_df, emotions_xlsx_path
    except Exception as e:
        return f"Error during audio emotion detection: {str(e)}", None, None
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)

# Function to detect facial emotions
def detect_faces_and_emotions(video_path):
    temp_video_path = None
    temp_audio_path = None
    output_video_path = None
    emotions_data = []
    try:
        temp_video = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        temp_video_path = temp_video.name
        temp_audio = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
        temp_audio_path = temp_audio.name
        output_xlsx = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False)
        output_xlsx_path = output_xlsx.name
        original_video = VideoFileClip(video_path)
        original_audio = original_video.audio
        original_audio.write_audiofile(temp_audio_path)
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise Exception("Error: Could not open video file.")
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(temp_video_path, fourcc, fps, (frame_width, frame_height))
        frame_number = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            if frame is None:
                continue
            time_seconds = round(frame_number / fps)
            result = face_detector.detect_emotions(frame)
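            # FER returns one dict per detected face, with a "box" ([x, y, width, height])
            # and an "emotions" score dict; tag each face's scores with the current second
            # and collect them for the per-second summary below.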
            for face in result:
                bounding_box = face["box"]
                emotions = face["emotions"]
                # Record the scores together with the timestamp; the overlay loop below
                # iterates over the emotion scores only.
                emotions_data.append({**emotions, "Time(s)": time_seconds})
                cv2.rectangle(frame, (bounding_box[0], bounding_box[1]),
                              (bounding_box[0] + bounding_box[2], bounding_box[1] + bounding_box[3]), (0, 155, 255), 2)
                for index, (emotion_name, score) in enumerate(emotions.items()):
                    color = (211, 211, 211) if score < 0.01 else (255, 0, 0)
                    emotion_score = "{}: {:.2f}".format(emotion_name, score)
                    cv2.putText(frame, emotion_score, (bounding_box[0], bounding_box[1] + bounding_box[3] + 30 + index * 15),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)
            out.write(frame)
            frame_number += 1
        cap.release()
        out.release()
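        # Average the per-frame scores into one row per second and fill seconds with
        # no detected face with zeros so the timeline stays continuous.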
        emotions_df = pd.DataFrame(emotions_data)
        emotions_df['Time(s)'] = emotions_df['Time(s)'].round().astype(int)
        max_time = emotions_df['Time(s)'].max()
        all_times = pd.DataFrame({'Time(s)': range(max_time + 1)})
        avg_scores = emotions_df.groupby("Time(s)").mean().reset_index()
        df_merged = pd.merge(all_times, avg_scores, on='Time(s)', how='left')
        df_merged.fillna(0, inplace=True)
        df_merged['Time(s)'] = df_merged['Time(s)'].astype(str) + " sec"
        df_merged.to_excel(output_xlsx_path, index=False)
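        # Re-attach the original soundtrack: cv2.VideoWriter writes video frames only,
        # so the annotated frames are muxed with the previously extracted audio via moviepy.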
        processed_video = VideoFileClip(temp_video_path)
        audio = AudioFileClip(temp_audio_path)
        final_video = processed_video.set_audio(audio)
        output_video = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        output_video_path = output_video.name
        final_video.write_videofile(output_video_path, codec='libx264')
        return "Face and emotion detection completed successfully.", df_merged, output_xlsx_path, output_video_path
    except Exception as e:
        return f"Error during processing: {str(e)}", None, None, None
    finally:
        if temp_video_path and os.path.exists(temp_video_path):
            os.remove(temp_video_path)
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)

# Function to analyze text emotions
def process_video_text(video_path):
    temp_audio_path = None
    try:
        video_clip = VideoFileClip(video_path)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
            temp_audio_path = temp_audio_file.name
            video_clip.audio.write_audiofile(temp_audio_path)
        audio = whisper.load_audio(temp_audio_path)
        model = whisper.load_model("medium", device="cpu")
        result = whisper.transcribe(model, audio)
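        # whisper_timestamped returns standard Whisper segments enriched with per-word
        # entries ("text", "start", "end", "confidence"), which are flattened below.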
        # Create lists to store word-level data with timestamps
        word_texts = []
        word_starts = []
        word_ends = []
        word_confidences = []
        for segment in result['segments']:
            for word in segment['words']:
                word_texts.append(word['text'])
                word_starts.append(word['start'])
                word_ends.append(word['end'])
                word_confidences.append(word['confidence'])
        # Create segments DataFrame
        segments_data = [{'text': seg['text'], 'start': seg['start'], 'end': seg['end'], 'confidence': seg['confidence']} for seg in result['segments']]
        segments_df = pd.DataFrame(segments_data)
        # Translate from Korean to English
        translator = Translator(from_lang='ko', to_lang='en')
        segments_df['Translated_Text'] = segments_df['text'].apply(lambda x: translator.translate(x))
        # Apply the sentiment analysis model to the translated text
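        # With top_k=None the pipeline returns a score for every label of the
        # j-hartmann model (anger, disgust, fear, joy, neutral, sadness, surprise),
        # so each segment gets a full score dictionary rather than a single top label.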
        segments_df['Sentiment_Scores'] = segments_df['Translated_Text'].apply(lambda x: {entry['label']: entry['score'] for entry in classifier(x)[0]})
        # Split the sentiment scores into individual columns
        sentiment_df = segments_df['Sentiment_Scores'].apply(pd.Series)
        sentiment_df = pd.concat([segments_df, sentiment_df], axis=1)
        # Create words DataFrame
        words_data = {
            'text': word_texts,
            'start': word_starts,
            'end': word_ends,
            'confidence': word_confidences
        }
        words_df = pd.DataFrame(words_data)
        # Round up the start time to the next second
        words_df['second'] = words_df['start'].apply(lambda x: int(np.ceil(x)))
        # Group words by second, concatenating words that belong to the same second
        words_grouped = words_df.groupby('second').agg({
            'text': lambda x: ' '.join(x),
            'start': 'min',
            'end': 'max',
            'confidence': 'mean'
        }).reset_index()
        # Fill in missing seconds
        max_second = int(video_clip.duration)  # The last second in the video
        all_seconds = pd.DataFrame({'second': np.arange(0, max_second + 1)})  # Start from 0 and go to the maximum second
        words_grouped = all_seconds.merge(words_grouped, on='second', how='left')
        # Fill missing values with blanks or zeros
        words_grouped['text'] = words_grouped['text'].fillna('')
        words_grouped.fillna(0, inplace=True)
        # Initialize emotion columns with NaN values
        emotion_columns = sentiment_df.columns.difference(['text', 'start', 'end', 'confidence', 'Translated_Text', 'Sentiment_Scores'])
        for col in emotion_columns:
            words_grouped[col] = np.nan
        # For each second, find the corresponding segment and copy its emotion scores
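        # Each second inherits the scores of the first transcript segment that fully
        # covers its word span; seconds with no matching segment keep a score of 0.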
        for i, row in words_grouped.iterrows():
            matching_segment = sentiment_df[(sentiment_df['start'] <= row['start']) & (sentiment_df['end'] >= row['end'])]
            if not matching_segment.empty:
                for emotion in emotion_columns:
                    words_grouped.at[i, emotion] = matching_segment.iloc[0][emotion]
        # Replace any NaN values in emotion columns with 0
        words_grouped[emotion_columns] = words_grouped[emotion_columns].fillna(0)
        # Save DataFrames to XLSX files
        segments_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
        words_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
        sentiment_df.to_excel(segments_xlsx_path, index=False)
        words_grouped.to_excel(words_xlsx_path, index=False)
        return words_grouped, sentiment_df, words_xlsx_path, segments_xlsx_path, "Text emotion processing completed successfully!"
    except Exception as e:
        return None, None, None, None, f"Error during text emotion processing: {str(e)}"
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)

# Gradio App
def gradio_app():
    interface = gr.Blocks()
    with interface:
        gr.Markdown("## I-MEQ: Emotion Monitoring System")
        video_input = gr.Video(label="Upload your video for analysis", height=600)
        with gr.Row():
            analyze_audio_button = gr.Button("Analyze Audio Emotions")
            analyze_fer_button = gr.Button("Analyze Facial Emotions")
            analyze_text_button = gr.Button("Transcribe & Analyze Textual Emotions")
        with gr.Row():
            with gr.Column():
                audio_analysis_status = gr.Textbox(label="Audio Emotion Analysis Status")
                audio_emotions_dataframe = gr.Dataframe(label="Audio Emotions DataFrame", interactive=False)
                audio_emotions_xlsx_download = gr.File(label="Download Audio Emotions XLSX")
            with gr.Column():
                fer_analysis_status = gr.Textbox(label="Facial Emotion Analysis Status")
                fer_emotions_dataframe = gr.Dataframe(label="Facial Emotions DataFrame", interactive=False)
                fer_emotions_xlsx_download = gr.File(label="Download Facial Emotions XLSX")
                processed_video_download = gr.File(label="Download Processed Video")
            with gr.Column():
                text_analysis_status = gr.Textbox(label="Text Sentiment Analysis Status")
                words_dataframe = gr.Dataframe(label="Words DataFrame", interactive=False)
                segments_dataframe = gr.Dataframe(label="Segments DataFrame", interactive=False)
                words_xlsx_download = gr.File(label="Download Words XLSX")
                segments_xlsx_download = gr.File(label="Download Segments XLSX")
        analyze_audio_button.click(
            analyze_audio_emotions,
            inputs=video_input,
            outputs=[
                audio_analysis_status,
                audio_emotions_dataframe,
                audio_emotions_xlsx_download
            ]
        )
        analyze_fer_button.click(
            detect_faces_and_emotions,
            inputs=video_input,
            outputs=[
                fer_analysis_status,
                fer_emotions_dataframe,
                fer_emotions_xlsx_download,
                processed_video_download
            ]
        )
        analyze_text_button.click(
            process_video_text,
            inputs=video_input,
            outputs=[
                words_dataframe,
                segments_dataframe,
                words_xlsx_download,
                segments_xlsx_download,
                text_analysis_status
            ]
        )
    interface.launch()

# Start the Gradio app
gradio_app()