anchorxia's picture
add mmcm
a57c6eb
from copy import deepcopy
from typing import Iterable
import logging
import numpy as np
from ..utils.util import convert_class_attr_to_dict
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
class Clip(object, Item):
"""媒体片段, 指转场点与转场点之间的部分"""
def __init__(
self,
time_start,
duration,
clipid=None,
media_type=None,
mediaid=None,
timepoint_type=None,
text=None,
stage=None,
path=None,
duration_num=None,
group_time_start=0,
group_clipid=None,
original_clipid=None,
emb=None,
multi_factor=None,
similar_clipseq=None,
rythm: float = None,
**kwargs
):
"""
Args:
time_start (float): 开始时间,秒为单位,对应该媒体文件的, 和media_map.json上的序号一一对应
duration (_type_): 片段持续时间
clipid (int, or [int]): 由media_map提供的片段序号, 和media_map.json上的序号一一对应
media_type (str, optional): music, video,text, Defaults to None.
mediaid (int): 多媒体id, 当clipid是列表时,表示该片段是个融合片段
timepoint_type(int, ): 开始点的转场类型. Defaults to None.
text(str, optional): 该片段的文本描述,音乐可以是歌词,视频可以是台词,甚至可以是弹幕. Defaults to None.
stage(str, optional): 该片段在整个媒体文件中的结构位置,如音乐的intro、chrous、vesa,视频的片头、片尾、开始、高潮、转场等. Defaults to None.
path (_type_, optional): 该媒体文件的路径,用于后续媒体读取、处理. Defaults to None.
duration_num (_type_, optional): 片段持续帧数, Defaults to None.
group_time_start (int, optional): 当多歌曲、多视频剪辑时,group_time_start 表示该片段所对应的子媒体前所有子媒体的片段时长总和。
默认0, 表示只有1个媒体文件. Defaults to 0.
group_clipid (int, optional): # MediaInfo.sub_meta_info 中的实际序号.
original_clipid (None or [int], optional): 有些片段由其他片段合并,该字段用于片段来源,id是 media_map.json 中的实际序号. Defaults to None.
emb (np.array, optional): 片段 综合emb,. Defaults to None.
multi_factor (MultiFactorFeature), optional): 多维度特征. Defaults to None.
similar_clipseq ([Clip]], optional): 与该片段相似的片段,具体结构待定义. Defaults to None.
"""
self.media_type = media_type
self.mediaid = mediaid
self.time_start = time_start
self.duration = duration
self.clipid = clipid
self.path = path
self.timepoint_type = timepoint_type
self.text = text
self.stage = stage
self.group_time_start = group_time_start
self.group_clipid = group_clipid
self.duration_num = duration_num
self.original_clipid = original_clipid if original_clipid is not None else []
self.emb = emb
self.multi_factor = multi_factor
self.similar_clipseq = similar_clipseq
self.rythm = rythm
# TODO: 目前谱面中会有一些不必要的中间结果,比较占内存,现在代码里删掉,待后续数据协议确定
kwargs = {k: v for k, v in kwargs.items()}
self.__dict__.update(kwargs)
self.preprocess()
def preprocess(self):
pass
def spread_parameters(self):
pass
@property
def time_end(
self,
):
return self.time_start + self.duration
@property
def mvp_clip(self):
"""读取实际的片段数据为moviepy格式
Raises:
NotImplementedError: _description_
"""
raise NotImplementedError
class ClipSeq(object):
"""媒体片段序列"""
ClipClass = Clip
def __init__(self, clips) -> None:
"""_summary_
Args:
clips ([Clip]]): 媒体片段序列
"""
if not isinstance(clips, list):
clips = [clips]
if len(clips) == 0:
self.clips = []
elif isinstance(clips[0], dict):
self.clips = [self.ClipClass(**d) for d in clips]
else:
self.clips = clips
def set_clip_value(self, k, v):
"""给序列中的每一个clip 赋值"""
for i in range(len(self.clips)):
self.clips[i].__setattr__(k, v)
def __len__(
self,
):
return len(self.clips)
def merge(self, other, group_time_start_delta=None, groupid_delta=None):
"""融合其他ClipSeq。media_info 融合时需要记录 clip 所在的 groupid 和 group_time_start,delta用于表示变化
Args:
other (ClipSeq): 待融合的ClipSeq
group_time_start_delta (float, optional): . Defaults to None.
groupid_delta (int, optional): _description_. Defaults to None.
"""
if group_time_start_delta is not None or groupid_delta is not None:
for i, clip in enumerate(other):
if group_time_start_delta is not None:
clip.group_time_start += group_time_start_delta
if groupid_delta is not None:
clip.groupid += groupid_delta
self.clips.extend(other.clips)
for i in range(len(self.clips)):
self.clips[i].group_clipid = i
@property
def duration(
self,
):
"""Clip.duration的和
Returns:
float: 序列总时长
"""
if len(self.clips) == 0:
return 0
else:
return sum([c.duration for c in self.clips])
def __getitem__(self, i) -> Clip:
"""支持索引和切片操作,如果输入是整数则返回Clip,如果是切片,则返回ClipSeq
Args:
i (int or slice): 索引
Raises:
ValueError: 需要按照给的输入类型索引
Returns:
Clip or ClipSeq:
"""
if "int" in str(type(i)):
i = int(i)
if isinstance(i, int):
clip = self.clips[i]
return clip
elif isinstance(i, Iterable):
clips = [self.__getitem__(x) for x in i]
clipseq = ClipSeq(clips)
return clipseq
elif isinstance(i, slice):
if i.step is None:
step = 1
else:
step = i.step
clips = [self.__getitem__(x) for x in range(i.start, i.stop, step)]
clipseq = ClipSeq(clips)
return clipseq
else:
raise ValueError(
"unsupported input, should be int or slice, but given {}, type={}".format(
i, type(i)
)
)
def insert(self, idx, obj):
self.clips.insert(idx, obj)
def append(self, obj):
self.clips.append(obj)
def extend(self, objs):
self.clips.extend(objs)
@property
def duration_seq_emb(
self,
):
emb = np.array([c.duration for c in self.clips])
return emb
@property
def timestamp_seq_emb(self):
emb = np.array([c.time_start for c in self.clips])
return emb
@property
def rela_timestamp_seq_emb(self):
emb = self.timestamp_seq_emb / self.duration
return emb
def get_factor_seq_emb(self, factor, dim):
emb = []
for c in self.clips:
if factor not in c.multi_factor or c.multi_factor[factor] is None:
v = np.full(dim, np.inf)
else:
v = c.multi_factor[factor]
emb.append(v)
emb = np.stack(emb, axis=0)
return emb
def semantic_seq_emb(self, dim):
return self.get_factor_seq_emb(factor="semantics", dim=dim)
def emotion_seq_emb(self, dim):
return self.get_factor_seq_emb(factor="emotion", dim=dim)
def theme_seq_emb(self, dim):
return self.get_factor_seq_emb(factor="theme", dim=dim)
def to_dct(
self,
target_keys=None,
ignored_keys=None,
):
if ignored_keys is None:
ignored_keys = ["kwargs", "audio_path", "lyric_path", "start", "end"]
clips = [
clip.to_dct(target_keys=target_keys, ignored_keys=ignored_keys)
for clip in self.clips
]
return clips
@property
def mvp_clip(self):
"""读取实际的片段数据为moviepy格式
Raises:
NotImplementedError: _description_
"""
raise NotImplementedError
class ClipIds(object):
def __init__(
self,
clipids: list or int,
) -> None:
"""ClipSeq 中的 Clip序号,主要用于多个 Clip 融合后的 Clip, 使用场景如
1. 一个 MusicClip 可以匹配到多个 VideoClip,VideoClip 的索引便可以使用 ClipIds 定义。
Args:
clipids (list or int): ClipSeq 中的序号
"""
self.clipids = clipids if isinstance(clipids, list) else [clipids]
class ClipIdsSeq(object):
def __init__(self, clipids_seq: list) -> None:
"""多个 ClipIds,使用场景可以是
1. 将MediaClipSeq 进行重组,拆分重组成更粗粒度的ClipSeq;
Args:
clipids_seq (list): 组合后的 ClipIds 列表
"""
self.clipids_seq = (
clipids_seq if isinstance(clipids_seq, ClipIds) else [clipids_seq]
)
# TODO: metric后续可能是字典
class MatchedClipIds(object):
def __init__(
self, id1: ClipIds, id2: ClipIds, metric: float = None, **kwargs
) -> None:
"""两种模态数据的片段匹配对,使用场景 可以是
1. 音乐片段和视频片段 之间的匹配关系,
Args:
id1 (ClipIds): 第一种模态的片段
id2 (ClipIds): 第二种模态的片段
metric (float): 匹配度量距离
"""
self.id1 = id1 if isinstance(id1, ClipIds) else ClipIds(id1)
self.id2 = id2 if isinstance(id2, ClipIds) else ClipIds(id2)
self.metric = metric
self.__dict__.update(**kwargs)
class MatchedClipIdsSeq(object):
def __init__(self, seq: list, metric: float = None, **kwargs) -> None:
"""两种模态数据的序列匹配对,使用场景可以是
1. 音乐片段序列和视频片段序列 之间的匹配,每一个元素都是MatchedClipIds:
Args:
seq (list): 两种模态数据的序列匹配对列表
metric (float): 匹配度量距离
"""
self.seq = seq
self.metric = metric
self.__dict__.update(**kwargs)