# File size: 14,428 Bytes
# 29bc38c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
import gradio as gr
import pandas as pd
import cv2
import torch
import tempfile
import os
import librosa
from fer import FER
from transformers import AutoModelForAudioClassification, pipeline
from moviepy.editor import VideoFileClip, AudioFileClip
import numpy as np
from torch.nn.functional import softmax
import whisper_timestamped as whisper
from translate import Translator

# Load pre-trained models once at import time; every analysis function below
# reads these module-level objects instead of re-creating them per request.
# NOTE(review): trust_remote_code=True executes Python code shipped with the
# model repository — confirm the repo/revision is trusted and consider pinning it.
audio_model = AutoModelForAudioClassification.from_pretrained("3loi/SER-Odyssey-Baseline-WavLM-Categorical-Attributes", trust_remote_code=True)
# Facial-emotion detector, created with its MTCNN option enabled.
face_detector = FER(mtcnn=True)
# Text emotion classifier; process_video_text reads a list of
# {'label', 'score'} entries from classifier(text)[0].
classifier = pipeline("text-classification", model="j-hartmann/emotion-english-distilroberta-base", top_k=None)

# Normalisation statistics published in the audio model's config; they are
# applied to the raw waveform in analyze_audio_emotions before inference.
mean = audio_model.config.mean
std = audio_model.config.std

# Function to extract audio from video for audio emotion analysis
def extract_audio_from_video(video_path):
    """Extract the audio track of a video into a temporary WAV file.

    Args:
        video_path: Path of the video file to read.

    Returns:
        Path of a newly created ``.wav`` file encoded as 16-bit PCM. The
        caller owns the file and is responsible for deleting it.

    Raises:
        ValueError: If the video has no audio track.
        Exception: Whatever moviepy raises while opening the video or
            writing the audio; the temporary file is removed first.
    """
    # Only reserve a unique path here and close the handle straight away:
    # moviepy reopens the file by name, which fails on Windows while our own
    # handle is still open.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
        temp_audio_path = temp_audio_file.name

    video_clip = None
    try:
        video_clip = VideoFileClip(video_path)
        audio_clip = video_clip.audio
        if audio_clip is None:
            raise ValueError("The video has no audio track.")
        audio_clip.write_audiofile(temp_audio_path, codec="pcm_s16le")
    except Exception:
        # The caller never learns the path when we fail, so clean up here.
        if os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
        raise
    finally:
        # Release the reader processes/file handles held by the clip.
        if video_clip is not None:
            video_clip.close()
    return temp_audio_path

# Function to perform audio emotion detection per second
def process_audio_and_detect_emotions(audio_clip):
    """Run the audio emotion model on one waveform segment.

    Args:
        audio_clip: Sequence of audio samples for a single segment
            (analyze_audio_emotions passes an already-normalised slice).

    Returns:
        A one-row ``pandas.DataFrame`` whose columns are the emotion labels
        and whose values are probabilities re-normalised to sum to 1.
    """
    samples = np.array(audio_clip)
    # Add a leading batch dimension of size 1; the mask marks every sample
    # as valid.
    batch = torch.tensor(samples).unsqueeze(0)
    attention = torch.ones(1, len(samples))

    with torch.no_grad():
        output = audio_model(batch, attention)

    # The model may return either an object exposing `.logits` or a plain
    # sequence whose first item holds the logits.
    if hasattr(output, 'logits'):
        raw_scores = output.logits
    else:
        raw_scores = output[0]

    # Class index 6 is deliberately left out of the report.
    # NOTE(review): presumably a class the app does not expose — confirm
    # against the model card.
    label_by_index = {0: 'Angry', 1: 'Sad', 2: 'Happy', 3: 'Surprise', 4: 'Fear', 5: 'Disgust', 7: 'Neutral'}
    kept_indices = list(label_by_index)

    selected = softmax(raw_scores, dim=-1).squeeze(0)[kept_indices]
    # Re-normalise so the kept classes sum to 1 after dropping one class.
    selected = selected / selected.sum()
    return pd.DataFrame([selected.numpy()], columns=label_by_index.values())

# Function to analyze audio emotions
def analyze_audio_emotions(video_path):
    """Compute per-second audio emotion probabilities for a video.

    The audio track is extracted, resampled to the model's sampling rate,
    normalised with the model's mean/std and scored one second at a time.

    Args:
        video_path: Path of the video file to analyse.

    Returns:
        A ``(status, dataframe, xlsx_path)`` tuple. On success ``dataframe``
        has a ``Time(s)`` column followed by one column per emotion and
        ``xlsx_path`` points at a temporary Excel export of it. On failure
        the status carries the error message and the other two items are
        ``None``.
    """
    temp_audio_path = None
    try:
        temp_audio_path = extract_audio_from_video(video_path)
        sample_rate = audio_model.config.sampling_rate
        raw_wav, _ = librosa.load(temp_audio_path, sr=sample_rate)
        # Small epsilon guards against division by zero.
        norm_wav = (raw_wav - mean) / (std + 0.000001)

        # One window per second of audio; the final window may be shorter.
        # NOTE(review): a very short trailing window might be rejected by
        # the model — confirm its minimum input length.
        offsets = range(0, len(norm_wav), sample_rate)
        emotions_dfs = [
            process_audio_and_detect_emotions(norm_wav[offset:offset + sample_rate])
            for offset in offsets
        ]
        times = [offset / sample_rate for offset in offsets]

        emotions_df = pd.concat(emotions_dfs, ignore_index=True)
        emotions_df.insert(0, "Time(s)", times)
        # Rename the model's labels to the names used in the exported table.
        emotions_df = emotions_df.rename(columns={
            'Angry': 'anger',
            'Sad': 'sadness',
            'Happy': 'happy',
            'Surprise': 'surprise',
            'Fear': 'fear',
            'Disgust': 'disgust',
            'Neutral': 'neutral',
        })

        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as xlsx_file:
            emotions_xlsx_path = xlsx_file.name
        emotions_df.to_excel(emotions_xlsx_path, index=False)

        return "Audio emotion detection completed successfully.", emotions_df, emotions_xlsx_path

    except Exception as e:
        return f"Error during audio emotion detection: {str(e)}", None, None
    finally:
        # The extracted WAV is only an intermediate artefact.
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)

# Function to detect facial emotions
def detect_faces_and_emotions(video_path):
    """Annotate a video with detected faces and per-second emotion scores.

    Every frame is run through the face/emotion detector; each detected face
    gets a bounding box and its emotion scores drawn below it. Scores are
    averaged per second, seconds without any detection are filled with 0,
    and the annotated frames are re-encoded (with the original audio when
    the video has an audio track).

    Args:
        video_path: Path of the video file to analyse.

    Returns:
        A ``(status, dataframe, xlsx_path, video_path)`` tuple. On success
        ``dataframe`` holds the per-second averages (``Time(s)`` formatted
        as ``"<n> sec"``), ``xlsx_path`` is its Excel export and
        ``video_path`` is the annotated video. On failure the status carries
        the error message and the other three items are ``None``.

    Note:
        The table ends at the last second in which a face was detected, not
        at the end of the video.
    """
    temp_video_path = None
    temp_audio_path = None
    output_xlsx_path = None
    output_video_path = None
    original_video = None
    processed_video = None
    audio = None
    emotions_data = []
    try:
        # Only reserve unique paths and close the handles immediately: cv2
        # and moviepy reopen the files by name, which fails on Windows while
        # our own handle is still open.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_video:
            temp_video_path = temp_video.name

        original_video = VideoFileClip(video_path)
        # Videos without an audio track are still processed; the result is
        # then simply silent.
        if original_video.audio is not None:
            with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio:
                temp_audio_path = temp_audio.name
            original_video.audio.write_audiofile(temp_audio_path)

        cap = cv2.VideoCapture(video_path)
        out = None
        try:
            if not cap.isOpened():
                raise RuntimeError("Error: Could not open video file.")

            # Keep the fractional frame rate (e.g. 29.97): truncating it to
            # an int makes timestamps drift and desynchronises the audio.
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:  # also rejects NaN
                raise RuntimeError("Error: Could not determine the video frame rate.")
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(temp_video_path, fourcc, fps, (frame_width, frame_height))

            frame_number = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret or frame is None:
                    break

                time_seconds = round(frame_number / fps)

                for face in face_detector.detect_emotions(frame):
                    x, y, w, h = face["box"]
                    scores = face["emotions"]
                    cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 155, 255), 2)
                    for index, (emotion_name, score) in enumerate(scores.items()):
                        # Near-zero scores are drawn in light grey.
                        color = (211, 211, 211) if score < 0.01 else (255, 0, 0)
                        emotion_score = f"{emotion_name}: {score:.2f}"
                        cv2.putText(frame, emotion_score, (x, y + h + 30 + index * 15),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)
                    # Record the timestamp in a copy, after drawing, so that
                    # "Time(s)" is not rendered on the frame as an emotion.
                    emotions_data.append({**scores, "Time(s)": time_seconds})

                out.write(frame)
                frame_number += 1
        finally:
            # The writer must be released before the file is read back, and
            # both handles must be freed even when processing fails.
            cap.release()
            if out is not None:
                out.release()

        if not emotions_data:
            raise ValueError("No faces were detected in the video.")

        emotions_df = pd.DataFrame(emotions_data)
        emotions_df['Time(s)'] = emotions_df['Time(s)'].round().astype(int)
        max_time = int(emotions_df['Time(s)'].max())
        all_times = pd.DataFrame({'Time(s)': range(max_time + 1)})
        avg_scores = emotions_df.groupby("Time(s)").mean().reset_index()
        # Left-join on the full range so seconds without a face show zeros.
        df_merged = pd.merge(all_times, avg_scores, on='Time(s)', how='left')
        df_merged = df_merged.fillna(0)
        df_merged['Time(s)'] = df_merged['Time(s)'].astype(str) + " sec"

        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as output_xlsx:
            output_xlsx_path = output_xlsx.name
        df_merged.to_excel(output_xlsx_path, index=False)

        processed_video = VideoFileClip(temp_video_path)
        final_video = processed_video
        if temp_audio_path is not None:
            audio = AudioFileClip(temp_audio_path)
            final_video = processed_video.set_audio(audio)
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as output_video:
            output_video_path = output_video.name
        final_video.write_videofile(output_video_path, codec='libx264')

        return "Face and emotion detection completed successfully.", df_merged, output_xlsx_path, output_video_path

    except Exception as e:
        # Nothing is handed back on failure, so drop partial outputs too.
        for path in (output_xlsx_path, output_video_path):
            if path and os.path.exists(path):
                os.remove(path)
        return f"Error during processing: {str(e)}", None, None, None
    finally:
        # Close the clips first so their files can be deleted afterwards.
        for clip in (audio, processed_video, original_video):
            if clip is not None:
                clip.close()
        for path in (temp_video_path, temp_audio_path):
            if path and os.path.exists(path):
                os.remove(path)

# Function to analyze text emotions
def process_video_text(video_path):
    """Transcribe a video and score the emotions of what is said.

    The audio track is transcribed with word-level timestamps, each
    transcript segment is translated from Korean to English and classified,
    and the words are grouped per second with the emotion scores of the
    segment that contains them.

    Args:
        video_path: Path of the video file to analyse.

    Returns:
        A ``(words_df, segments_df, words_xlsx, segments_xlsx, status)``
        tuple. ``words_df`` has one row per second of the video,
        ``segments_df`` one row per transcript segment with its translation
        and emotion scores, and the two paths are Excel exports of them. On
        failure the first four items are ``None`` and the status carries the
        error message.

    Note:
        The Whisper "medium" model is loaded on every call.
    """
    temp_audio_path = None
    video_clip = None
    try:
        video_clip = VideoFileClip(video_path)
        if video_clip.audio is None:
            raise ValueError("The video has no audio track to transcribe.")

        # Reserve the path and close the handle before moviepy writes to it
        # by name (required on Windows).
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_audio_file:
            temp_audio_path = temp_audio_file.name
        video_clip.audio.write_audiofile(temp_audio_path)

        audio = whisper.load_audio(temp_audio_path)
        model = whisper.load_model("medium", device="cpu")
        result = whisper.transcribe(model, audio)

        segments = result['segments']
        if not segments:
            raise ValueError("No speech was detected in the video.")

        # Create segments DataFrame
        segments_df = pd.DataFrame([
            {'text': seg['text'], 'start': seg['start'], 'end': seg['end'], 'confidence': seg['confidence']}
            for seg in segments
        ])

        # Translate from Korean to English
        translator = Translator(from_lang='ko', to_lang='en')
        segments_df['Translated_Text'] = segments_df['text'].apply(translator.translate)

        # Apply the emotion classifier to the translated text
        segments_df['Sentiment_Scores'] = segments_df['Translated_Text'].apply(
            lambda text: {entry['label']: entry['score'] for entry in classifier(text)[0]}
        )

        # Split the scores into one column per emotion
        sentiment_df = pd.concat(
            [segments_df, segments_df['Sentiment_Scores'].apply(pd.Series)], axis=1
        )

        # Create words DataFrame (one row per word, with its timestamps)
        word_rows = []
        for seg in segments:
            for word in seg['words']:
                word_rows.append({
                    'text': word['text'],
                    'start': word['start'],
                    'end': word['end'],
                    'confidence': word['confidence'],
                })
        words_df = pd.DataFrame(word_rows, columns=['text', 'start', 'end', 'confidence'])

        # Round up the start time to the next second
        words_df['second'] = np.ceil(words_df['start'].astype(float)).astype(int)

        # Group words by second, concatenating words that belong to the same second
        words_grouped = words_df.groupby('second').agg({
            'text': lambda x: ' '.join(x),
            'start': 'min',
            'end': 'max',
            'confidence': 'mean'
        }).reset_index()

        # Fill in missing seconds. Because starts are rounded up, words from
        # the last partial second land one past int(duration); extend the
        # range so the left merge below does not drop them.
        max_second = int(video_clip.duration)
        if not words_grouped.empty:
            max_second = max(max_second, int(words_grouped['second'].max()))
        all_seconds = pd.DataFrame({'second': np.arange(0, max_second + 1)})
        words_grouped = all_seconds.merge(words_grouped, on='second', how='left')

        # Remember which seconds really contain words before the gaps are
        # filled: a filled row has start == end == 0 and would otherwise
        # match any segment starting at 0.0.
        has_words = words_grouped['start'].notna()

        # Fill missing values with blanks or zeros. Assign the column back
        # instead of a chained inplace fillna, which does nothing under
        # pandas copy-on-write.
        words_grouped['text'] = words_grouped['text'].fillna('')
        words_grouped = words_grouped.fillna(0)

        # Emotion columns default to 0 for seconds without a matching segment
        emotion_columns = sentiment_df.columns.difference(['text', 'start', 'end', 'confidence', 'Translated_Text', 'Sentiment_Scores'])
        for col in emotion_columns:
            words_grouped[col] = 0.0

        # For each second with words, copy the scores of the first segment
        # that fully contains the words' time span
        for i in words_grouped.index[has_words]:
            span_start = words_grouped.at[i, 'start']
            span_end = words_grouped.at[i, 'end']
            matching_segment = sentiment_df[(sentiment_df['start'] <= span_start) & (sentiment_df['end'] >= span_end)]
            if not matching_segment.empty:
                for emotion in emotion_columns:
                    words_grouped.at[i, emotion] = matching_segment.iloc[0][emotion]

        # Save DataFrames to XLSX files
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as segments_file:
            segments_xlsx_path = segments_file.name
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as words_file:
            words_xlsx_path = words_file.name
        sentiment_df.to_excel(segments_xlsx_path, index=False)
        words_grouped.to_excel(words_xlsx_path, index=False)

        return words_grouped, sentiment_df, words_xlsx_path, segments_xlsx_path, "Text emotion processing completed successfully!"

    except Exception as e:
        return None, None, None, None, f"Error during text emotion processing: {str(e)}"
    finally:
        # Release the clip's readers, then drop the intermediate WAV.
        if video_clip is not None:
            video_clip.close()
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)

# Gradio App
def gradio_app():
    """Build the emotion-monitoring UI and launch it.

    The page has one video input, three action buttons and three result
    columns (audio, facial, text). Each button passes the uploaded video to
    its analysis function and routes the returned values to that column's
    widgets.
    """
    with gr.Blocks() as interface:
        gr.Markdown("## I-MEQ: Emotion Monitoring System")
        video_input = gr.Video(label="Upload your video for analysis", height=600)

        with gr.Row():
            audio_button = gr.Button("Analyze Audio Emotions")
            fer_button = gr.Button("Analyze Facial Emotions")
            text_button = gr.Button("Transcribe & Analyze Textual Emotions")

        with gr.Row():
            # Audio results
            with gr.Column():
                audio_status = gr.Textbox(label="Audio Emotion Analysis Status")
                audio_table = gr.Dataframe(label="Audio Emotions DataFrame", interactive=False)
                audio_xlsx = gr.File(label="Download Audio Emotions XLSX")

            # Facial results
            with gr.Column():
                fer_status = gr.Textbox(label="Facial Emotion Analysis Status")
                fer_table = gr.Dataframe(label="Facial Emotions DataFrame", interactive=False)
                fer_xlsx = gr.File(label="Download Facial Emotions XLSX")
                fer_video = gr.File(label="Download Processed Video")

            # Text results
            with gr.Column():
                text_status = gr.Textbox(label="Text Sentiment Analysis Status")
                words_table = gr.Dataframe(label="Words DataFrame", interactive=False)
                segments_table = gr.Dataframe(label="Segments DataFrame", interactive=False)
                words_xlsx = gr.File(label="Download Words XLSX")
                segments_xlsx = gr.File(label="Download Segments XLSX")

        # The order of each outputs list mirrors the order of the tuple
        # returned by the corresponding analysis function.
        audio_button.click(
            fn=analyze_audio_emotions,
            inputs=video_input,
            outputs=[audio_status, audio_table, audio_xlsx],
        )
        fer_button.click(
            fn=detect_faces_and_emotions,
            inputs=video_input,
            outputs=[fer_status, fer_table, fer_xlsx, fer_video],
        )
        text_button.click(
            fn=process_video_text,
            inputs=video_input,
            outputs=[words_table, segments_table, words_xlsx, segments_xlsx, text_status],
        )

    interface.launch()

# Start the Gradio app only when this file is run as a script, so that
# importing the module (e.g. to reuse the analysis functions) does not
# launch a web server as a side effect.
if __name__ == "__main__":
    gradio_app()