import torch
import torch.nn as nn
import torch.nn.functional as F


class FloweR(nn.Module):

    def __init__(self, input_size=(384, 384), window_size=4):
        super().__init__()

        self.input_size = input_size
        self.window_size = window_size

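        # 2 optical-flow channels (dx, dy), 1 occlusion-mask channel, and
        # 3 RGB channels for the directly predicted next frame.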
        self.out_channels = 6

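        # Encoder: conv_block_1 runs at full resolution; blocks 2-8 each halve
        # the spatial size with average pooling, keeping 128 feature channels,
        # so the bottleneck is 1/128 of the input resolution.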
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(3 * self.window_size, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )
        self.conv_block_2 = nn.Sequential(
            nn.AvgPool2d(2),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )
        self.conv_block_3 = nn.Sequential(
            nn.AvgPool2d(2),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )
        self.conv_block_4 = nn.Sequential(
            nn.AvgPool2d(2),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )
        self.conv_block_5 = nn.Sequential(
            nn.AvgPool2d(2),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )
        self.conv_block_6 = nn.Sequential(
            nn.AvgPool2d(2),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )
        self.conv_block_7 = nn.Sequential(
            nn.AvgPool2d(2),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )
        self.conv_block_8 = nn.Sequential(
            nn.AvgPool2d(2),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )

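        # Decoder: blocks 9-15 each double the spatial size with nearest
        # upsampling; forward() adds the encoder activation of the matching
        # scale as a U-Net-style additive skip connection.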
        self.conv_block_9 = nn.Sequential(
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )
        self.conv_block_10 = nn.Sequential(
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )
        self.conv_block_11 = nn.Sequential(
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )
        self.conv_block_12 = nn.Sequential(
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )
        self.conv_block_13 = nn.Sequential(
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )
        self.conv_block_14 = nn.Sequential(
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )
        self.conv_block_15 = nn.Sequential(
            nn.Upsample(scale_factor=2),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding='same'),
            nn.ReLU(),
        )
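
        # Final 3x3 projection from 128 features to the 6 output channels.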
        self.conv_block_16 = nn.Conv2d(128, self.out_channels, kernel_size=3, stride=1, padding='same')

    def forward(self, input_frames):
        if input_frames.size(1) != self.window_size:
            raise ValueError(f'Shape of the input is not compatible. There should be exactly {self.window_size} frames in an input video.')

        h, w = self.input_size

        # (B, T, H, W, C) -> (B, T, C, H, W)
        input_frames_permuted = input_frames.permute((0, 1, 4, 2, 3))

        # Stack the window of frames along the channel axis:
        # (B, T, C, H, W) -> (B, T*C, H, W)
        in_x = input_frames_permuted.reshape(-1, self.window_size * 3, h, w)

        block_1_out = self.conv_block_1(in_x)
        block_2_out = self.conv_block_2(block_1_out)
        block_3_out = self.conv_block_3(block_2_out)
        block_4_out = self.conv_block_4(block_3_out)
        block_5_out = self.conv_block_5(block_4_out)
        block_6_out = self.conv_block_6(block_5_out)
        block_7_out = self.conv_block_7(block_6_out)
        block_8_out = self.conv_block_8(block_7_out)

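        # Decoder pass: add the encoder activation at each matching scale
        # (additive skip connections).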
        block_9_out = block_7_out + self.conv_block_9(block_8_out)
        block_10_out = block_6_out + self.conv_block_10(block_9_out)
        block_11_out = block_5_out + self.conv_block_11(block_10_out)
        block_12_out = block_4_out + self.conv_block_12(block_11_out)
        block_13_out = block_3_out + self.conv_block_13(block_12_out)
        block_14_out = block_2_out + self.conv_block_14(block_13_out)
        block_15_out = block_1_out + self.conv_block_15(block_14_out)

        block_16_out = self.conv_block_16(block_15_out)
        out = block_16_out.reshape(-1, self.out_channels, h, w)

        device = out.device

        # Split the 6 output channels: flow is scaled to pixel units; the
        # occlusion channel is remapped from [-1, 1] to [0, 1].
        pred_flow = out[:, :2, :, :] * 255
        pred_occl = (out[:, 2:3, :, :] + 1) / 2
        pred_next = out[:, 3:6, :, :]
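
        # Build an absolute sampling grid in pixel coordinates and offset it
        # by the predicted flow.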
        grid_y, grid_x = torch.meshgrid(torch.arange(0, h), torch.arange(0, w), indexing='ij')
        flow_grid = torch.stack((grid_x, grid_y), dim=0).float()
        flow_grid = flow_grid.unsqueeze(0).to(device=device)
        flow_grid = flow_grid + pred_flow

        # Normalize pixel coordinates to [-1, 1].
        flow_grid[:, 0, :, :] = 2 * flow_grid[:, 0, :, :] / (w - 1) - 1
        flow_grid[:, 1, :, :] = 2 * flow_grid[:, 1, :, :] / (h - 1) - 1

        # (B, 2, H, W) -> (B, H, W, 2), the layout grid_sample expects.
        flow_grid = flow_grid.permute(0, 2, 3, 1)

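        # Warp the most recent input frame along the predicted flow. Bilinear
        # sampling during training keeps the warp differentiable w.r.t. the
        # flow; nearest sampling is used at inference.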
        previous_frame = input_frames_permuted[:, -1, :, :, :]
        sampling_mode = "bilinear" if self.training else "nearest"
        warped_frame = F.grid_sample(previous_frame, flow_grid, mode=sampling_mode, padding_mode="reflection", align_corners=False)

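        # Blend the directly predicted frame into the warped previous frame
        # where the occlusion mask fires; the blend weight is capped at 0.04,
        # so the warped frame dominates.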
        alpha_mask = torch.clip(pred_occl * 10, 0, 1) * 0.04
        pred_next = torch.clip(pred_next, -1, 1)
        warped_frame = torch.clip(warped_frame, -1, 1)
        next_frame = pred_next * alpha_mask + warped_frame * (1 - alpha_mask)

        # Pack the outputs back into the network's native [-1, 1] ranges and
        # return channels-last: (B, C, H, W) -> (B, H, W, C).
        res = torch.cat((pred_flow / 255, pred_occl * 2 - 1, next_frame), dim=1)
        res = res.permute((0, 2, 3, 1))

        return res
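

# Minimal smoke test (illustrative; not part of the original module). The
# (B, T, H, W, C) input layout and [-1, 1] value range are assumptions read
# off forward() above, as is the 6-channel output layout.
if __name__ == "__main__":
    model = FloweR(input_size=(384, 384), window_size=4)
    model.eval()
    with torch.no_grad():
        frames = torch.rand(1, 4, 384, 384, 3) * 2 - 1  # (B, T, H, W, C), values in [-1, 1]
        out = model(frames)
    print(out.shape)  # torch.Size([1, 384, 384, 6]): flow (2) + occlusion (1) + next frame (3)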