Spaces:
Sleeping
Sleeping
""" | |
File: face_utils.py | |
Author: Elena Ryumina and Dmitry Ryumin | |
Description: This module contains utility functions related to facial landmarks and image processing. | |
License: MIT License | |
""" | |
import numpy as np | |
import pandas as pd | |
import math | |
import subprocess | |
import torchaudio | |
import torch | |
import os | |
from PIL import Image | |
from torchvision import transforms | |
# Importing necessary components for the Gradio app | |
from app.config import NAME_EMO_AUDIO, DICT_CE, config_data | |
from app.plot import plot_compound_expression_prediction, plot_audio | |
def norm_coordinates(normalized_x, normalized_y, image_width, image_height):
    """Map normalized (x, y) coordinates to integer pixel coordinates.

    Each coordinate is multiplied by the matching image dimension, floored,
    and capped at the last valid pixel index (dimension - 1). No lower bound
    is applied, so negative inputs produce negative pixel values.

    Returns:
        A tuple ``(x_px, y_px)``.
    """

    def _to_pixel(fraction, extent):
        # Cap at the last index so a coordinate of exactly 1.0 stays in range.
        return min(math.floor(fraction * extent), extent - 1)

    return _to_pixel(normalized_x, image_width), _to_pixel(normalized_y, image_height)
def get_box(fl, w, h):
    """Compute the pixel bounding box that encloses all landmarks in ``fl``.

    Args:
        fl: Object exposing ``.landmark``, an iterable whose items have
            normalized ``.x`` / ``.y`` attributes (presumably a MediaPipe
            face-landmark list -- confirm against the caller).
        w: Image width in pixels.
        h: Image height in pixels.

    Returns:
        ``(startX, startY, endX, endY)`` clamped to ``[0, w - 1]`` horizontally
        and ``[0, h - 1]`` vertically.
    """
    # One (x_px, y_px) row per landmark, in landmark order.
    pixel_coords = np.asarray(
        [norm_coordinates(point.x, point.y, w, h) for point in fl.landmark]
    )
    xs = pixel_coords[:, 0]
    ys = pixel_coords[:, 1]

    # Clamp the lower corner at the image origin ...
    startX = max(0, np.min(xs))
    startY = max(0, np.min(ys))
    # ... and the upper corner at the last valid pixel.
    endX = min(w - 1, np.max(xs))
    endY = min(h - 1, np.max(ys))
    return startX, startY, endX, endY
def pth_processing(fp):
    """Turn a PIL image into a preprocessed float tensor with a batch axis.

    The image is resized to 224x224 with nearest-neighbour resampling,
    converted to a tensor, channel-flipped, shifted by fixed per-channel
    offsets, and given a leading batch dimension of size 1.

    Args:
        fp: A PIL image (it must support ``.resize`` and ``PILToTensor``).

    Returns:
        A ``torch.float32`` tensor with a leading batch dimension.
    """

    class PreprocessInput(torch.nn.Module):
        """Reverse the channel axis and subtract fixed per-channel offsets."""

        def __init__(self):
            # The original spelled these as `init`, which Python never calls
            # as a constructor; restore the real dunder names.
            super().__init__()

        def forward(self, x):
            x = x.to(torch.float32)
            # Reverse dim 0 (channels); torch.flip returns a new tensor, so the
            # in-place subtractions below do not touch the caller's data.
            x = torch.flip(x, dims=(0,))
            # NOTE(review): these look like dataset channel means for a
            # BGR-ordered backbone -- confirm against the model's training setup.
            x[0, :, :] -= 91.4953
            x[1, :, :] -= 103.8827
            x[2, :, :] -= 131.0912
            return x

    def get_img_torch(img, target_size=(224, 224)):
        transform = transforms.Compose([transforms.PILToTensor(), PreprocessInput()])
        img = img.resize(target_size, Image.Resampling.NEAREST)
        img = transform(img)
        # Add the batch dimension expected by the model.
        img = torch.unsqueeze(img, 0)
        return img

    return get_img_torch(fp)
def convert_webm_to_mp4(input_file):
    """Remux a video file into an ``.mp4`` next to it, using ffmpeg.

    The output path is ``input_file`` with its final extension replaced by
    ``.mp4``. If that file already exists, ffmpeg is not invoked. The ffmpeg
    exit status is not checked.

    Args:
        input_file: Path to the source video.

    Returns:
        Path of the ``.mp4`` file.
    """
    # splitext only strips the last extension, so dots elsewhere in the path
    # (e.g. "./clip.webm" or "/tmp/run.1/clip.webm") are preserved.
    path_save = os.path.splitext(input_file)[0] + ".mp4"
    if not os.path.exists(path_save):
        # Pass an argument list (no shell) so paths containing spaces or shell
        # metacharacters are handled verbatim.
        ff_video = [
            "ffmpeg",
            "-i", input_file,
            "-c:v", "copy",
            "-c:a", "aac",
            "-strict", "experimental",
            path_save,
        ]
        subprocess.call(ff_video)
    return path_save
def convert_mp4_to_mp3(path, frame_indices, fps, sampling_rate=16000):
    """Extract a video's audio track, plot it, and return a mono waveform.

    Despite the name, the audio is written as a ``.wav`` file (16-bit PCM,
    44.1 kHz, stereo) next to ``path``. An existing ``.wav`` is reused. The
    ffmpeg exit status is not checked.

    Args:
        path: Path to the source video.
        frame_indices: Passed through to ``plot_audio``.
        fps: Passed through to ``plot_audio``.
        sampling_rate: Sample rate of the returned waveform.

    Returns:
        ``(wav, plt)``: the waveform averaged over channels and resampled to
        ``sampling_rate`` with the channel axis squeezed out, and whatever
        ``plot_audio`` returns.
    """
    # splitext only strips the last extension, so dots elsewhere in the path
    # do not truncate the output name.
    path_save = os.path.splitext(path)[0] + ".wav"
    if not os.path.exists(path_save):
        # Argument list instead of a shell string: safe for paths with spaces
        # or shell metacharacters.
        ff_audio = [
            "ffmpeg",
            "-i", path,
            "-vn",
            "-acodec", "pcm_s16le",
            "-ar", "44100",
            "-ac", "2",
            path_save,
        ]
        subprocess.call(ff_audio)

    wav, sr = torchaudio.load(path_save)
    num_frames = wav.numpy().shape[1]
    time_axis = [i / sr for i in range(num_frames)]
    # The plot uses the waveform as loaded, before mono mixing and resampling.
    plt = plot_audio(time_axis, wav, frame_indices, fps, (12, 2))

    if wav.size(0) > 1:
        # Average the channels down to mono.
        wav = wav.mean(dim=0, keepdim=True)
    if sr != sampling_rate:
        transform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=sampling_rate)
        wav = transform(wav)
    return wav.squeeze(0), plt
def pad_wav(wav, max_length):
    """Force ``wav`` to ``max_length`` along dim 0 by tiling or truncating.

    Shorter inputs are repeated end-to-end and then cut to ``max_length``;
    longer inputs are cut; inputs of exactly ``max_length`` are returned
    unchanged.
    """
    n_samples = len(wav)
    if n_samples == max_length:
        return wav
    if n_samples > max_length:
        return wav[:max_length]

    # Ceiling division: the smallest number of copies that covers max_length.
    n_copies = -(-max_length // n_samples)
    tiled = torch.cat([wav for _ in range(n_copies)], dim=0)
    return tiled[:max_length]
def pad_wav_zeros(wav, max_length, mode="constant"):
    """Right-pad ``wav`` along its last dimension up to ``max_length``.

    The pad amount is ``max_length - wav.shape[0]``, floored at zero, so
    inputs that are already long enough are not truncated.

    Args:
        wav: Tensor to pad.
        max_length: Target length.
        mode: ``"mean"`` fills with the mean of ``wav``; any other value is
            forwarded as the ``mode`` of ``torch.nn.functional.pad``.
    """
    n_missing = max(0, max_length - wav.shape[0])
    if mode == "mean":
        # "mean" is not a native pad mode: use constant padding with the mean.
        pad_options = {"mode": "constant", "value": torch.mean(wav)}
    else:
        pad_options = {"mode": mode}
    return torch.nn.functional.pad(wav, (0, n_missing), **pad_options)
def softmax(matrix):
    """Apply a row-wise softmax to a 2-D array.

    The per-row maximum is subtracted before exponentiation so large scores
    do not overflow.
    """
    row_max = np.max(matrix, axis=1, keepdims=True)
    weights = np.exp(matrix - row_max)
    row_totals = np.sum(weights, axis=1, keepdims=True)
    return weights / row_totals
def get_compound_expression(pred, com_emo):
    """Score each compound expression as the sum of two class scores.

    Args:
        pred: 2-D array-like of per-sample class scores.
        com_emo: Mapping whose values hold, at positions 0 and 1, the two
            column indices of ``pred`` to add together. Keys are ignored.

    Returns:
        Array of shape ``(len(pred), len(com_emo))``, with columns in the
        iteration order of ``com_emo``.
    """
    scores = np.asarray(pred)
    combined = np.zeros((len(scores), len(com_emo)))
    for column, pair in enumerate(com_emo.values()):
        first, second = pair[0], pair[1]
        combined[:, column] = scores[:, first] + scores[:, second]
    return combined
def get_image_location(curr_video, frame):
    """Build ``"<curr_video>/<NNNNN>.jpg"`` from a frame file name.

    The integer before the first ``.`` in ``frame`` is incremented by one and
    zero-padded to five digits.
    """
    stem, _, _ = frame.partition(".")
    frame_number = int(stem) + 1
    return f"{curr_video}/{frame_number:05d}.jpg"
def save_txt(column_names, file_names, labels, save_name):
    """Write a two-column comma-separated text file.

    The first line is ``column_names`` joined by commas; each following line
    is ``"<file_name>,<label>"``. Every line ends with a newline.
    """
    header = ",".join(column_names)
    rows = [f"{name},{label}" for name, label in zip(file_names, labels)]
    with open(save_name, "w") as out:
        out.writelines(f"{line}\n" for line in [header, *rows])
def get_mix_pred(emo_pred, ce_prob):
    """Choose, per sample, between a basic class and an offset compound class.

    When the highest score in a sample exceeds ``config_data.CONFIDENCE_BE``,
    the index of that score is used. Otherwise the matching entry of
    ``ce_prob`` plus 6 is used.

    NOTE(review): the +6 offset presumably places compound-expression indices
    after the basic-emotion indices -- confirm against the label mapping.
    """
    return [
        np.argmax(scores)
        if np.max(scores) > config_data.CONFIDENCE_BE
        else ce_prob[position] + 6
        for position, scores in enumerate(emo_pred)
    ]
def get_c_expr_db_pred(
    stat_df: pd.DataFrame,
    dyn_df: pd.DataFrame,
    audio_df: pd.DataFrame,
    name_video: str,
    weights_1: list[float],
    frame_indices: list[int],
) -> tuple[pd.DataFrame, object]:
    """Fuse static-visual, dynamic-visual and audio scores into per-frame predictions.

    Frames are aligned through an ``image_location`` key of the form
    ``"<name_video>/<NNNNN>.jpg"``. The dynamic and audio scores are passed
    through a softmax; the static scores are used as given.

    Side effect: an ``image_location`` column is added to the caller's
    ``stat_df`` and ``dyn_df``.

    Args:
        stat_df: Static visual scores; needs the ``NAME_EMO_AUDIO[:-1]``
            columns. The index is used as a 0-based frame number (assumes an
            integer index -- confirm against the caller).
        dyn_df: Dynamic visual scores, with the same layout as ``stat_df``.
            Its frames define which frames are kept.
        audio_df: Audio scores with a ``frames`` column holding frame file
            names (parsed by ``get_image_location``) plus the
            ``NAME_EMO_AUDIO[:-1]`` columns. Rows sharing a ``frames`` value
            are averaged.
        name_video: Video name used as the prefix of each image location.
        weights_1: Per-model fusion weights in the order (static, dynamic,
            audio). If empty or ``None``, the three models are averaged.
        frame_indices: Forwarded to ``plot_compound_expression_prediction``.

    Returns:
        ``(df, plt)``: a DataFrame with one prediction column per model
        ("Audio-visual fusion", "Static visual model", "Dynamic visual model",
        "Audio model"), and the object returned by
        ``plot_compound_expression_prediction``.
    """
    emo_columns = NAME_EMO_AUDIO[:-1]

    stat_df["image_location"] = [
        f"{name_video}/{str(f + 1).zfill(5)}.jpg" for f in stat_df.index
    ]
    dyn_df["image_location"] = [
        f"{name_video}/{str(f + 1).zfill(5)}.jpg" for f in dyn_df.index
    ]
    image_location = dyn_df.image_location.tolist()

    stat_probs = stat_df[stat_df.image_location.isin(image_location)][emo_columns].values
    dyn_probs = softmax(
        dyn_df[dyn_df.image_location.isin(image_location)][emo_columns].values
    )

    # Collapse multiple audio rows for the same frame into their mean.
    audio_df = audio_df.groupby(["frames"]).mean().reset_index()
    audio_df = audio_df.rename(columns={"frames": "image_location"})
    audio_df["image_location"] = [
        get_image_location(name_video, i) for i in audio_df.image_location
    ]
    audio_probs = softmax(
        audio_df[audio_df.image_location.isin(image_location)][emo_columns].values
    )

    # If audio covers fewer frames than video, repeat the last audio row so
    # all three score matrices have the same number of rows.
    n_missing = len(image_location) - len(audio_probs)
    if n_missing > 0:
        audio_probs = np.vstack((audio_probs, [audio_probs[-1]] * n_missing))

    predictions = [stat_probs, dyn_probs, audio_probs]
    num_predictions = len(predictions)
    if weights_1:
        # Weighted sum of the three models.
        final_predictions = predictions[0] * weights_1[0]
        for i in range(1, num_predictions):
            final_predictions += predictions[i] * weights_1[i]
    else:
        final_predictions = np.sum(predictions, axis=0) / num_predictions

    def _mixed_prediction(probs):
        # Best compound expression per frame, then mixed with the basic
        # prediction by get_mix_pred.
        best_compound = np.argmax(get_compound_expression(probs, DICT_CE), axis=1)
        return get_mix_pred(probs, best_compound)

    dict_pred_final = {
        "Audio-visual fusion": _mixed_prediction(final_predictions),
        "Static visual model": _mixed_prediction(stat_probs),
        "Dynamic visual model": _mixed_prediction(dyn_probs),
        "Audio model": _mixed_prediction(audio_probs),
    }

    plt = plot_compound_expression_prediction(
        dict_preds=dict_pred_final,
        save_path=None,
        frame_indices=frame_indices,
        title="Basic emotion and compound expression predictions",
    )
    df = pd.DataFrame(dict_pred_final)
    return df, plt
def get_evenly_spaced_frame_indices(total_frames, num_frames=10):
    """Pick up to ``num_frames`` roughly evenly spaced frame indices.

    When there are no more than ``num_frames`` frames, every index is
    returned. Otherwise the indices are ``round(i * total_frames / num_frames)``
    for ``i`` in ``0 .. num_frames - 1``, using NumPy rounding (halves go to
    the nearest even integer).

    Returns:
        A list of Python ints.
    """
    if total_frames <= num_frames:
        return list(range(total_frames))

    step = total_frames / num_frames
    positions = np.arange(num_frames) * step
    return np.round(positions).astype(int).tolist()