AVCER / app /utils.py
ElenaRyumina's picture
Summary
47aeb66
"""
File: utils.py
Author: Elena Ryumina and Dmitry Ryumin
Description: This module contains utility functions for facial landmarks, image and audio preprocessing, and fusion of emotion predictions.
License: MIT License
"""
import numpy as np
import pandas as pd
import math
import subprocess
import torchaudio
import torch
import os
from PIL import Image
from torchvision import transforms
# Importing necessary components for the Gradio app
from app.config import NAME_EMO_AUDIO, DICT_CE, config_data
from app.plot import plot_compound_expression_prediction, plot_audio
def norm_coordinates(normalized_x, normalized_y, image_width, image_height):
    """
    Convert normalized landmark coordinates to pixel coordinates.

    Args:
        normalized_x: Horizontal coordinate, multiplied by ``image_width``.
        normalized_y: Vertical coordinate, multiplied by ``image_height``.
        image_width: Width of the image in pixels.
        image_height: Height of the image in pixels.
    Returns:
        Tuple ``(x_px, y_px)``. Each value is floored and capped at the last
        valid pixel index; values below zero are NOT clamped here.
    """
    last_col = image_width - 1
    last_row = image_height - 1

    x_px = math.floor(normalized_x * image_width)
    if x_px > last_col:
        x_px = last_col

    y_px = math.floor(normalized_y * image_height)
    if y_px > last_row:
        y_px = last_row

    return x_px, y_px
def get_box(fl, w, h):
    """
    Compute the bounding box enclosing all landmarks of a face.

    Args:
        fl: Object exposing a ``landmark`` iterable whose items have ``x`` and
            ``y`` attributes (presumably a MediaPipe face-landmark result;
            confirm against the caller).
        w: Image width in pixels.
        h: Image height in pixels.
    Returns:
        Tuple ``(startX, startY, endX, endY)`` clamped to ``[0, w - 1]`` and
        ``[0, h - 1]``.
    """
    # One (n, 2) array of pixel coordinates, one row per landmark.
    pixel_points = np.asarray(
        [norm_coordinates(point.x, point.y, w, h) for point in fl.landmark]
    )
    xs = pixel_points[:, 0]
    ys = pixel_points[:, 1]

    # norm_coordinates only caps the upper bound, so clamp both sides here.
    startX = max(0, np.min(xs))
    startY = max(0, np.min(ys))
    endX = min(w - 1, np.max(xs))
    endY = min(h - 1, np.max(ys))
    return startX, startY, endX, endY
def pth_processing(fp):
    """
    Turn a PIL image into a batched float tensor ready for the visual model.

    The image is resized to 224x224 with nearest-neighbour resampling,
    converted to a tensor, channel-reversed, mean-subtracted per channel and
    given a leading batch dimension.

    Args:
        fp: PIL image (it must support ``resize`` and ``PILToTensor``).
    Returns:
        torch.Tensor: float32 tensor with a leading batch dimension of size 1.
    """

    class PreprocessInput(torch.nn.Module):
        """Channel flip plus per-channel mean subtraction."""

        # Fix: the constructor was spelled ``init`` and called ``super().init()``,
        # so it was dead code that would raise AttributeError if ever invoked.
        def __init__(self):
            super().__init__()

        def forward(self, x):
            x = x.to(torch.float32)
            # Reverse dim 0 (the channel axis for PILToTensor output, i.e.
            # RGB -> BGR). torch.flip returns a copy, so the in-place
            # subtractions below never touch the caller's tensor.
            x = torch.flip(x, dims=(0,))
            # Per-channel means; presumably the training-set means of the
            # backbone (TODO confirm against the model's training recipe).
            x[0, :, :] -= 91.4953
            x[1, :, :] -= 103.8827
            x[2, :, :] -= 131.0912
            return x

    def get_img_torch(img, target_size=(224, 224)):
        transform = transforms.Compose([transforms.PILToTensor(), PreprocessInput()])
        img = img.resize(target_size, Image.Resampling.NEAREST)
        img = transform(img)
        # Add the batch dimension expected by the model.
        img = torch.unsqueeze(img, 0)
        return img

    return get_img_torch(fp)
def convert_webm_to_mp4(input_file):
    """
    Convert a video file to ``.mp4`` with ffmpeg (video stream copied, audio
    re-encoded to AAC) unless the target file already exists.

    Args:
        input_file: Path to the source video.
    Returns:
        str: Path of the ``.mp4`` file (same location and base name as the
        input, with only the final extension replaced).
    Raises:
        FileNotFoundError: If the ``ffmpeg`` executable is not on PATH.
    """
    # splitext only strips the last extension; the previous split('.')[0]
    # truncated at the first dot anywhere in the path ("./a.webm" -> ".mp4").
    path_save = os.path.splitext(input_file)[0] + ".mp4"
    if not os.path.exists(path_save):
        # Argument list without a shell: paths containing spaces or shell
        # metacharacters are passed to ffmpeg verbatim.
        subprocess.call(
            [
                "ffmpeg",
                "-i",
                input_file,
                "-c:v",
                "copy",
                "-c:a",
                "aac",
                "-strict",
                "experimental",
                path_save,
            ]
        )
    return path_save
def convert_mp4_to_mp3(path, frame_indices, fps, sampling_rate=16000):
    """
    Extract the audio track of a video as a mono waveform and plot it.

    Despite the name, the audio is written next to the video as a ``.wav``
    file (16-bit PCM, 44.1 kHz, stereo), extracted with ffmpeg only if that
    file does not exist yet. The loaded waveform is plotted, then down-mixed
    to mono and resampled to ``sampling_rate``.

    Args:
        path: Path to the source video.
        frame_indices: Forwarded unchanged to ``plot_audio``.
        fps: Forwarded unchanged to ``plot_audio``.
        sampling_rate: Target sampling rate of the returned waveform.
    Returns:
        Tuple ``(wav, plt)``: the 1-D mono waveform tensor at
        ``sampling_rate`` and the object returned by ``plot_audio``.
    Raises:
        FileNotFoundError: If the ``ffmpeg`` executable is not on PATH.
    """
    # splitext only strips the last extension; split('.')[0] truncated the
    # path at its first dot (e.g. "./clip.mp4" -> ".wav").
    path_save = os.path.splitext(path)[0] + ".wav"
    if not os.path.exists(path_save):
        # Argument list without a shell so unusual file names are safe.
        subprocess.call(
            [
                "ffmpeg",
                "-i",
                path,
                "-vn",
                "-acodec",
                "pcm_s16le",
                "-ar",
                "44100",
                "-ac",
                "2",
                path_save,
            ]
        )

    wav, sr = torchaudio.load(path_save)

    # The plot uses the waveform as loaded (before down-mixing/resampling).
    num_frames = wav.shape[1]
    time_axis = [i / sr for i in range(num_frames)]
    plt = plot_audio(time_axis, wav, frame_indices, fps, (12, 2))

    # Average the channels into a single mono channel.
    if wav.size(0) > 1:
        wav = wav.mean(dim=0, keepdim=True)

    if sr != sampling_rate:
        transform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=sampling_rate)
        wav = transform(wav)

    return wav.squeeze(0), plt
def pad_wav(wav, max_length):
    """
    Force a waveform to exactly ``max_length`` entries along its first axis.

    Shorter inputs are extended by repeating the waveform from its start;
    longer inputs are truncated; inputs of the right length are returned
    as-is.

    Args:
        wav: Tensor whose first dimension is the sample axis.
        max_length: Required length.
    Returns:
        torch.Tensor: Tensor with ``max_length`` entries along dim 0.
    """
    n_samples = len(wav)

    if n_samples == max_length:
        return wav
    if n_samples > max_length:
        return wav[:max_length]

    # Ceiling division: smallest number of copies that covers max_length.
    copies = -(-max_length // n_samples)
    tiled = torch.cat([wav] * copies, dim=0)
    return tiled[:max_length]
def pad_wav_zeros(wav, max_length, mode="constant"):
    """
    Right-pad a waveform up to ``max_length``; longer inputs are unchanged.

    Args:
        wav: Waveform tensor. The pad size is computed from ``wav.shape[0]``
            and applied to the last dimension (identical for 1-D input).
        max_length: Minimum length after padding.
        mode: ``"mean"`` pads with the mean of ``wav``; any other value is
            passed to ``torch.nn.functional.pad`` as its ``mode`` (the default
            ``"constant"`` pads with zeros).
    Returns:
        torch.Tensor: The padded waveform.
    """
    missing = max(0, max_length - wav.shape[0])
    padding = (0, missing)

    if mode != "mean":
        return torch.nn.functional.pad(wav, padding, mode=mode)

    return torch.nn.functional.pad(
        wav,
        padding,
        mode="constant",
        value=torch.mean(wav),
    )
def softmax(matrix):
    """
    Apply a row-wise softmax to a 2-D array.

    Args:
        matrix: Array-like of scores with one row per sample.
    Returns:
        np.ndarray: Array of the same shape whose rows sum to 1.
    """
    # Subtracting each row's maximum keeps np.exp from overflowing.
    row_max = np.max(matrix, axis=1, keepdims=True)
    exponentials = np.exp(matrix - row_max)
    row_totals = np.sum(exponentials, axis=1, keepdims=True)
    return exponentials / row_totals
def get_compound_expression(pred, com_emo):
    """
    Score compound expressions by summing the scores of their two components.

    Args:
        pred: Array-like with one row of per-emotion scores per sample.
        com_emo: Mapping whose values are index pairs; the first two entries
            of each value select the columns of ``pred`` to add.
    Returns:
        np.ndarray: Float array of shape ``(len(pred), len(com_emo))``, with
        columns in the iteration order of ``com_emo``.
    """
    pred = np.asarray(pred)
    prob = np.zeros((len(pred), len(com_emo)))
    for column, pair in enumerate(com_emo.values()):
        first, second = pair[0], pair[1]
        prob[:, column] = pred[:, first] + pred[:, second]
    return prob
def get_image_location(curr_video, frame):
    """
    Build the image path of the frame following ``frame``.

    Args:
        curr_video: Video name, used as the directory part of the result.
        frame: String whose part before the first ``.`` is an integer frame
            number (e.g. ``"00004.jpg"``).
    Returns:
        str: ``"<curr_video>/<frame number + 1, zero-padded to 5 digits>.jpg"``.
    """
    frame_number, _, _ = frame.partition(".")
    next_frame = int(frame_number) + 1
    return f"{curr_video}/{next_frame:05d}.jpg"
def save_txt(column_names, file_names, labels, save_name):
    """
    Write file names and labels to a comma-separated text file.

    Args:
        column_names: Strings joined with commas to form the header line.
        file_names: Values for the first column.
        labels: Values for the second column, paired with ``file_names``.
        save_name: Path of the file to create or overwrite.
    """
    header = ",".join(column_names)
    rows = [f"{name},{tag}" for name, tag in zip(file_names, labels)]
    with open(save_name, "w") as out:
        out.writelines(f"{text}\n" for text in [header, *rows])
def get_mix_pred(emo_pred, ce_prob, confidence=None, ce_offset=6):
    """
    Choose, per sample, between a basic-emotion and a compound-expression label.

    A sample keeps its most probable basic emotion when that probability is
    strictly above the confidence threshold; otherwise the compound-expression
    index shifted by ``ce_offset`` is used.

    Args:
        emo_pred: Iterable of per-sample basic-emotion score vectors.
        ce_prob: Per-sample compound-expression class indices, aligned with
            ``emo_pred``.
        confidence: Threshold on the top basic-emotion score. Defaults to
            ``config_data.CONFIDENCE_BE`` when ``None``.
        ce_offset: Value added to the compound-expression index so it does not
            collide with basic-emotion labels (default 6; presumably the
            number of basic-emotion classes, confirm against the label map).
    Returns:
        list: One label per sample.
    """
    if confidence is None:
        confidence = config_data.CONFIDENCE_BE

    return [
        np.argmax(scores) if np.max(scores) > confidence else ce_prob[idx] + ce_offset
        for idx, scores in enumerate(emo_pred)
    ]
def get_c_expr_db_pred(
    stat_df: pd.DataFrame,
    dyn_df: pd.DataFrame,
    audio_df: pd.DataFrame,
    name_video: str,
    weights_1: list[float],
    frame_indices: list[int],
) -> tuple[pd.DataFrame, object]:
    """
    Predict basic emotions / compound expressions per frame for each modality
    and for their fusion, and plot the result.

    Args:
        stat_df (pd.DataFrame): Static visual emotion probabilities, one row per frame.
        dyn_df (pd.DataFrame): Dynamic visual emotion scores (softmax is applied here).
        audio_df (pd.DataFrame): Audio emotion scores with a ``frames`` column; rows
            sharing a frame are averaged and softmax is applied here.
        name_video (str): Name of the video, used as prefix of the image locations.
        weights_1 (list[float]): Fusion weights, one entry per modality in the order
            static, dynamic, audio. If empty, the three modalities are averaged.
        frame_indices (list[int]): Forwarded to the plotting function.
    Returns:
        tuple[pd.DataFrame, object]: A DataFrame with one column of predicted labels
        per model ('Audio-visual fusion', 'Static visual model',
        'Dynamic visual model', 'Audio model') and the object returned by
        ``plot_compound_expression_prediction``.

    Note:
        ``stat_df`` and ``dyn_df`` are modified in place: an ``image_location``
        column is added to both.
    """
    emo_columns = NAME_EMO_AUDIO[:-1]

    # Frame index i is mapped to "<video>/<i + 1 zero-padded to 5 digits>.jpg".
    stat_df["image_location"] = [
        f"{name_video}/{str(f + 1).zfill(5)}.jpg" for f in stat_df.index
    ]
    dyn_df["image_location"] = [
        f"{name_video}/{str(f + 1).zfill(5)}.jpg" for f in dyn_df.index
    ]

    # The dynamic model defines which frames are evaluated.
    image_location = dyn_df.image_location.tolist()

    stat_probs = stat_df[stat_df.image_location.isin(image_location)][emo_columns].values
    dyn_probs = softmax(
        dyn_df[dyn_df.image_location.isin(image_location)][emo_columns].values
    )

    audio_df = audio_df.groupby(["frames"]).mean().reset_index()
    audio_df = audio_df.rename(columns={"frames": "image_location"})
    audio_df["image_location"] = [
        get_image_location(name_video, i) for i in audio_df.image_location
    ]
    audio_probs = softmax(
        audio_df[audio_df.image_location.isin(image_location)][emo_columns].values
    )

    # If audio covers fewer frames than the video, repeat its last prediction.
    missing = len(image_location) - len(audio_probs)
    if missing > 0:
        audio_probs = np.vstack((audio_probs, [audio_probs[-1]] * missing))

    predictions = [stat_probs, dyn_probs, audio_probs]
    num_predictions = len(predictions)

    if weights_1:
        # Weighted sum of the modalities.
        final_predictions = predictions[0] * weights_1[0]
        for i in range(1, num_predictions):
            final_predictions += predictions[i] * weights_1[i]
    else:
        final_predictions = np.sum(predictions, axis=0) / num_predictions

    def _mixed_labels(probs):
        # Most likely compound expression per frame, then the per-frame choice
        # between basic emotion and compound expression.
        ce_index = np.argmax(get_compound_expression(probs, DICT_CE), axis=1)
        return get_mix_pred(probs, ce_index)

    dict_pred_final = {
        'Audio-visual fusion': _mixed_labels(final_predictions),
        'Static visual model': _mixed_labels(predictions[0]),
        'Dynamic visual model': _mixed_labels(predictions[1]),
        'Audio model': _mixed_labels(predictions[2]),
    }

    plt = plot_compound_expression_prediction(
        dict_preds=dict_pred_final,
        save_path=None,
        frame_indices=frame_indices,
        title="Basic emotion and compound expression predictions",
    )

    df = pd.DataFrame(dict_pred_final)
    return df, plt
def get_evenly_spaced_frame_indices(total_frames, num_frames=10):
    """
    Pick ``num_frames`` frame indices spread evenly over a video.

    Args:
        total_frames: Number of frames available.
        num_frames: Number of indices wanted.
    Returns:
        list[int]: All indices ``0..total_frames - 1`` when the video has at
        most ``num_frames`` frames; otherwise ``num_frames`` indices starting
        at 0 and spaced ``total_frames / num_frames`` apart, each rounded with
        ``np.round`` (halves go to the nearest even integer).
    """
    if total_frames <= num_frames:
        return list(range(total_frames))

    spacing = total_frames / num_frames
    positions = np.arange(num_frames) * spacing
    return np.round(positions).astype(int).tolist()