import torch
from transformers import TextStreamer
import numpy as np
import os
import json
import base64
from PIL import Image
from io import BytesIO
import matplotlib.pyplot as plt
from torchvision.transforms import Compose, Lambda, ToTensor
from torchvision import transforms
from transformers import ProcessorMixin, BatchEncoding
from transformers.image_processing_utils import BatchFeature
from pytorchvideo.data.encoded_video import EncodedVideo
from torchvision.transforms._transforms_video import NormalizeVideo, RandomCropVideo, RandomHorizontalFlipVideo, CenterCropVideo
from pytorchvideo.transforms import ApplyTransformToKey, ShortSideScale, UniformTemporalSubsample

def load_frames(frames_dir):
    # Collect the frame image paths in a directory, sorted by file name.
    results = []
    frame_names = os.listdir(frames_dir)
    frame_names.sort()
    for frame_name in frame_names:
        image_path = f"{frames_dir}/{frame_name}"
        results.append(image_path)
    return results

def sample_frames(frames, num_segments):
    # Uniformly sample `num_segments` frame paths across the whole clip.
    duration = len(frames)
    frame_id_array = np.linspace(0, duration - 1, num_segments, dtype=int)
    frame_id_list = frame_id_array.tolist()
    sampled_frames = []
    for frame_idx in frame_id_list:
        single_frame_path = frames[frame_idx]
        sampled_frames.append(single_frame_path)
    return sampled_frames
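
# Example (hypothetical file names): with 10 frames on disk and num_segments=4,
# np.linspace picks indices [0, 3, 6, 9], i.e. an even temporal spread:
#   sample_frames([f"frame_{i:05d}.jpg" for i in range(10)], 4)
#   # -> ['frame_00000.jpg', 'frame_00003.jpg', 'frame_00006.jpg', 'frame_00009.jpg']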

class VideoProcessor:
    def __init__(self, image_transform):
        self.image_transform = image_transform

    def __call__(self, video_path, transform=None,
                 video_decode_backend='opencv',
                 clip_start_sec=0.0, clip_end_sec=None,
                 num_frames=50, **kwargs):
        if transform is None:
            transform = self.image_transform
        if video_decode_backend == 'pytorchvideo':
            # EncodedVideo also supports the 'pyav' decoder.
            video = EncodedVideo.from_path(video_path, decoder="decord", decode_audio=False)
            duration = video.duration
            start_sec = clip_start_sec  # secs
            end_sec = clip_end_sec if clip_end_sec is not None else duration  # secs
            video_data = video.get_clip(start_sec=start_sec, end_sec=end_sec)
            video_outputs = transform(video_data)
        elif video_decode_backend == 'decord':
            import decord
            from decord import VideoReader, cpu
            decord.bridge.set_bridge('torch')
            decord_vr = VideoReader(video_path, ctx=cpu(0))
            ori_duration = len(decord_vr)
            # frame_id_list = np.linspace(0, duration-1, num_frames, dtype=int)
            fps_vid = decord_vr.get_avg_fps()
            # Sample from (at most) the first 10 seconds of the video.
            valid_duration = min(int(fps_vid * 10), ori_duration)
            frame_id_list = np.linspace(0, valid_duration - 1, num_frames, dtype=int)
            video_data = decord_vr.get_batch(frame_id_list)
            video_data = video_data.permute(3, 0, 1, 2)  # (T, H, W, C) -> (C, T, H, W)
            video_outputs = transform(video_data)
        elif video_decode_backend == 'opencv':
            import cv2
            cv2_vr = cv2.VideoCapture(video_path)
            duration = int(cv2_vr.get(cv2.CAP_PROP_FRAME_COUNT))
            frame_id_list = np.linspace(0, duration - 1, num_frames, dtype=int)
            video_data = []
            for frame_idx in frame_id_list:
                cv2_vr.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
                _, frame = cv2_vr.read()
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                video_data.append(torch.from_numpy(frame).permute(2, 0, 1))  # (H, W, C) -> (C, H, W)
            cv2_vr.release()
            video_data = torch.stack(video_data, dim=1)  # (C, T, H, W)
            video_outputs = transform(video_data)
        elif video_decode_backend == 'frames':
            # FIXME: does not take clip start/end timestamps; duration info is needed to support them.
            frames = load_frames(video_path)
            frames = sample_frames(frames, num_frames)
            to_tensor = ToTensor()
            # load_frames/sample_frames return paths, so open each image before converting to a tensor.
            video_data = torch.stack(
                [to_tensor(Image.open(frame).convert('RGB')) for frame in frames]
            ).permute(1, 0, 2, 3)  # (T, C, H, W) -> (C, T, H, W)
            # ToTensor already scales pixels to [0, 1]; the video-level transform is not reapplied here.
            video_outputs = video_data
        else:
            raise NameError("video_decode_backend should be one of ('pytorchvideo', 'decord', 'opencv', 'frames')")
        return video_outputs
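
if __name__ == "__main__":
    # Minimal usage sketch (hypothetical video path; CLIP-style normalization constants
    # are an assumption and may differ from the checkpoint's preprocessing). Builds a
    # video transform and decodes 8 uniformly sampled frames with the OpenCV backend.
    video_transform = Compose(
        [
            Lambda(lambda x: x / 255.0),
            ShortSideScale(size=224),
            CenterCropVideo(224),
            NormalizeVideo(mean=(0.48145466, 0.4578275, 0.40821073),
                           std=(0.26862954, 0.26130258, 0.27577711)),
        ]
    )
    processor = VideoProcessor(image_transform=video_transform)
    video_tensor = processor("example.mp4", video_decode_backend='opencv', num_frames=8)
    print(video_tensor.shape)  # (C, T, H, W), e.g. torch.Size([3, 8, 224, 224])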