File size: 14,428 Bytes
29bc38c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 |
import gradio as gr
import pandas as pd
import cv2
import torch
import tempfile
import os
import librosa
from fer import FER
from transformers import AutoModelForAudioClassification, pipeline
from moviepy.editor import VideoFileClip, AudioFileClip
import numpy as np
from torch.nn.functional import softmax
import whisper_timestamped as whisper
from translate import Translator
# Load pre-trained models
audio_model = AutoModelForAudioClassification.from_pretrained("3loi/SER-Odyssey-Baseline-WavLM-Categorical-Attributes", trust_remote_code=True)
face_detector = FER(mtcnn=True)
classifier = pipeline("text-classification", model="j-hartmann/emotion-english-distilroberta-base", top_k=None)
# Set mean and std for audio model
mean = audio_model.config.mean
std = audio_model.config.std
# Function to extract audio from video for audio emotion analysis
def extract_audio_from_video(video_path):
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
video_clip = VideoFileClip(video_path)
audio_clip = video_clip.audio
audio_clip.write_audiofile(temp_audio_file.name, codec="pcm_s16le")
return temp_audio_file.name
# Function to perform audio emotion detection per second
def process_audio_and_detect_emotions(audio_clip):
audio_np = np.array(audio_clip)
mask = torch.ones(1, len(audio_np))
wavs = torch.tensor(audio_np).unsqueeze(0)
with torch.no_grad():
pred = audio_model(wavs, mask)
logits = pred.logits if hasattr(pred, 'logits') else pred[0]
labels = {0: 'Angry', 1: 'Sad', 2: 'Happy', 3: 'Surprise', 4: 'Fear', 5: 'Disgust', 7: 'Neutral'}
probabilities = softmax(logits, dim=-1).squeeze(0)[[0, 1, 2, 3, 4, 5, 7]]
probabilities = probabilities / probabilities.sum()
df = pd.DataFrame([probabilities.numpy()], columns=labels.values())
return df
# Function to analyze audio emotions
def analyze_audio_emotions(video_path):
temp_audio_path = None
try:
temp_audio_path = extract_audio_from_video(video_path)
raw_wav, _ = librosa.load(temp_audio_path, sr=audio_model.config.sampling_rate)
norm_wav = (raw_wav - mean) / (std + 0.000001)
times = []
emotions_dfs = []
for start_time in range(0, len(norm_wav), audio_model.config.sampling_rate):
audio_segment = norm_wav[start_time:start_time + audio_model.config.sampling_rate]
df = process_audio_and_detect_emotions(audio_segment)
times.append(start_time / audio_model.config.sampling_rate)
emotions_dfs.append(df)
emotions_df = pd.concat(emotions_dfs, ignore_index=True)
emotions_df.insert(0, "Time(s)", times)
emotion_rename_map = {'Angry': 'anger', 'Sad': 'sadness', 'Happy': 'happy', 'Surprise': 'surprise', 'Fear': 'fear', 'Disgust': 'disgust', 'Neutral': 'neutral'}
emotions_df.rename(columns=emotion_rename_map, inplace=True)
emotions_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
emotions_df.to_excel(emotions_xlsx_path, index=False)
return f"Audio emotion detection completed successfully.", emotions_df, emotions_xlsx_path
except Exception as e:
return f"Error during audio emotion detection: {str(e)}", None, None
finally:
if temp_audio_path and os.path.exists(temp_audio_path):
os.remove(temp_audio_path)
# Function to detect facial emotions
def detect_faces_and_emotions(video_path):
temp_video_path = None
temp_audio_path = None
output_video_path = None
emotions_data = []
try:
temp_video = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
temp_video_path = temp_video.name
temp_audio = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False)
temp_audio_path = temp_audio.name
output_xlsx = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False)
output_xlsx_path = output_xlsx.name
original_video = VideoFileClip(video_path)
original_audio = original_video.audio
original_audio.write_audiofile(temp_audio_path)
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise Exception("Error: Could not open video file.")
fps = int(cap.get(cv2.CAP_PROP_FPS))
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(temp_video_path, fourcc, fps, (frame_width, frame_height))
frame_number = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if frame is None:
continue
time_seconds = round(frame_number / fps)
result = face_detector.detect_emotions(frame)
for face in result:
bounding_box = face["box"]
emotions = face["emotions"]
emotions["Time(s)"] = time_seconds
emotions_data.append(emotions)
cv2.rectangle(frame, (bounding_box[0], bounding_box[1]),
(bounding_box[0] + bounding_box[2], bounding_box[1] + bounding_box[3]), (0, 155, 255), 2)
for index, (emotion_name, score) in enumerate(emotions.items()):
color = (211, 211, 211) if score < 0.01 else (255, 0, 0)
emotion_score = "{}: {:.2f}".format(emotion_name, score)
cv2.putText(frame, emotion_score, (bounding_box[0], bounding_box[1] + bounding_box[3] + 30 + index * 15),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)
out.write(frame)
frame_number += 1
cap.release()
out.release()
emotions_df = pd.DataFrame(emotions_data)
emotions_df['Time(s)'] = emotions_df['Time(s)'].round().astype(int)
max_time = emotions_df['Time(s)'].max()
all_times = pd.DataFrame({'Time(s)': range(max_time + 1)})
avg_scores = emotions_df.groupby("Time(s)").mean().reset_index()
df_merged = pd.merge(all_times, avg_scores, on='Time(s)', how='left')
df_merged.fillna(0, inplace=True)
df_merged['Time(s)'] = df_merged['Time(s)'].astype(str) + " sec"
df_merged.to_excel(output_xlsx_path, index=False)
processed_video = VideoFileClip(temp_video_path)
audio = AudioFileClip(temp_audio_path)
final_video = processed_video.set_audio(audio)
output_video = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
output_video_path = output_video.name
final_video.write_videofile(output_video_path, codec='libx264')
return "Face and emotion detection completed successfully.", df_merged, output_xlsx_path, output_video_path
except Exception as e:
return f"Error during processing: {str(e)}", None, None, None
finally:
if temp_video_path and os.path.exists(temp_video_path):
os.remove(temp_video_path)
if temp_audio_path and os.path.exists(temp_audio_path):
os.remove(temp_audio_path)
# Function to analyze text emotions
def process_video_text(video_path):
temp_audio_path = None
try:
video_clip = VideoFileClip(video_path)
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
temp_audio_path = temp_audio_file.name
video_clip.audio.write_audiofile(temp_audio_path)
audio = whisper.load_audio(temp_audio_path)
model = whisper.load_model("medium", device="cpu")
result = whisper.transcribe(model, audio)
# Create lists to store word-level data with timestamps
word_texts = []
word_starts = []
word_ends = []
word_confidences = []
for segment in result['segments']:
for word in segment['words']:
word_texts.append(word['text'])
word_starts.append(word['start'])
word_ends.append(word['end'])
word_confidences.append(word['confidence'])
# Create segments DataFrame
segments_data = [{'text': seg['text'], 'start': seg['start'], 'end': seg['end'], 'confidence': seg['confidence']} for seg in result['segments']]
segments_df = pd.DataFrame(segments_data)
# Translate from Korean to English
translator = Translator(from_lang='ko', to_lang='en')
segments_df['Translated_Text'] = segments_df['text'].apply(lambda x: translator.translate(x))
# Apply the sentiment analysis model to the translated text
segments_df['Sentiment_Scores'] = segments_df['Translated_Text'].apply(lambda x: {entry['label']: entry['score'] for entry in classifier(x)[0]})
# Split the sentiment scores into individual columns
sentiment_df = segments_df['Sentiment_Scores'].apply(pd.Series)
sentiment_df = pd.concat([segments_df, sentiment_df], axis=1)
# Create words DataFrame
words_data = {
'text': word_texts,
'start': word_starts,
'end': word_ends,
'confidence': word_confidences
}
words_df = pd.DataFrame(words_data)
# Round up the start time to the next second
words_df['second'] = words_df['start'].apply(lambda x: int(np.ceil(x)))
# Group words by second, concatenating words that belong to the same second
words_grouped = words_df.groupby('second').agg({
'text': lambda x: ' '.join(x),
'start': 'min',
'end': 'max',
'confidence': 'mean'
}).reset_index()
# Fill in missing seconds
max_second = int(video_clip.duration) # The last second in the video
all_seconds = pd.DataFrame({'second': np.arange(0, max_second + 1)}) # Start from 0 and go to the maximum second
words_grouped = all_seconds.merge(words_grouped, on='second', how='left')
# Fill missing values with blanks or zeros
words_grouped['text'].fillna('', inplace=True)
words_grouped.fillna(0, inplace=True)
# Initialize emotion columns with NaN values
emotion_columns = sentiment_df.columns.difference(['text', 'start', 'end', 'confidence', 'Translated_Text', 'Sentiment_Scores'])
for col in emotion_columns:
words_grouped[col] = np.nan
# For each second, find the corresponding segment and copy its emotion scores
for i, row in words_grouped.iterrows():
matching_segment = sentiment_df[(sentiment_df['start'] <= row['start']) & (sentiment_df['end'] >= row['end'])]
if not matching_segment.empty:
for emotion in emotion_columns:
words_grouped.at[i, emotion] = matching_segment.iloc[0][emotion]
# Replace any NaN values in emotion columns with 0
words_grouped[emotion_columns] = words_grouped[emotion_columns].fillna(0)
# Save DataFrames to XLSX files
segments_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
words_xlsx_path = tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False).name
sentiment_df.to_excel(segments_xlsx_path, index=False)
words_grouped.to_excel(words_xlsx_path, index=False)
return words_grouped, sentiment_df, words_xlsx_path, segments_xlsx_path, "Text emotion processing completed successfully!"
except Exception as e:
return None, None, None, None, f"Error during text emotion processing: {str(e)}"
finally:
if temp_audio_path and os.path.exists(temp_audio_path):
os.remove(temp_audio_path)
# Gradio App
def gradio_app():
interface = gr.Blocks()
with interface:
gr.Markdown("## I-MEQ: Emotion Monitoring System")
video_input = gr.Video(label="Upload your video for analysis", height=600)
with gr.Row():
analyze_audio_button = gr.Button("Analyze Audio Emotions")
analyze_fer_button = gr.Button("Analyze Facial Emotions")
analyze_text_button = gr.Button("Transcribe & Analyze Textual Emotions")
with gr.Row():
with gr.Column():
audio_analysis_status = gr.Textbox(label="Audio Emotion Analysis Status")
audio_emotions_dataframe = gr.Dataframe(label="Audio Emotions DataFrame", interactive=False)
audio_emotions_xlsx_download = gr.File(label="Download Audio Emotions XLSX")
with gr.Column():
fer_analysis_status = gr.Textbox(label="Facial Emotion Analysis Status")
fer_emotions_dataframe = gr.Dataframe(label="Facial Emotions DataFrame", interactive=False)
fer_emotions_xlsx_download = gr.File(label="Download Facial Emotions XLSX")
processed_video_download = gr.File(label="Download Processed Video")
with gr.Column():
text_analysis_status = gr.Textbox(label="Text Sentiment Analysis Status")
words_dataframe = gr.Dataframe(label="Words DataFrame", interactive=False)
segments_dataframe = gr.Dataframe(label="Segments DataFrame", interactive=False)
words_xlsx_download = gr.File(label="Download Words XLSX")
segments_xlsx_download = gr.File(label="Download Segments XLSX")
analyze_audio_button.click(
analyze_audio_emotions,
inputs=video_input,
outputs=[
audio_analysis_status,
audio_emotions_dataframe,
audio_emotions_xlsx_download
]
)
analyze_fer_button.click(
detect_faces_and_emotions,
inputs=video_input,
outputs=[
fer_analysis_status,
fer_emotions_dataframe,
fer_emotions_xlsx_download,
processed_video_download
]
)
analyze_text_button.click(
process_video_text,
inputs=video_input,
outputs=[
words_dataframe,
segments_dataframe,
words_xlsx_download,
segments_xlsx_download,
text_analysis_status
]
)
interface.launch()
# Start the Gradio app
gradio_app() |