File size: 16,485 Bytes
0f90f73 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 |
import os
os.environ['CURL_CA_BUNDLE'] = ''
import torch
# from simplet5 import SimpleT5
import torchvision.transforms as transforms
import openai
import ffmpeg
from .tag2text import tag2text_caption
from .utils import *
from .load_internvideo import *
from .grit_model import DenseCaptioning
from .lang import SimpleLanguageModel
from scipy.io.wavfile import write as write_wav
from bark import SAMPLE_RATE, generate_audio
class VideoCaption:
def __init__(self, device):
self.device = device
self.image_size = 384
# self.threshold = 0.68
self.video_path = None
self.result = None
self.tags = None
self.normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
self.transform = transforms.Compose([transforms.ToPILImage(),transforms.Resize((self.image_size, self.image_size)), transforms.ToTensor(),self.normalize])
self.model = tag2text_caption(pretrained="model_zoo/tag2text_swin_14m.pth", image_size=self.image_size, vit='swin_b').eval().to(device)
self.load_video = LoadVideo()
print("[INFO] initialize Caption model success!")
def framewise_details(self, inputs):
video_path = inputs.strip()
caption = self.inference(video_path)
frame_caption = ""
prev_caption = ""
start_time = 0
end_time = 0
for i, j in enumerate(caption):
current_caption = f"{j}."
current_dcs = f"{i+1}"
if len(current_dcs) > 0:
last_valid_dcs = current_dcs
if current_caption == prev_caption:
end_time = i+1
else:
if prev_caption:
frame_caption += f"Second {start_time} - {end_time}: {prev_caption}{last_valid_dcs}\n"
start_time = i+1
end_time = i+1
prev_caption = current_caption
if prev_caption:
frame_caption += f"Second {start_time} - {end_time}: {prev_caption}{current_dcs}\n"
total_dur = end_time
frame_caption += f"| Total Duration: {total_dur} seconds.\n"
print(frame_caption)
# self.result = frame_caption
self.video_path = video_path
# video_prompt = f"""The tags for this vieo are: {prediction}, {','.join(tag_1)};
# The temporal description of the video is: {frame_caption}
# The dense caption of the video is: {dense_caption}
# The general description of the video is: {synth_caption[0]}"""
return frame_caption
@prompts(name="Video Caption",
description="useful when you want to generate a description for video. "
"like: generate a description or caption for this video. "
"The input to this tool should be a string, "
"representing the video_path")
def inference(self, inputs):
video_path = inputs.strip()
data = self.load_video(video_path)
# progress(0.2, desc="Loading Videos")
tmp = []
for _, img in enumerate(data):
tmp.append(self.transform(img).to(self.device).unsqueeze(0))
# Video Caption
image = torch.cat(tmp).to(self.device)
# self.threshold = 0.68
input_tag_list = None
with torch.no_grad():
caption, tags = self.model.generate(image,tag_input = input_tag_list, max_length = 50, return_tag_predict = True)
# print(frame_caption, dense_caption, synth_caption)
# print(caption)
del data, image, tmp
torch.cuda.empty_cache()
torch.cuda.ipc_collect()
self.result = caption
self.tags = tags
# return '. '.join(caption)
return caption
class Summarization:
def __init__(self, device):
self.device = device
self.model = SimpleT5()
self.model.load_model(
"t5", "./model_zoo/flan-t5-large-finetuned-openai-summarize_from_feedback", use_gpu=False)
self.model.model = self.model.model.to(self.device)
self.model.device = device
print("[INFO] initialize Summarize model success!")
@prompts(name="Video Summarization",
description="useful when you want to Summarize video content for input video. "
"like: summarize this video. "
"The input to this tool should be a string, "
"representing the video_path")
def inference(self, inputs):
caption = inputs.strip()
sum_res = self.model.predict(caption)
return sum_res
class ActionRecognition:
def __init__(self, device):
self.device = device
self.video_path = None
# self.result = None
self.model = load_intern_action(device)
self.transform = transform_action()
self.toPIL = T.ToPILImage()
self.load_video = LoadVideo()
print("[INFO] initialize InternVideo model success!")
@prompts(name="Action Recognition",
description="useful when you want to recognize the action category in this video. "
"like: recognize the action or classify this video"
"The input to this tool should be a string, "
"representing the video_path")
def inference(self, inputs):
video_path = inputs.strip()
# if self.video_path == video_path:
# return self.result
# self.video_path = video_path
# data = loadvideo_decord_origin(video_path)
data = self.load_video(video_path)
# InternVideo
action_index = np.linspace(0, len(data)-1, 8).astype(int)
tmp_pred = []
for i,img in enumerate(data):
if i in action_index:
tmp_pred.append(self.toPIL(img))
action_tensor = self.transform(tmp_pred)
TC, H, W = action_tensor.shape
action_tensor = action_tensor.reshape(1, TC//3, 3, H, W).permute(0, 2, 1, 3, 4).to(self.device)
with torch.no_grad():
prediction = self.model(action_tensor)
prediction = F.softmax(prediction, dim=1).flatten()
prediction = kinetics_classnames[str(int(prediction.argmax()))]
# self.result = prediction
return prediction
class DenseCaption:
def __init__(self, device):
self.device = device
self.model = DenseCaptioning(device)
self.model.initialize_model()
# self.model = self.model.to(device)
self.load_video = LoadVideo()
print("[INFO] initialize DenseCaptioe model success!")
@prompts(name="Video Dense Caption",
description="useful when you want to generate a dense caption for video. "
"like: generate a dense caption or description for this video. "
"The input to this tool should be a string, "
"representing the video_path")
def inference(self, inputs):
video_path = inputs.strip()
# data = loadvideo_decord_origin(video_path)
data = self.load_video(video_path)
dense_caption = []
dense_index = np.arange(0, len(data)-1, 5)
original_images = data[dense_index,:,:,::-1]
with torch.no_grad():
for original_image in original_images:
dense_caption.append(self.model.run_caption_tensor(original_image))
dense_caption = ' '.join([f"Second {i+1} : {j}.\n" for i,j in zip(dense_index,dense_caption)])
return dense_caption
class GenerateTikTokVideo:
template_model = True
def __init__(self, ActionRecognition, VideoCaption, DenseCaption):
self.ActionRecognition = ActionRecognition
self.VideoCaption = VideoCaption
# self.Summarization = Summarization
self.DenseCaption = DenseCaption
self.SimpleLanguageModel = None
@prompts(name="Generate TikTok Video",
description="useful when you want to generate a video with TikTok style based on prompt."
"like: cut this video to a TikTok video based on prompt."
"The input to this tool should be a comma separated string of two, "
"representing the video_path and prompt")
def inference(self, inputs):
video_path = inputs.split(',')[0].strip()
text = ', '.join(inputs.split(',')[1: ])
if self.SimpleLanguageModel == None:
self.SimpleLanguageModel = SimpleLanguageModel()
action_classes = self.ActionRecognition.inference(video_path)
print(f'action_classes = {action_classes}')
dense_caption = self.DenseCaption.inference(video_path)
print(f'dense_caption = {dense_caption}')
caption = self.VideoCaption.inference(video_path)
caption = '. '.join(caption)
print(f'caption = {caption}')
tags = self.VideoCaption.tags
print(f'tags = {tags}')
framewise_caption = self.VideoCaption.framewise_details(video_path)
print(f'framewise_caption = {framewise_caption}')
video_prompt = f"""The tags for this video are: {action_classes}, {','.join(tags)};
The temporal description of the video is: {framewise_caption}
The dense caption of the video is: {dense_caption}"""
timestamp = self.run_text_with_time(video_prompt, text)
print(f'timestamp = {timestamp}')
if not timestamp:
return 'Error! Please try it again.'
start_time, end_time = min(timestamp), max(timestamp)
print(f'start_time, end_time = = {start_time}, {end_time}')
video_during = end_time - start_time + 1
# prompt=f"忘记之前的回答模板,请使用中文回答这个问题。如果情节里遇到男生就叫小帅,女生就叫小美,请以’注意看,这个人叫’开始写一段的视频营销文案。尽量根据第{start_time}秒到第{end_time}秒左右的视频内容生成文案,不要生成重复句子。"
# prompt=f"忘记之前的回答模板,请使用中文回答这个问题。如果情节里遇到男生就叫小帅,女生就叫小美,请以’注意看,这个人叫’为开头,根据第{start_time}秒到第{end_time}秒左右的视频内容生成一段视频营销文案。"
prompt=f"忘记之前的回答模板,请使用中文回答这个问题。视频里如果出现男生就叫小帅,出现女生就叫小美,如果不确定性别,就叫大聪明。请以’注意看,这个人叫’为开头生成一段视频营销文案。"
texts = self.run_text_with_tiktok(video_prompt, prompt).strip()
# if texts.endswith('')
texts += '。'
print(f"before polishing: {texts}")
print('*' * 40)
# texts = openai.ChatCompletion.create(model="gpt-3.5-turbo",messages=[{"role":"user","content":f"请用润色下面的句子,去除重复的片段,但尽量保持原文内容且不许更改人物名字,并且以“注意看,这个人叫”作为开头:{texts}"}]).choices[0].message['content']
texts = openai.ChatCompletion.create(model="gpt-3.5-turbo",messages=[{"role":"user","content":f"使用中文回答这个问题,请用润色下面的句子,去除重复的片段,并且仍以’注意看,这个人叫’为开头:{texts}"}]).choices[0].message['content']
print(f"after polishing: {texts}")
clipped_video_path = gen_new_name(video_path, 'tmp', 'mp4')
wav_file = clipped_video_path.replace('.mp4', '.wav')
audio_path = self.gen_audio(texts, wav_file)
audio_duration = int(float(ffmpeg.probe(audio_path)['streams'][0]['duration']))+1
os.system(f"ffmpeg -y -v quiet -ss {start_time} -t {video_during} -i {video_path} -c:v libx264 -c:a copy -movflags +faststart {clipped_video_path}")
# output_path = self.image_filename.replace('.mp4','_tiktok.mp4')
new_video_path = gen_new_name(video_path, 'GenerateTickTokVideo', 'mp4')
if video_during < audio_duration:
# 鬼畜hou
# video_concat = os.path.join(os.path.dirname(clipped_video_path), 'concat.info')
# video_concat = gen_new_name(clipped_video_path, '', 'info')
video_concat = os.path.join(os.path.dirname(clipped_video_path), 'concat.info')
video_concat = gen_new_name(video_concat, '', 'info')
with open(video_concat,'w') as f:
for _ in range(audio_duration//video_during+1):
f.write(f"file \'{os.path.basename(clipped_video_path)}\'\n")
tmp_path = gen_new_name(video_path, 'tmp', 'mp4')
os.system(f"ffmpeg -y -f concat -i {video_concat} {tmp_path}")
print(f"ffmpeg -y -i {tmp_path} -i {wav_file} {new_video_path}")
os.system(f"ffmpeg -y -i {tmp_path} -i {wav_file} {new_video_path}")
else:
print(f"ffmpeg -y -i {clipped_video_path} -i {wav_file} {new_video_path}")
os.system(f"ffmpeg -y -i {clipped_video_path} -i {wav_file} {new_video_path}")
if not os.path.exists(new_video_path):
import pdb
pdb.set_trace()
# state = state + [(text, f"Here is the video in *{new_file_path}*")] +[("show me the video.", (new_file_path,))]
# print(f"\nProcessed run_video, Input video: {new_file_path}\nCurrent state: {state}\n"
# f"Current Memory: {self.agent.memory.buffer}")
return (new_video_path, )
def run_text_with_time(self, video_caption, text):
# self.agent.memory.buffer = cut_dialogue_history(self.agent.memory.buffer, keep_last_n_words=500)
prompt = "Only in this conversation, \
You must find the text-related start time \
and end time based on video caption. Your answer \
must end with the format {answer} [start time: end time]."
response = self.SimpleLanguageModel(f"Video content: {video_caption}. Text: {text.strip()}." + prompt)
# res['output'] = res['output'].replace("\\", "/")
# print(response)
import re
pattern = r"\d+"
# response = res['output']#rsplit(']')[-1]
try:
# matches = re.findall(pattern, res['output'])
matches = re.findall(pattern, response)
start_idx , end_idx = matches[-2:]
start_idx , end_idx = int(start_idx), int(end_idx)
except:
return None
import pdb
pdb.set_trace()
# state = state + [(text, response)]
print(f"\nProcessed run_text_with_time, Input text: {text}\n")
return (start_idx, end_idx)
def run_text_with_tiktok(self, video_content, prompt):
# self.agent.memory.buffer = cut_dialogue_history(self.agent.memory.buffer, keep_last_n_words=500)
inputs = f"Video description: {video_content}. {prompt}"
response = self.SimpleLanguageModel(inputs)
response = response.replace("\\", "/")
# res = self.agent({"input":text})
# res['output'] = res['output'].replace("\\", "/")
# response = res['output']
# state = state + [(prompt, response)]
print(f"\nProcessed run_text_with_tiktok, Input text: {prompt}\n, Response: {response}")
return response
def gen_audio(self, text, save_path):
audio_array = generate_audio(text)
write_wav(save_path, SAMPLE_RATE, audio_array)
return save_path
if __name__ == '__main__':
# model = VideoCaption('cuda:0')
# print(model.inference('./assets/f4236666.mp4'))
# model = ActionRecognition('cuda:0')
# print(model.inference('./assets/f4236666.mp4'))
video_path = './tmp_files/f4236666.mp4'
device = 'cuda:0'
# caption_model = VideoCaption('cuda:0')
# caption = caption_model.inference('./assets/f4236666.mp4')
# sum_model = Summarize('cuda:0')
# res = sum_model.inference(caption)
# ds = DenseCaption(device)
# res = ds.inference(video_path)
from lang import SimpleLanguageModel
model = GenerateTikTokVideo(ActionRecognition(device),
VideoCaption(device),
DenseCaption(device)
)
out = model.inference(video_path+",帮我剪辑出最精彩的片段")
print(out) |