import random from typing import Tuple, Any import numpy as np import pandas as pd import torch # SEQUENCE GENERATION PADDING_VALUE = float('-100') # ANIMATION_PARAMETER_INDICES = { # 0: [], # EOS # 1: [10, 11, 12, 13], # translate: begin, dur, x, y # 2: [10, 11, 14, 15], # curve: begin, dur, via_x, via_y # 3: [10, 11, 16], # scale: begin, dur, from_factor # 4: [10, 11, 17], # rotate: begin, dur, from_degree # 5: [10, 11, 18], # skewX: begin, dur, from_x # 6: [10, 11, 19], # skewY: begin, dur, from_y # 7: [10, 11, 20, 21, 22], # fill: begin, dur, from_r, from_g, from_b # 8: [10, 11, 23], # opcaity: begin, dur, from_f # 9: [10, 11, 24], # blur: begin, dur, from_f # } ANIMATION_PARAMETER_INDICES = { 0: [], # EOS 1: [0, 1, 2, 3], # translate: begin, dur, x, y 2: [0, 1, 4, 5], # curve: begin, dur, via_x, via_y 3: [0, 1, 6], # scale: begin, dur, from_factor 4: [0, 1, 7], # rotate: begin, dur, from_degree 5: [0, 1, 8], # skewX: begin, dur, from_x 6: [0, 1, 9], # skewY: begin, dur, from_y 7: [0, 1, 10, 11, 12], # fill: begin, dur, from_r, from_g, from_b 8: [0, 1, 13], # opcaity: begin, dur, from_f 9: [0, 1, 14], # blur: begin, dur, from_f } def unpack_embedding(embedding: torch.Tensor, dim=0, device="cpu") -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: """ Args: device: cpu / gpu dim: dimension where the embedding is positioned embedding: embedding of dimension 270 Returns: tuple of tensors: deep-svg embedding, type of prediction, animation parameters """ if embedding.shape[dim] != 282: print(embedding.shape) raise ValueError('Dimension of 270 required.') if dim == 0: deep_svg = embedding[: -26].to(device) types = embedding[-26: -15].to(device) parameters = embedding[-15:].to(device) elif dim == 1: deep_svg = embedding[:, : -26].to(device) types = embedding[:, -26: -15].to(device) parameters = embedding[:, -15:].to(device) elif dim == 2: deep_svg = embedding[:, :, : -26].to(device) types = embedding[:, :, -26: -15].to(device) parameters = embedding[:, :, -15:].to(device) else: raise ValueError('Dimension > 2 not possible.') return deep_svg, types, parameters def generate_dataset(dataframe_index: pd.DataFrame, input_sequences_dict_used: dict, input_sequences_dict_unused: dict, output_sequences: pd.DataFrame, logos_list: dict, sequence_length_input: int, sequence_length_output: int, ) -> dict: """ Builds the dataset and returns it Args: input_sequences_dict_used: dictionary containing input sequences per logo input_sequences_dict_unused: dictionary containing all unused paths dataframe_index: dataframe containing the relevant indexes for the dataframes output_sequences: dataframe containing animations logos_list: dictionary in train/test split containing list for logo ids sequence_length_input: length of input sequence for padding sequence_length_output: length of output sequence for padding Returns: dictionary containing the dataset for training/testing """ dataset = { "is_bucketing": False, "train": { "input": [], "output": [] }, "test": { "input": [], "output": [] } } for i, logo_info in dataframe_index.iterrows(): logo = logo_info['filename'] # e.g. logo_1 file = logo_info['file'] # e.g. logo_1_animation_2 oversample = logo_info['repeat'] print(f"Processing {logo} with {file}: ") if input_sequences_dict_used.keys().__contains__(logo) and input_sequences_dict_unused.keys().__contains__(logo): for j in range(oversample): input_tensor = _generate_input_sequence( input_sequences_dict_used[logo].copy(), input_sequences_dict_unused[logo].copy(), #pd.DataFrame(), null_features=26, # TODO depends on architecture later sequence_length=sequence_length_input, # is_randomized=True, always now is_padding=True ) output_tensor = _generate_output_sequence( output_sequences[(output_sequences['filename'] == logo) & (output_sequences['file'] == file)].copy(), sequence_length=sequence_length_output, is_randomized=False, is_padding=True ) # append to lists if logo in logos_list["train"]: random_index = random.randint(0, len(dataset["train"]["input"])) dataset["train"]["input"].insert(random_index, input_tensor) dataset["train"]["output"].insert(random_index, output_tensor) elif logo in logos_list["test"]: dataset["test"]["input"].append(input_tensor) dataset["test"]["output"].append(output_tensor) break # no oversampling in testing else: print(f"Some problem with {logo}. Neither in train or test set list.") break dataset["train"]["input"] = torch.stack(dataset["train"]["input"]) dataset["train"]["output"] = torch.stack(dataset["train"]["output"]) dataset["test"]["input"] = torch.stack(dataset["test"]["input"]) dataset["test"]["output"] = torch.stack(dataset["test"]["output"]) return dataset def _generate_input_sequence(logo_embeddings_used: pd.DataFrame, logo_embeddings_unused: pd.DataFrame, null_features: int, sequence_length: int, is_padding: bool) -> torch.Tensor: """ Build a torch tensor for the transformer input sequences. Includes - Ensuring all used embeddings are included - Filling the remainder with unused embeddings up to sequence length - Generation of padding Args: logo_embeddings (pd.DataFrame): DataFrame containing logo embeddings. null_features (int): Number of null features to add to each embedding. sequence_length (int): Target length for padding sequences. is_padding: if true, function adds padding Returns: torch.Tensor: Tensor representing the input sequences. """ logo_embeddings_used.drop(columns=['filename', 'animation_id'], inplace=True) logo_embeddings_unused.drop(columns=['filename', 'animation_id'], inplace=True) # Combine used and unused. Fill used with random unused samples logo_embeddings = logo_embeddings_unused remaining_slots = sequence_length - len(logo_embeddings) if remaining_slots > 0: sample_size = min(len(logo_embeddings_unused), remaining_slots) additional_embeddings = logo_embeddings_unused.sample(n=sample_size, replace=False) logo_embeddings = pd.concat([logo_embeddings, additional_embeddings], ignore_index=True) logo_embeddings.reset_index() # Randomization logo_embeddings = logo_embeddings.sample(frac=1).reset_index(drop=True) # Null Features if null_features > 0: logo_embeddings = pd.concat([logo_embeddings, pd.DataFrame(0, index=logo_embeddings.index, columns=range(logo_embeddings.shape[1], logo_embeddings.shape[1] + null_features))], axis=1, ignore_index=True) if is_padding: logo_embeddings = _add_padding(logo_embeddings, sequence_length) return torch.tensor(logo_embeddings.values) def _generate_output_sequence(animation: pd.DataFrame, sequence_length: int, is_randomized: bool, is_padding: bool) -> torch.Tensor: """ Build a torch tensor for the transformer output sequences. Includes - Randomization (later, when same start time) - Generation of padding - Add EOS Token Args: animation (pd.DataFrame): DataFrame containing logo embeddings. sequence_length (int): Target length for padding sequences. is_randomized: shuffle order of paths, applies when same start time is_padding: if true, function adds padding Returns: torch.Tensor: Tensor representing the input sequences. """ if is_randomized: animation = animation.sample(frac=1).reset_index(drop=True) print("Note: Randomization not implemented yet") animation.sort_values(by=['a10'], inplace=True) # again ordered by time start. animation.drop(columns=['file', 'filename', "Unnamed: 0", "id"], inplace=True) # Append the EOS row to the DataFrame sos_eos_row = {col: 0 for col in animation.columns} sos_eos_row["a0"] = 1 sos_eos_row = pd.DataFrame([sos_eos_row]) animation = pd.concat([sos_eos_row, animation, sos_eos_row], ignore_index=True) # Padding Generation: Add padding rows or cut off excess rows if is_padding: animation = _add_padding(animation, sequence_length) return torch.Tensor(animation.values) def _add_padding(dataframe: pd.DataFrame, sequence_length: int) -> pd.DataFrame: """ Add padding to a dataframe Args: dataframe: dataframe to add padding to sequence_length: length of final sequences Returns: """ if len(dataframe) < sequence_length: padding_rows = pd.DataFrame([[PADDING_VALUE] * len(dataframe.columns)] * (sequence_length - len(dataframe)), columns=dataframe.columns) dataframe = pd.concat([dataframe, padding_rows], ignore_index=True) elif len(dataframe) > sequence_length: # Cut off excess rows dataframe = dataframe.iloc[:sequence_length] return dataframe # BUCKETING def generate_buckets_2D(dataset, column1, column2, quantiles1, quantiles2, print_histogram=True): """ Args: dataset: dataset to generate buckets for column1: first column name column2: second column name quantiles1: initial quantiles for column1 quantiles2: initial quantiles for column2 print_histogram: if true, a histogram of the 2D buckets is printed Returns: dictionary object with bucket edges """ x_edges = dataset[column1].quantile(quantiles1) y_edges = dataset[column2].quantile(quantiles2) x_edges = np.array(x_edges) y_edges = np.unique(y_edges) if print_histogram: hist, x_edges, y_edges = np.histogram2d(dataset[column1], dataset[column2], bins=[x_edges, y_edges]) print(hist) return { "input_edges": list(x_edges), "output_edges": list(y_edges) } def get_bucket(input_length, output_length, buckets): bucket_name = "" for i, input_edge in enumerate(buckets["input_edges"]): # print(f"{i}: {input_length} < {input_edge}") if input_length > input_edge: continue bucket_name = bucket_name + str(int(i)) # chr(ord('A')+i) break bucket_name = bucket_name + "-" for i, output_edge in enumerate(buckets["output_edges"]): if output_length > output_edge: continue bucket_name = bucket_name + str(int(i)) break return bucket_name def warn_if_contains_NaN(dataset: torch.Tensor): if torch.isnan(dataset).any(): print("There are NaN values in the dataset")