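"""Live interview-video analysis pipeline.

For each answer clip this module runs eye tracking, facial expression
recognition, valence/arousal/stress estimation, and speech emotion analysis,
accumulates per-question results in ``session_data``, and on the final clip
writes combined plots, CSVs, and metadata under ``output/<uid>/``.
"""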
# Silence TensorFlow/absl noise; these flags must be set before any module that
# imports TensorFlow (functions.models) is loaded.
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
import warnings
warnings.filterwarnings('ignore', category=UserWarning, module='tensorflow')
import logging
logging.getLogger('absl').setLevel(logging.ERROR)

import time
import json
import shutil
import asyncio
import statistics
import traceback
from collections import Counter
from typing import Callable

import cv2
import dlib
import pandas as pd
from tqdm import tqdm
from moviepy.editor import VideoFileClip

from functions.models import models_dict
from functions.valence_arousal import va_predict
from functions.speech import speech_predict
from functions.eye_track import Facetrack, eye_track_predict
from functions.fer import extract_face, fer_predict, plot_graph, filter
# from app.utils.session import send_analytics, send_individual_analytics_files, send_combined_analytics_files, send_error
# from app.utils.socket import ConnectionManager
# Per-session accumulators keyed by interview uid.
session_data = {}

# Pre-loaded models and lexicon paths shared across requests.
dnn_net = models_dict['face'][0]
predictor = models_dict['face'][1]
speech_model = models_dict['speech']
valence_dict_path = models_dict['vad'][0]
arousal_dict_path = models_dict['vad'][1]
dominance_dict_path = models_dict['vad'][2]
valence_arousal_model = models_dict['valence_fer'][1]
val_ar_feat_model = models_dict['valence_fer'][0]
fer_model = models_dict['fer']
def analyze_live_video(video_path: str, uid: str, user_id: str, count: int, final: bool, log: Callable[[str], None]):
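    """Analyze one answer clip and, when `final` is True, also build the
    combined analytics for the whole session identified by `uid`."""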
    try:
        # Initialize the per-session accumulators on first use.
        global session_data
        if uid not in session_data:
            session_data[uid] = {
                "vcount": [],
                "duration": [],
                "eye": [],
                "fer": [],
                "valence": [],
                "arousal": [],
                "stress": [],
                "blinks": [],
                "class_wise_frame_counts": [],
                "speech_emotions": [],
                "speech_data": [],
                "word_weights_list": []
            }
print(f"UID: {uid}, User ID: {user_id}, Count: {count}, Final: {final}, Video: {video_path}") | |
log(f"Analyzing video for question - {count}") | |
output_dir = os.path.join('output',str(uid)) | |
print(output_dir) | |
if not os.path.exists(output_dir): | |
os.makedirs(output_dir) | |
# Wait for previous files to be written if final | |
if final and count > 1: | |
for i in range(1, count): | |
previous_file_name = os.path.join(output_dir, f"{i}.json") | |
print(previous_file_name) | |
while not os.path.exists(previous_file_name): | |
time.sleep(1) | |
        video_clip = VideoFileClip(video_path)
        video_clip = video_clip.set_fps(30)
        print("Duration: ", video_clip.duration)
        session_data[uid]['vcount'].append(count)
        session_data[uid]['duration'].append(video_clip.duration)
        fps = video_clip.fps
        audio = video_clip.audio
        audio_path = os.path.join(output_dir, 'extracted_audio.wav')
        audio.write_audiofile(audio_path)
        video_frames = [frame for frame in video_clip.iter_frames()]

        # Face extraction
        print("extracting faces")
        faces = [extract_face(frame, dnn_net, predictor) for frame in tqdm(video_frames)]
        print(f'{len([face for face in faces if face is not None])} faces found.')
        # Eye tracking
        fc = Facetrack()
        log(f"Extracting eye features for question - {count}")
        eye_preds, blink_durations, total_blinks = eye_track_predict(fc, faces, fps)
        print(len(eye_preds))
        print("total_blinks- ", total_blinks)
        session_data[uid]['eye'].append(eye_preds)
        session_data[uid]['blinks'].append(blink_durations)

        # Facial expression recognition
        log(f"Extracting facial features for question - {count}")
        fer_emotions, class_wise_frame_count, em_tensors = fer_predict(faces, fps, fer_model)
        print("face emotions", len(fer_emotions))
        session_data[uid]['fer'].append(fer_emotions)
        session_data[uid]['class_wise_frame_counts'].append(class_wise_frame_count)

        # Valence / arousal / stress
        valence_list, arousal_list, stress_list = va_predict(valence_arousal_model, val_ar_feat_model, faces, list(em_tensors))
        session_data[uid]['valence'].append(valence_list)
        session_data[uid]['arousal'].append(arousal_list)
        session_data[uid]['stress'].append(stress_list)

        # Speech emotion and prosody features
        log(f"Extracting speech features for question - {count}")
        emotions, major_emotion, word = speech_predict(audio_path, speech_model, valence_dict_path, arousal_dict_path, dominance_dict_path)
        session_data[uid]['speech_emotions'].append(emotions)
        session_data[uid]['word_weights_list'].append(word['word_weights'])
        session_data[uid]['speech_data'].append([
            float(word['average_pause_length'] if word and word['average_pause_length'] else 0),
            float(word['articulation_rate'] if word and word['articulation_rate'] else 0),
            float(word['speaking_rate'] if word and word['speaking_rate'] else 0),
        ])
log(f"Generating the metadata for question - {count}") | |
# Create Meta Data | |
meta_data={} | |
try: | |
avg_blink_duration= float(sum(blink_durations)/(len(blink_durations))) | |
except: | |
avg_blink_duration=0 | |
meta_data['vcount']=count | |
meta_data['eye_emotion_recognition'] = { | |
"blink_durations": blink_durations, | |
"avg_blink_duration":avg_blink_duration, | |
"total_blinks": total_blinks, | |
"duration":video_clip.duration | |
} | |
meta_data['facial_emotion_recognition'] = { | |
"class_wise_frame_count": class_wise_frame_count, | |
} | |
meta_data['speech_emotion_recognition'] = { | |
'major_emotion':str(major_emotion), | |
'pause_length':float(word['average_pause_length']), | |
'articulation_rate':float(word['articulation_rate']), | |
'speaking_rate':float(word['speaking_rate']), | |
'word_weights':word['word_weights'] | |
} | |
        # Remove temporary audio artifacts.
        file_path = audio_path
        if os.path.exists(file_path):
            os.remove(file_path)
            print(f"{file_path} deleted")
        file_path = 'segment.wav'
        if os.path.exists(file_path):
            os.remove(file_path)
            print(f"{file_path} deleted")
        print("Individual: ", meta_data)

        if not final:
            print("Not final Executing")
            log(f"Saving analytics for question - {count}")
            # send_analytics(valence_plot, arousal_plot, {
            #     "uid": uid,
            #     "user_id": user_id,
            #     "individual": meta_data,
            #     "count": count
            # })
            print("Sent analytics")
            # send_individual_analytics_files(uid, output_dir, count)
            # Mark this question as completed so the final pass can proceed.
            dummy_file_path = os.path.join(output_dir, f'{count}.json')
            print("Writing dummy file: ", dummy_file_path)
            with open(dummy_file_path, 'w') as dummy_file:
                json.dump({"status": "completed"}, dummy_file)
            return
        # Process the combined (whole-session) analytics.
        log(f"Processing gathered data for final output")
        vcount = session_data[uid]['vcount']
        sorted_indices = sorted(range(len(vcount)), key=lambda i: vcount[i])
        for key in session_data[uid]:
            # Only sort lists that are the same length as vcount.
            if len(session_data[uid][key]) == len(vcount):
                session_data[uid][key] = [session_data[uid][key][i] for i in sorted_indices]
        videos = len(session_data[uid]['vcount'])

        # Per-question plot saving and combined series.
        combined_speech = []
        combined_valence = []
        combined_arousal = []
        combined_stress = []
        combined_fer = []
        combined_eye = []
        combined_blinks = []
        vid_index = []
        for i in range(videos):
            for j in range(len(session_data[uid]['speech_emotions'][i])):
                vid_index.append(i + 1)
            combined_speech += session_data[uid]['speech_emotions'][i]
        # Speech labels are spaced 3 seconds apart.
        timestamps = [i * 3 for i in range(len(combined_speech))]
        df = pd.DataFrame({
            'timestamps': timestamps,
            'video_index': vid_index,
            'speech_emotion': combined_speech
        })
        df.to_csv(os.path.join(output_dir, 'combined_speech.csv'), index=False)

        vid_index = []
        for i in range(videos):
            timestamps = [j / 30 for j in range(len(session_data[uid]['valence'][i]))]
            for j in range(len(timestamps)):
                vid_index.append(i + 1)
            folder_path = os.path.join(output_dir, f"{session_data[uid]['vcount'][i]}")
            os.makedirs(folder_path, exist_ok=True)
            plot_graph(timestamps, session_data[uid]['valence'][i], 'valence', os.path.join(folder_path, 'valence.png'))
            plot_graph(timestamps, session_data[uid]['arousal'][i], 'arousal', os.path.join(folder_path, 'arousal.png'))
            plot_graph(timestamps, session_data[uid]['stress'][i], 'stress', os.path.join(folder_path, 'stress.png'))
            combined_arousal += session_data[uid]['arousal'][i]
            combined_valence += session_data[uid]['valence'][i]
            combined_stress += session_data[uid]['stress'][i]
            combined_fer += session_data[uid]['fer'][i]
            combined_blinks += session_data[uid]['blinks'][i]
            # combined_class_wise_frame_count += session_data[uid]['class_wise_frame_counts'][i]
            # Offset numeric eye values by the running maximum so counts keep
            # accumulating across clips.
            try:
                max_value = max([x for x in combined_eye if isinstance(x, (int, float))])
            except ValueError:
                max_value = 0
            session_data[uid]['eye'][i] = [x + max_value if isinstance(x, (int, float)) else x for x in session_data[uid]['eye'][i]]
            combined_eye += session_data[uid]['eye'][i]
        timestamps = [i / fps for i in range(len(combined_arousal))]
        plot_graph(timestamps, combined_valence, 'valence', os.path.join(output_dir, 'valence.png'))
        plot_graph(timestamps, combined_arousal, 'arousal', os.path.join(output_dir, 'arousal.png'))
        plot_graph(timestamps, combined_stress, 'stress', os.path.join(output_dir, 'stress.png'))
        print(len(timestamps), len(vid_index), len(combined_fer), len(combined_valence), len(combined_arousal), len(combined_stress), len(combined_eye))
        df = pd.DataFrame({
            'timestamps': timestamps,
            'video_index': vid_index,  # video each row belongs to
            'fer': combined_fer,
            'valence': combined_valence,
            'arousal': combined_arousal,
            'stress': combined_stress,
            'eye': combined_eye,
        })
        df.to_csv(os.path.join(output_dir, 'combined_data.csv'), index=False)
        # Generate metadata for the combined session.
        comb_meta_data = {}
        try:
            avg_blink_duration = float(sum(combined_blinks) / len(combined_blinks))
        except ZeroDivisionError:
            avg_blink_duration = 0
        total_blinks = max([x for x in combined_eye if isinstance(x, (int, float))], default=0)
        comb_meta_data['eye_emotion_recognition'] = {
            "avg_blink_duration": avg_blink_duration,
            "total_blinks": total_blinks,
        }
        # Sum the per-question class-wise frame counts.
        dict_list = session_data[uid]['class_wise_frame_counts']
        result = {}
        for d in dict_list:
            for key, value in d.items():
                result[key] = result.get(key, 0) + value
        comb_meta_data['facial_emotion_recognition'] = {
            "class_wise_frame_count": result,
        }
        # Merge word weights across questions.
        combined_weights = Counter()
        for word_weight in session_data[uid]['word_weights_list']:
            combined_weights.update(word_weight)
        combined_weights_dict = dict(combined_weights)
        print(combined_weights_dict)
        comb_meta_data['speech_emotion_recognition'] = {
            'major_emotion': str(major_emotion),  # major emotion of the last processed clip
            'pause_length': statistics.mean([row[0] for row in session_data[uid]['speech_data']]),
            'articulation_rate': statistics.mean([row[1] for row in session_data[uid]['speech_data']]),
            'speaking_rate': statistics.mean([row[2] for row in session_data[uid]['speech_data']]),
            'word_weights': combined_weights_dict
        }
        with open(os.path.join(output_dir, 'combined.json'), 'w') as json_file:
            json.dump(comb_meta_data, json_file)
log(f"Saving analytics for final output") | |
# send_analytics(valence_plot, arousal_plot,{ | |
# "uid": uid, | |
# "user_id": user_id, | |
# "individual": meta_data, | |
# "combined": combined_meta_data, | |
# "count": count | |
# }) | |
# send_individual_analytics_files(uid, output_dir, count) | |
# send_combined_analytics_files(uid, output_dir) | |
# shutil.rmtree(output_dir) | |
# print(f"Deleted output directory: {output_dir}") | |
except Exception as e: | |
print("Error analyzing video...: ", e) | |
error_trace = traceback.format_exc() | |
print("Error Trace: ", error_trace) | |
log(f"Error analyzing video for question - {count}") | |
# send_error(uid, { | |
# "message": str(e), | |
# "trace": error_trace | |
# }) | |
shutil.rmtree('output') | |
print(f"Deleted output directory: {output_dir}") | |
# st = time.time()
# # analyze_live_video(video_path, uid, user_id, count, final, log)
# analyze_live_video('videos/s2.webm', 1, 1, 1, False, print)
# analyze_live_video('videos/a4.webm', 1, 1, 2, True, print)
# analyze_live_video('videos/s2.webm', 1, 1, 2, True, print)
# print("time taken - ", time.time() - st)