Spaces:
Sleeping
Sleeping
""" | |
File: app_utils.py | |
Author: Elena Ryumina and Dmitry Ryumin | |
Description: This module contains utility functions for facial expression recognition application. | |
License: MIT License | |
""" | |
import torch | |
import numpy as np | |
import mediapipe as mp | |
import pandas as pd | |
from PIL import Image | |
import cv2 | |
# Importing necessary components for the Gradio app | |
from app.model import ( | |
pth_model_static, | |
pth_model_dynamic, | |
activations, | |
audio_processor, | |
audio_model, | |
device | |
) | |
from app.utils import ( | |
convert_mp4_to_mp3, | |
pad_wav, | |
pad_wav_zeros, | |
get_box, | |
pth_processing, | |
convert_webm_to_mp4, | |
get_evenly_spaced_frame_indices, | |
get_c_expr_db_pred | |
) | |
from app.config import DICT_EMO_VIDEO, AV_WEIGHTS, NAME_EMO_AUDIO, DICT_PRED, config_data | |
from app.plot import display_frame_info, plot_images | |
from collections import Counter | |
mp_face_mesh = mp.solutions.face_mesh | |
class EmotionRecognition: | |
def __init__( | |
self, | |
step=2, | |
window=4, | |
sr=16000, | |
save_path="", | |
padding="", | |
): | |
self.save_path = save_path | |
self.step = step | |
self.window = window | |
self.sr = sr | |
self.padding = padding | |
def predict_emotion(self, path, frame_indices, fps): | |
prob, plt = self.load_audio_features(path, frame_indices, fps) | |
return prob, plt | |
def load_audio_features(self, path, frame_indices, fps): | |
window_a = self.window * self.sr | |
step_a = int(self.step * self.sr) | |
wav, audio_plt = convert_mp4_to_mp3(path, frame_indices, fps, self.sr) | |
probs = [] | |
framess = [] | |
for start_a in range(0, len(wav) + 1, step_a): | |
end_a = min(start_a + window_a, len(wav)) | |
a_fss_chunk = wav[start_a:end_a] | |
if self.padding == "mean" or self.padding == "constant": | |
a_fss = pad_wav_zeros(a_fss_chunk, window_a, mode=self.padding) | |
elif self.padding == "repeat": | |
a_fss = pad_wav(a_fss_chunk, window_a) | |
a_fss = torch.unsqueeze(a_fss, 0) | |
a_fss = audio_processor(a_fss, sampling_rate=self.sr) | |
a_fss = a_fss["input_values"][0] | |
a_fss = torch.from_numpy(a_fss) | |
with torch.no_grad(): | |
prob = audio_model(a_fss.to(device)) | |
prob = prob.cpu().numpy() | |
frames = [ | |
str(i).zfill(6) + ".jpg" | |
for i in range( | |
round(start_a / self.sr * fps), round(end_a / self.sr * fps + 1) | |
) | |
] | |
probs.extend([prob] * len(frames)) | |
framess.extend(frames) | |
if len(probs[0]) == 7: | |
emo_ABAW = NAME_EMO_AUDIO[:-1] | |
else: | |
emo_ABAW = NAME_EMO_AUDIO | |
df = pd.DataFrame(np.array(probs), columns=emo_ABAW) | |
df["frames"] = framess | |
return df, audio_plt | |
def preprocess_audio_and_predict( | |
path_video="", | |
save_path="src/pred_results/C-EXPR-DB", | |
frame_indices=[], | |
fps=25, | |
step=0.5, | |
padding="mean", | |
window=4, | |
sr=16000, | |
): | |
audio_ER = EmotionRecognition( | |
step=step, | |
window=window, | |
sr=sr, | |
save_path=save_path, | |
padding=padding, | |
) | |
df_pred, audio_plt = audio_ER.predict_emotion(path_video, frame_indices, fps) | |
return df_pred, audio_plt | |
def preprocess_video_and_predict(video): | |
if video: | |
if video.split('.')[-1] == 'webm': | |
video = convert_webm_to_mp4(video) | |
cap = cv2.VideoCapture(video) | |
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
fps = np.round(cap.get(cv2.CAP_PROP_FPS)) | |
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
frame_indices = get_evenly_spaced_frame_indices(total_frames, 9) | |
df_probs_audio, audio_plt = preprocess_audio_and_predict( | |
path_video=video, | |
frame_indices=frame_indices, | |
fps=fps, | |
step=config_data.AUDIO_STEP, | |
padding="mean", | |
save_path="", | |
window=4, | |
sr=16000, | |
) | |
lstm_features = [] | |
count_frame = 1 | |
count_face = 0 | |
probs_dynamic = [] | |
probs_static = [] | |
frames = [] | |
last_output = None | |
cur_face = None | |
faces = [] | |
zeros = np.zeros((1, 7)) | |
with torch.no_grad(): | |
with mp_face_mesh.FaceMesh( | |
max_num_faces=1, | |
refine_landmarks=False, | |
min_detection_confidence=0.5, | |
min_tracking_confidence=0.5) as face_mesh: | |
while cap.isOpened(): | |
_, frame = cap.read() | |
if frame is None: break | |
frame_copy = frame.copy() | |
frame_copy.flags.writeable = False | |
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_BGR2RGB) | |
results = face_mesh.process(frame_copy) | |
frame_copy.flags.writeable = True | |
if results.multi_face_landmarks: | |
for fl in results.multi_face_landmarks: | |
startX, startY, endX, endY = get_box(fl, w, h) | |
cur_face = frame_copy[startY:endY, startX: endX] | |
if count_face%config_data.FRAME_DOWNSAMPLING == 0: | |
cur_face_copy = pth_processing(Image.fromarray(cur_face)) | |
prediction = torch.nn.functional.softmax(pth_model_static(cur_face_copy.to(device)), dim=1) | |
features = torch.nn.functional.relu(activations['features']).detach().cpu().numpy() | |
output_s = prediction.clone() | |
output_s = output_s.detach().cpu().numpy() | |
if len(lstm_features) == 0: | |
lstm_features = [features]*10 | |
else: | |
lstm_features = lstm_features[1:] + [features] | |
lstm_f = torch.from_numpy(np.vstack(lstm_features)) | |
lstm_f = torch.unsqueeze(lstm_f, 0) | |
output_d = pth_model_dynamic(lstm_f.to(device)).detach().cpu().numpy() | |
last_output = output_d | |
if count_face == 0: | |
count_face += 1 | |
else: | |
if last_output is not None: | |
output_d = last_output | |
elif last_output is None: | |
output_d = zeros | |
probs_static.append(output_s[0]) | |
probs_dynamic.append(output_d[0]) | |
frames.append(count_frame) | |
else: | |
lstm_features = [] | |
if last_output is not None: | |
probs_static.append(probs_static[-1]) | |
probs_dynamic.append(probs_dynamic[-1]) | |
frames.append(count_frame) | |
elif last_output is None: | |
probs_static.append(zeros[0]) | |
probs_dynamic.append(zeros[0]) | |
frames.append(count_frame) | |
if cur_face is not None: | |
if count_frame-1 in frame_indices: | |
cur_face = cv2.resize(cur_face, (224,224), interpolation = cv2.INTER_AREA) | |
cur_face = display_frame_info(cur_face, 'Frame: {}'.format(count_frame), box_scale=.3) | |
faces.append(cur_face) | |
count_frame += 1 | |
if count_face != 0: | |
count_face += 1 | |
img_plt = plot_images(faces) | |
df_dynamic = pd.DataFrame( | |
np.array(probs_dynamic), columns=list(DICT_EMO_VIDEO.values()) | |
) | |
df_static = pd.DataFrame( | |
np.array(probs_static), columns=list(DICT_EMO_VIDEO.values()) | |
) | |
df, pred_plt = get_c_expr_db_pred( | |
stat_df=df_static, | |
dyn_df=df_dynamic, | |
audio_df=df_probs_audio, | |
name_video='', | |
weights_1=AV_WEIGHTS, | |
frame_indices=frame_indices, | |
) | |
av_pred = df['Audio-visual fusion'].tolist() | |
states = ['negative', 'neutral', 'positive'] | |
dict_av_pred = Counter(av_pred) | |
count_states = np.zeros(3) | |
for k, v in dict_av_pred.items(): | |
if k in [0]: | |
count_states[1] += v | |
elif k in [4, 6, 8, 18]: | |
count_states[2] += v | |
else: | |
count_states[0] += v | |
state_percent = count_states/np.sum(count_states) | |
# if np.argmax(state_percent) in [0,2]: | |
# text1 = "The audio-visual model predicts that a person mostly experiences {} ({:.2f}%) emotions. ".format(states[np.argmax(state_percent)], np.max(state_percent)*100) | |
# else: | |
text1 = "The audio-visual model predicts that a person is mostly in {} ({:.2f}%) state. ".format(states[np.argmax(state_percent)], np.max(state_percent)*100) | |
top_three = dict_av_pred.most_common(3) | |
top_three_text = "Predictions of the three most probable emotions: " | |
for index, count in top_three: | |
percentage = (count / np.sum(count_states)) * 100 | |
top_three_text += f"{DICT_PRED[index]} ({percentage:.2f}%), " | |
top_three_text = top_three_text.rstrip(", ") + "." | |
df.to_csv(video.split('.')[0] + '.csv', index=False) | |
return img_plt, audio_plt, pred_plt, text1+top_three_text, video, video.split('.')[0] + '.csv' | |
else: | |
return None, None, None, None, None, None |