Spaces:
Running
Running
import random | |
from typing import Tuple, Any | |
import numpy as np | |
import pandas as pd | |
import torch | |
# SEQUENCE GENERATION | |
PADDING_VALUE = float('-100') | |
# ANIMATION_PARAMETER_INDICES = { | |
# 0: [], # EOS | |
# 1: [10, 11, 12, 13], # translate: begin, dur, x, y | |
# 2: [10, 11, 14, 15], # curve: begin, dur, via_x, via_y | |
# 3: [10, 11, 16], # scale: begin, dur, from_factor | |
# 4: [10, 11, 17], # rotate: begin, dur, from_degree | |
# 5: [10, 11, 18], # skewX: begin, dur, from_x | |
# 6: [10, 11, 19], # skewY: begin, dur, from_y | |
# 7: [10, 11, 20, 21, 22], # fill: begin, dur, from_r, from_g, from_b | |
# 8: [10, 11, 23], # opcaity: begin, dur, from_f | |
# 9: [10, 11, 24], # blur: begin, dur, from_f | |
# } | |
ANIMATION_PARAMETER_INDICES = { | |
0: [], # EOS | |
1: [0, 1, 2, 3], # translate: begin, dur, x, y | |
2: [0, 1, 4, 5], # curve: begin, dur, via_x, via_y | |
3: [0, 1, 6], # scale: begin, dur, from_factor | |
4: [0, 1, 7], # rotate: begin, dur, from_degree | |
5: [0, 1, 8], # skewX: begin, dur, from_x | |
6: [0, 1, 9], # skewY: begin, dur, from_y | |
7: [0, 1, 10, 11, 12], # fill: begin, dur, from_r, from_g, from_b | |
8: [0, 1, 13], # opcaity: begin, dur, from_f | |
9: [0, 1, 14], # blur: begin, dur, from_f | |
} | |
def unpack_embedding(embedding: torch.Tensor, dim=0, device="cpu") -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: | |
""" | |
Args: | |
device: cpu / gpu | |
dim: dimension where the embedding is positioned | |
embedding: embedding of dimension 270 | |
Returns: tuple of tensors: deep-svg embedding, type of prediction, animation parameters | |
""" | |
if embedding.shape[dim] != 282: | |
print(embedding.shape) | |
raise ValueError('Dimension of 270 required.') | |
if dim == 0: | |
deep_svg = embedding[: -26].to(device) | |
types = embedding[-26: -15].to(device) | |
parameters = embedding[-15:].to(device) | |
elif dim == 1: | |
deep_svg = embedding[:, : -26].to(device) | |
types = embedding[:, -26: -15].to(device) | |
parameters = embedding[:, -15:].to(device) | |
elif dim == 2: | |
deep_svg = embedding[:, :, : -26].to(device) | |
types = embedding[:, :, -26: -15].to(device) | |
parameters = embedding[:, :, -15:].to(device) | |
else: | |
raise ValueError('Dimension > 2 not possible.') | |
return deep_svg, types, parameters | |
def generate_dataset(dataframe_index: pd.DataFrame, | |
input_sequences_dict_used: dict, | |
input_sequences_dict_unused: dict, | |
output_sequences: pd.DataFrame, | |
logos_list: dict, | |
sequence_length_input: int, | |
sequence_length_output: int, | |
) -> dict: | |
""" | |
Builds the dataset and returns it | |
Args: | |
input_sequences_dict_used: dictionary containing input sequences per logo | |
input_sequences_dict_unused: dictionary containing all unused paths | |
dataframe_index: dataframe containing the relevant indexes for the dataframes | |
output_sequences: dataframe containing animations | |
logos_list: dictionary in train/test split containing list for logo ids | |
sequence_length_input: length of input sequence for padding | |
sequence_length_output: length of output sequence for padding | |
Returns: dictionary containing the dataset for training/testing | |
""" | |
dataset = { | |
"is_bucketing": False, | |
"train": { | |
"input": [], | |
"output": [] | |
}, | |
"test": { | |
"input": [], | |
"output": [] | |
} | |
} | |
for i, logo_info in dataframe_index.iterrows(): | |
logo = logo_info['filename'] # e.g. logo_1 | |
file = logo_info['file'] # e.g. logo_1_animation_2 | |
oversample = logo_info['repeat'] | |
print(f"Processing {logo} with {file}: ") | |
if input_sequences_dict_used.keys().__contains__(logo) and input_sequences_dict_unused.keys().__contains__(logo): | |
for j in range(oversample): | |
input_tensor = _generate_input_sequence( | |
input_sequences_dict_used[logo].copy(), | |
input_sequences_dict_unused[logo].copy(), | |
#pd.DataFrame(), | |
null_features=26, # TODO depends on architecture later | |
sequence_length=sequence_length_input, | |
# is_randomized=True, always now | |
is_padding=True | |
) | |
output_tensor = _generate_output_sequence( | |
output_sequences[(output_sequences['filename'] == logo) & (output_sequences['file'] == file)].copy(), | |
sequence_length=sequence_length_output, | |
is_randomized=False, | |
is_padding=True | |
) | |
# append to lists | |
if logo in logos_list["train"]: | |
random_index = random.randint(0, len(dataset["train"]["input"])) | |
dataset["train"]["input"].insert(random_index, input_tensor) | |
dataset["train"]["output"].insert(random_index, output_tensor) | |
elif logo in logos_list["test"]: | |
dataset["test"]["input"].append(input_tensor) | |
dataset["test"]["output"].append(output_tensor) | |
break # no oversampling in testing | |
else: | |
print(f"Some problem with {logo}. Neither in train or test set list.") | |
break | |
dataset["train"]["input"] = torch.stack(dataset["train"]["input"]) | |
dataset["train"]["output"] = torch.stack(dataset["train"]["output"]) | |
dataset["test"]["input"] = torch.stack(dataset["test"]["input"]) | |
dataset["test"]["output"] = torch.stack(dataset["test"]["output"]) | |
return dataset | |
def _generate_input_sequence(logo_embeddings_used: pd.DataFrame, | |
logo_embeddings_unused: pd.DataFrame, | |
null_features: int, | |
sequence_length: int, | |
is_padding: bool) -> torch.Tensor: | |
""" | |
Build a torch tensor for the transformer input sequences. | |
Includes | |
- Ensuring all used embeddings are included | |
- Filling the remainder with unused embeddings up to sequence length | |
- Generation of padding | |
Args: | |
logo_embeddings (pd.DataFrame): DataFrame containing logo embeddings. | |
null_features (int): Number of null features to add to each embedding. | |
sequence_length (int): Target length for padding sequences. | |
is_padding: if true, function adds padding | |
Returns: | |
torch.Tensor: Tensor representing the input sequences. | |
""" | |
logo_embeddings_used.drop(columns=['filename', 'animation_id'], inplace=True) | |
logo_embeddings_unused.drop(columns=['filename', 'animation_id'], inplace=True) | |
# Combine used and unused. Fill used with random unused samples | |
logo_embeddings = logo_embeddings_unused | |
remaining_slots = sequence_length - len(logo_embeddings) | |
if remaining_slots > 0: | |
sample_size = min(len(logo_embeddings_unused), remaining_slots) | |
additional_embeddings = logo_embeddings_unused.sample(n=sample_size, replace=False) | |
logo_embeddings = pd.concat([logo_embeddings, additional_embeddings], ignore_index=True) | |
logo_embeddings.reset_index() | |
# Randomization | |
logo_embeddings = logo_embeddings.sample(frac=1).reset_index(drop=True) | |
# Null Features | |
if null_features > 0: | |
logo_embeddings = pd.concat([logo_embeddings, | |
pd.DataFrame(0, | |
index=logo_embeddings.index, | |
columns=range(logo_embeddings.shape[1], | |
logo_embeddings.shape[1] + null_features))], | |
axis=1, | |
ignore_index=True) | |
if is_padding: | |
logo_embeddings = _add_padding(logo_embeddings, sequence_length) | |
return torch.tensor(logo_embeddings.values) | |
def _generate_output_sequence(animation: pd.DataFrame, | |
sequence_length: int, | |
is_randomized: bool, | |
is_padding: bool) -> torch.Tensor: | |
""" | |
Build a torch tensor for the transformer output sequences. | |
Includes | |
- Randomization (later, when same start time) | |
- Generation of padding | |
- Add EOS Token | |
Args: | |
animation (pd.DataFrame): DataFrame containing logo embeddings. | |
sequence_length (int): Target length for padding sequences. | |
is_randomized: shuffle order of paths, applies when same start time | |
is_padding: if true, function adds padding | |
Returns: | |
torch.Tensor: Tensor representing the input sequences. | |
""" | |
if is_randomized: | |
animation = animation.sample(frac=1).reset_index(drop=True) | |
print("Note: Randomization not implemented yet") | |
animation.sort_values(by=['a10'], inplace=True) # again ordered by time start. | |
animation.drop(columns=['file', 'filename', "Unnamed: 0", "id"], inplace=True) | |
# Append the EOS row to the DataFrame | |
sos_eos_row = {col: 0 for col in animation.columns} | |
sos_eos_row["a0"] = 1 | |
sos_eos_row = pd.DataFrame([sos_eos_row]) | |
animation = pd.concat([sos_eos_row, animation, sos_eos_row], | |
ignore_index=True) | |
# Padding Generation: Add padding rows or cut off excess rows | |
if is_padding: | |
animation = _add_padding(animation, sequence_length) | |
return torch.Tensor(animation.values) | |
def _add_padding(dataframe: pd.DataFrame, sequence_length: int) -> pd.DataFrame: | |
""" | |
Add padding to a dataframe | |
Args: | |
dataframe: dataframe to add padding to | |
sequence_length: length of final sequences | |
Returns: | |
""" | |
if len(dataframe) < sequence_length: | |
padding_rows = pd.DataFrame([[PADDING_VALUE] * len(dataframe.columns)] * (sequence_length - len(dataframe)), | |
columns=dataframe.columns) | |
dataframe = pd.concat([dataframe, padding_rows], ignore_index=True) | |
elif len(dataframe) > sequence_length: | |
# Cut off excess rows | |
dataframe = dataframe.iloc[:sequence_length] | |
return dataframe | |
# BUCKETING | |
def generate_buckets_2D(dataset, column1, column2, quantiles1, quantiles2, print_histogram=True): | |
""" | |
Args: | |
dataset: dataset to generate buckets for | |
column1: first column name | |
column2: second column name | |
quantiles1: initial quantiles for column1 | |
quantiles2: initial quantiles for column2 | |
print_histogram: if true, a histogram of the 2D buckets is printed | |
Returns: dictionary object with bucket edges | |
""" | |
x_edges = dataset[column1].quantile(quantiles1) | |
y_edges = dataset[column2].quantile(quantiles2) | |
x_edges = np.array(x_edges) | |
y_edges = np.unique(y_edges) | |
if print_histogram: | |
hist, x_edges, y_edges = np.histogram2d(dataset[column1], | |
dataset[column2], | |
bins=[x_edges, y_edges]) | |
print(hist) | |
return { | |
"input_edges": list(x_edges), | |
"output_edges": list(y_edges) | |
} | |
def get_bucket(input_length, output_length, buckets): | |
bucket_name = "" | |
for i, input_edge in enumerate(buckets["input_edges"]): | |
# print(f"{i}: {input_length} < {input_edge}") | |
if input_length > input_edge: | |
continue | |
bucket_name = bucket_name + str(int(i)) # chr(ord('A')+i) | |
break | |
bucket_name = bucket_name + "-" | |
for i, output_edge in enumerate(buckets["output_edges"]): | |
if output_length > output_edge: | |
continue | |
bucket_name = bucket_name + str(int(i)) | |
break | |
return bucket_name | |
def warn_if_contains_NaN(dataset: torch.Tensor): | |
if torch.isnan(dataset).any(): | |
print("There are NaN values in the dataset") | |