"""
File: app_utils.py
Author: Elena Ryumina and Dmitry Ryumin
Description: This module contains utility functions for the audio-visual emotion recognition application.
License: MIT License
"""
import torch
import numpy as np
import mediapipe as mp
import pandas as pd
from PIL import Image
import cv2
# Importing necessary components for the Gradio app
from app.model import (
pth_model_static,
pth_model_dynamic,
activations,
audio_processor,
audio_model,
device
)
from app.utils import (
convert_mp4_to_mp3,
pad_wav,
pad_wav_zeros,
get_box,
pth_processing,
convert_webm_to_mp4,
get_evenly_spaced_frame_indices,
get_c_expr_db_pred
)
from app.config import DICT_EMO_VIDEO, AV_WEIGHTS, NAME_EMO_AUDIO, DICT_PRED, config_data
from app.plot import display_frame_info, plot_images
from collections import Counter
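# MediaPipe Face Mesh is used below to detect the face region in every video frame.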
mp_face_mesh = mp.solutions.face_mesh
class EmotionRecognition:
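    """Sliding-window emotion recognition over the audio track of a video.

    Parameters (as used by `load_audio_features` below):
        step: hop between consecutive audio windows, in seconds.
        window: length of each audio window, in seconds.
        sr: audio sampling rate in Hz.
        save_path: output directory (stored but not used directly in this class).
        padding: padding mode for the last short chunk ("mean"/"constant" or "repeat").
    """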
def __init__(
self,
step=2,
window=4,
sr=16000,
save_path="",
padding="",
):
self.save_path = save_path
self.step = step
self.window = window
self.sr = sr
self.padding = padding
    def predict_emotion(self, path, frame_indices, fps):
        probs_df, audio_plt = self.load_audio_features(path, frame_indices, fps)
        return probs_df, audio_plt
def load_audio_features(self, path, frame_indices, fps):
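        """Predict audio emotion probabilities over sliding windows of the audio track.

        The audio is extracted from `path`, split into windows of `self.window` seconds
        with a hop of `self.step` seconds, and each window is classified by the audio
        model. Window-level probabilities are repeated for every video frame the window
        covers, so the result can be aligned with the visual predictions.

        Returns a DataFrame of per-frame probabilities and the plot produced by
        `convert_mp4_to_mp3`.
        """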
window_a = self.window * self.sr
step_a = int(self.step * self.sr)
wav, audio_plt = convert_mp4_to_mp3(path, frame_indices, fps, self.sr)
probs = []
framess = []
        # Slide a window of `window_a` samples over the waveform with a hop of `step_a`.
        for start_a in range(0, len(wav) + 1, step_a):
            end_a = min(start_a + window_a, len(wav))
            a_fss_chunk = wav[start_a:end_a]
            # Pad the last (short) chunk up to the full window length.
            if self.padding in ("mean", "constant"):
                a_fss = pad_wav_zeros(a_fss_chunk, window_a, mode=self.padding)
            elif self.padding == "repeat":
                a_fss = pad_wav(a_fss_chunk, window_a)
            else:
                raise ValueError(f"Unknown padding mode: {self.padding}")
a_fss = torch.unsqueeze(a_fss, 0)
a_fss = audio_processor(a_fss, sampling_rate=self.sr)
a_fss = a_fss["input_values"][0]
a_fss = torch.from_numpy(a_fss)
with torch.no_grad():
prob = audio_model(a_fss.to(device))
prob = prob.cpu().numpy()
frames = [
str(i).zfill(6) + ".jpg"
for i in range(
round(start_a / self.sr * fps), round(end_a / self.sr * fps + 1)
)
]
probs.extend([prob] * len(frames))
framess.extend(frames)
        # Match the emotion column names to the number of classes returned by the audio model.
        if len(probs[0]) == 7:
            emo_ABAW = NAME_EMO_AUDIO[:-1]
        else:
            emo_ABAW = NAME_EMO_AUDIO
df = pd.DataFrame(np.array(probs), columns=emo_ABAW)
df["frames"] = framess
return df, audio_plt
def preprocess_audio_and_predict(
path_video="",
save_path="src/pred_results/C-EXPR-DB",
frame_indices=[],
fps=25,
step=0.5,
padding="mean",
window=4,
sr=16000,
):
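    """Build an `EmotionRecognition` instance with the given windowing settings and
    predict per-frame audio emotion probabilities for `path_video`."""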
audio_ER = EmotionRecognition(
step=step,
window=window,
sr=sr,
save_path=save_path,
padding=padding,
)
df_pred, audio_plt = audio_ER.predict_emotion(path_video, frame_indices, fps)
return df_pred, audio_plt
def preprocess_video_and_predict(video):
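    """Run the full audio-visual pipeline on a single video file.

    The video is converted from WebM to MP4 if needed, the audio track is classified
    with the audio model, the static model is applied to detected face crops and the
    dynamic model to a rolling window of their features, the prediction streams are
    fused with `get_c_expr_db_pred`, and a preview image grid, plots, a summary text
    and a CSV of per-frame predictions are returned.
    """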
if video:
if video.split('.')[-1] == 'webm':
video = convert_webm_to_mp4(video)
cap = cv2.VideoCapture(video)
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = np.round(cap.get(cv2.CAP_PROP_FPS))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
frame_indices = get_evenly_spaced_frame_indices(total_frames, 9)
df_probs_audio, audio_plt = preprocess_audio_and_predict(
path_video=video,
frame_indices=frame_indices,
fps=fps,
step=config_data.AUDIO_STEP,
padding="mean",
save_path="",
window=4,
sr=16000,
)
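        # Per-frame state for the visual models:
        #   lstm_features  - rolling window of the last 10 static-model feature vectors
        #   probs_static   - per-frame probabilities from the static model
        #   probs_dynamic  - per-frame probabilities from the dynamic model
        #   faces          - face crops collected at `frame_indices` for the preview grid
        #   zeros          - placeholder probabilities (7 classes) used before any prediction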
lstm_features = []
count_frame = 1
count_face = 0
probs_dynamic = []
probs_static = []
frames = []
last_output = None
cur_face = None
faces = []
zeros = np.zeros((1, 7))
with torch.no_grad():
with mp_face_mesh.FaceMesh(
max_num_faces=1,
refine_landmarks=False,
min_detection_confidence=0.5,
min_tracking_confidence=0.5) as face_mesh:
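                # Process the video frame by frame: detect the face with Face Mesh,
                # classify every FRAME_DOWNSAMPLING-th face crop with the static model,
                # and feed the accumulated features to the dynamic model.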
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break
frame_copy = frame.copy()
frame_copy.flags.writeable = False
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_BGR2RGB)
results = face_mesh.process(frame_copy)
frame_copy.flags.writeable = True
                    if results.multi_face_landmarks:
                        for fl in results.multi_face_landmarks:
                            # Crop the face region from the frame using the Face Mesh landmarks.
                            startX, startY, endX, endY = get_box(fl, w, h)
                            cur_face = frame_copy[startY:endY, startX:endX]
                            # Run the static model only on every FRAME_DOWNSAMPLING-th face frame.
                            if count_face % config_data.FRAME_DOWNSAMPLING == 0:
                                cur_face_copy = pth_processing(Image.fromarray(cur_face))
                                prediction = torch.nn.functional.softmax(pth_model_static(cur_face_copy.to(device)), dim=1)
                                features = torch.nn.functional.relu(activations['features']).detach().cpu().numpy()
                                output_s = prediction.clone()
                                output_s = output_s.detach().cpu().numpy()
                                # Keep a rolling window of the last 10 feature vectors for the dynamic model.
                                if len(lstm_features) == 0:
                                    lstm_features = [features] * 10
                                else:
                                    lstm_features = lstm_features[1:] + [features]
                                lstm_f = torch.from_numpy(np.vstack(lstm_features))
                                lstm_f = torch.unsqueeze(lstm_f, 0)
                                output_d = pth_model_dynamic(lstm_f.to(device)).detach().cpu().numpy()
                                last_output = output_d
                            if count_face == 0:
                                count_face += 1
                            else:
                                # Between downsampled frames, reuse the last dynamic prediction
                                # (or zeros if no prediction has been made yet).
                                if last_output is not None:
                                    output_d = last_output
                                else:
                                    output_d = zeros
                            probs_static.append(output_s[0])
                            probs_dynamic.append(output_d[0])
                            frames.append(count_frame)
                    else:
                        # No face detected: reset the feature window and repeat the last
                        # predictions (or zeros if there are none yet).
                        lstm_features = []
                        if last_output is not None:
                            probs_static.append(probs_static[-1])
                            probs_dynamic.append(probs_dynamic[-1])
                            frames.append(count_frame)
                        else:
                            probs_static.append(zeros[0])
                            probs_dynamic.append(zeros[0])
                            frames.append(count_frame)
                    # Save the cropped face for the preview grid at the selected frame indices.
                    if cur_face is not None:
                        if count_frame - 1 in frame_indices:
                            cur_face = cv2.resize(cur_face, (224, 224), interpolation=cv2.INTER_AREA)
                            cur_face = display_frame_info(cur_face, 'Frame: {}'.format(count_frame), box_scale=.3)
                            faces.append(cur_face)
                    count_frame += 1
                    if count_face != 0:
                        count_face += 1
img_plt = plot_images(faces)
df_dynamic = pd.DataFrame(
np.array(probs_dynamic), columns=list(DICT_EMO_VIDEO.values())
)
df_static = pd.DataFrame(
np.array(probs_static), columns=list(DICT_EMO_VIDEO.values())
)
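        # Fuse the static, dynamic and audio predictions into final per-frame labels
        # using the audio-visual weights from the config.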
df, pred_plt = get_c_expr_db_pred(
stat_df=df_static,
dyn_df=df_dynamic,
audio_df=df_probs_audio,
name_video='',
weights_1=AV_WEIGHTS,
frame_indices=frame_indices,
)
        # Map each fused per-frame prediction to a coarse affective state:
        # index 0 is counted as neutral, indices 4, 6, 8 and 18 as positive,
        # and all remaining indices as negative.
        av_pred = df['Audio-visual fusion'].tolist()
        states = ['negative', 'neutral', 'positive']
        dict_av_pred = Counter(av_pred)
        count_states = np.zeros(3)
        for k, v in dict_av_pred.items():
            if k == 0:
                count_states[1] += v
            elif k in [4, 6, 8, 18]:
                count_states[2] += v
            else:
                count_states[0] += v
        state_percent = count_states / np.sum(count_states)
# if np.argmax(state_percent) in [0,2]:
# text1 = "The audio-visual model predicts that a person mostly experiences {} ({:.2f}%) emotions. ".format(states[np.argmax(state_percent)], np.max(state_percent)*100)
# else:
text1 = "The audio-visual model predicts that a person is mostly in {} ({:.2f}%) state. ".format(states[np.argmax(state_percent)], np.max(state_percent)*100)
top_three = dict_av_pred.most_common(3)
top_three_text = "Predictions of the three most probable emotions: "
for index, count in top_three:
percentage = (count / np.sum(count_states)) * 100
top_three_text += f"{DICT_PRED[index]} ({percentage:.2f}%), "
top_three_text = top_three_text.rstrip(", ") + "."
        # Save per-frame predictions next to the input video and return all artifacts.
        csv_path = os.path.splitext(video)[0] + '.csv'
        df.to_csv(csv_path, index=False)
        return img_plt, audio_plt, pred_plt, text1 + top_three_text, video, csv_path
else:
return None, None, None, None, None, None
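
# Minimal usage sketch (assumes the models imported above load successfully;
# "example.mp4" is a hypothetical local video with an audio track).
if __name__ == "__main__":
    img_plt, audio_plt, pred_plt, summary, video_path, csv_path = preprocess_video_and_predict("example.mp4")
    print(summary)
    print("Per-frame predictions saved to:", csv_path)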