|
import argparse |
|
import os |
|
import cv2 |
|
import glob |
|
import numpy as np |
|
import torch |
|
from tqdm import tqdm |
|
from easydict import EasyDict as edict |
|
|
|
from vbench.utils import load_dimension_info |
|
|
|
from vbench.third_party.RAFT.core.raft import RAFT |
|
from vbench.third_party.RAFT.core.utils_core.utils import InputPadder |
|
|
|
class DynamicDegree:
    """Decide whether a video is "dynamic" (contains noticeable motion).

    RAFT optical flow is computed between consecutive sampled frames; a clip
    counts as dynamic when enough frame pairs exceed a resolution-scaled
    flow-magnitude threshold (see ``set_params`` and ``check_move``).
    """

    def __init__(self, args, device):
        # ``args`` carries the RAFT construction flags plus ``args.model``,
        # the checkpoint path; ``device`` is where inference runs.
        self.args = args
        self.device = device
        self.load_model()

    def load_model(self):
        """Build RAFT, load its checkpoint, and set it to eval mode on ``self.device``."""
        self.model = torch.nn.DataParallel(RAFT(self.args))
        # map_location lets a GPU-trained checkpoint load on a CPU-only host.
        self.model.load_state_dict(torch.load(self.args.model, map_location=self.device))
        # Unwrap DataParallel so the raw module is called directly in infer().
        self.model = self.model.module
        self.model.to(self.device)
        self.model.eval()

    def get_score(self, img, flo):
        """Return the mean magnitude of the top 5% largest flow vectors.

        Args:
            img: (1, C, H, W) frame tensor; unused for the score itself but
                kept for interface stability.
            flo: (1, 2, H, W) optical-flow tensor (u, v channels).

        Returns:
            float: mean flow magnitude over the 5% most-moving pixels.
        """
        img = img[0].permute(1, 2, 0).cpu().numpy()
        flo = flo[0].permute(1, 2, 0).cpu().numpy()

        u = flo[:, :, 0]
        v = flo[:, :, 1]
        rad = np.sqrt(np.square(u) + np.square(v))

        h, w = rad.shape
        rad_flat = rad.flatten()
        # Top 5% of pixels by flow magnitude; clamp to >= 1 so the mean is
        # defined (not NaN) even for frames with fewer than 20 pixels.
        cut_index = max(1, int(h * w * 0.05))
        # rad is non-negative, so a descending sort yields the largest values.
        max_rad = np.mean(np.sort(rad_flat)[::-1][:cut_index])
        return max_rad.item()

    def set_params(self, frame, count):
        """Scale the motion threshold and required hit count to the input clip.

        ``thres`` is 6.0 for a 256-px short side and scales linearly with
        resolution; ``count_num`` is 4 for a 16-frame clip and scales with
        the number of sampled frames.
        """
        scale = min(list(frame.shape)[-2:])
        self.params = {"thres": 6.0 * (scale / 256.0), "count_num": round(4 * (count / 16.0))}

    def infer(self, video_path):
        """Return True if the video at ``video_path`` is judged dynamic.

        ``video_path`` may be an .mp4 file or a directory of image frames;
        anything else raises NotImplementedError.
        """
        with torch.no_grad():
            if video_path.endswith('.mp4'):
                frames = self.get_frames(video_path)
            elif os.path.isdir(video_path):
                frames = self.get_frames_from_img_folder(video_path)
            else:
                raise NotImplementedError
            self.set_params(frame=frames[0], count=len(frames))
            static_score = []
            for image1, image2 in zip(frames[:-1], frames[1:]):
                # Pad both frames so RAFT sees dimensions divisible by 8.
                padder = InputPadder(image1.shape)
                image1, image2 = padder.pad(image1, image2)
                _, flow_up = self.model(image1, image2, iters=20, test_mode=True)
                max_rad = self.get_score(image1, flow_up)
                static_score.append(max_rad)
            whether_move = self.check_move(static_score)
            return whether_move

    def check_move(self, score_list):
        """Return True once ``count_num`` scores exceed the motion threshold."""
        thres = self.params["thres"]
        count_num = self.params["count_num"]
        count = 0
        for score in score_list:
            if score > thres:
                count += 1
            if count >= count_num:
                return True
        return False

    def get_frames(self, video_path):
        """Decode a video and sub-sample it to roughly 8 frames per second.

        Raises:
            ValueError: if no frame could be decoded from ``video_path``.
        """
        frame_list = []
        video = cv2.VideoCapture(video_path)
        fps = video.get(cv2.CAP_PROP_FPS)
        # Clamp to >= 1: fps < 4 (or 0 when metadata is missing) would
        # otherwise give a zero step and crash range() in extract_frame.
        interval = max(1, round(fps / 8))
        while video.isOpened():
            success, frame = video.read()
            if success:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame = torch.from_numpy(frame.astype(np.uint8)).permute(2, 0, 1).float()
                frame = frame[None].to(self.device)
                frame_list.append(frame)
            else:
                break
        video.release()
        if not frame_list:
            # raise (not assert) so the check survives ``python -O``.
            raise ValueError(f"no frames decoded from {video_path}")
        return self.extract_frame(frame_list, interval)

    def extract_frame(self, frame_list, interval=1):
        """Return every ``interval``-th element of ``frame_list`` (first kept)."""
        return [frame_list[i] for i in range(0, len(frame_list), interval)]

    def get_frames_from_img_folder(self, img_folder):
        """Load all images in ``img_folder`` (sorted by name) as frame tensors.

        Raises:
            ValueError: if the folder contains no readable image.
        """
        exts = ['jpg', 'png', 'jpeg', 'bmp', 'tif',
                'tiff', 'JPG', 'PNG', 'JPEG', 'BMP',
                'TIF', 'TIFF']
        frame_list = []
        imgs = sorted([p for p in glob.glob(os.path.join(img_folder, "*")) if os.path.splitext(p)[1][1:] in exts])
        for img in imgs:
            frame = cv2.imread(img, cv2.IMREAD_COLOR)
            if frame is None:
                # Skip unreadable/corrupt files instead of crashing in cvtColor.
                continue
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = torch.from_numpy(frame.astype(np.uint8)).permute(2, 0, 1).float()
            frame = frame[None].to(self.device)
            frame_list.append(frame)
        if not frame_list:
            raise ValueError(f"no readable images found in {img_folder}")
        return frame_list
|
|
|
|
|
|
|
def dynamic_degree(dynamic, video_list):
    """Score every video in ``video_list`` with ``dynamic.infer``.

    Returns the mean score over all videos together with a per-video
    list of ``{'video_path': ..., 'video_results': ...}`` dicts.
    """
    scores = []
    video_results = []
    for path in tqdm(video_list):
        result = dynamic.infer(path)
        scores.append(result)
        video_results.append({'video_path': path, 'video_results': result})
    return np.mean(scores), video_results
|
|
|
|
|
|
|
def compute_dynamic_degree(json_dir, device, submodules_list):
    """Entry point for the dynamic_degree dimension.

    Builds a RAFT-backed scorer from ``submodules_list['model']``, loads
    the video list for this dimension, and returns (average score,
    per-video results).
    """
    raft_args = edict({
        "model": submodules_list["model"],
        "small": False,
        "mixed_precision": False,
        "alternate_corr": False,
    })
    scorer = DynamicDegree(raft_args, device)
    video_list, _ = load_dimension_info(json_dir, dimension='dynamic_degree', lang='en')
    return dynamic_degree(scorer, video_list)
|
|