# -*- coding: utf-8 -*-
"""
Created on Fri Sep 13 16:13:29 2024

This script generates preprocessed data from wireless communication scenarios,
including token generation, patch creation, and data sampling for machine
learning models.

@author: salikha4
"""
import numpy as np
import os
import time
import pickle

try:
    from tqdm import tqdm
except ImportError:
    # The progress bar is purely cosmetic; keep the module usable without tqdm.
    def tqdm(iterable, **kwargs):
        """Fallback used when tqdm is not installed: return *iterable* as-is."""
        return iterable


# File suffixes that save_var / load_var accept without appending '.pickle'.
_PICKLE_SUFFIXES = ('.p', '.pickle')


#%% Scenarios List
def scenarios_list():
    """Returns an array of available scenarios."""
    return np.array([
        'city_18_denver',
        'city_15_indianapolis',
        'city_19_oklahoma',
        'city_12_fortworth',
        'city_11_santaclara',
        'city_7_sandiego',
    ])


#%% Token Generation
def tokenizer(deepmimo_data, gen_raw=True):
    """
    Generates tokens by patching and sampling every user of every scenario.

    Args:
        deepmimo_data (sequence of dict): One dataset per scenario, each in
            the form accepted by `patch_maker` (it must provide
            data['user']['LoS'] and data['user']['channel']).
        gen_raw (bool): Forwarded to `make_sample`. When True the selected
            positions are recorded but the input is left unmasked.
            Defaults to True.

    Returns:
        preprocessed_data (list): One `[input_ids, masked_tokens, masked_pos]`
            sample per user, in scenario order (see `make_sample`).

    Raises:
        ValueError: If `deepmimo_data` is empty.
    """
    if len(deepmimo_data) == 0:
        raise ValueError("deepmimo_data must contain at least one scenario")

    # Patch generation: stack the users of all scenarios along axis 0
    patches = np.vstack([patch_maker(scenario_data) for scenario_data in deepmimo_data])

    # Define dimensions
    n_patches = patches.shape[1]
    patch_size = patches.shape[2]
    # 15% of the patches are selected in total; half of that count is drawn
    # from the first half of the sequence and mirrored into the second half
    # by make_sample.
    n_masks_half = int(0.15 * n_patches / 2)

    # Special tokens are constant vectors with the same length as a patch
    word2id = {'[CLS]': 0.2 * np.ones((patch_size)), '[MASK]': 0.1 * np.ones((patch_size))}

    # Generate preprocessed channels
    return [
        make_sample(user_idx, patches, word2id, n_patches, n_masks_half, patch_size, gen_raw=gen_raw)
        for user_idx in tqdm(range(len(patches)), desc="Processing items")
    ]


#%% Patch Creation
def patch_maker(data, patch_size=16, norm_factor=1e6):
    """
    Creates patches from the channels of one scenario dataset.

    Users whose data['user']['LoS'] entry equals -1 are dropped. Each remaining
    channel is flattened, cast to single-precision complex, split into its
    real part followed by its imaginary part, scaled by `norm_factor` and cut
    into consecutive patches. Trailing values that do not fill a whole patch
    are discarded.

    Args:
        data (dict): Dataset with data['user']['LoS'] and
            data['user']['channel'] as NumPy arrays indexed by user.
        patch_size (int): Size of each patch.
        norm_factor (float): Normalization factor for channels.

    Returns:
        patch (numpy array): float64 array of shape
            (n_users_kept, n_patches, patch_size).
    """
    idxs = np.where(data['user']['LoS'] != -1)[0]

    # Reshaping and normalizing channels
    original_ch = data['user']['channel'][idxs]
    flat_channels = original_ch.reshape((original_ch.shape[0], -1)).astype(np.csingle)
    flat_channels_complex = np.hstack((flat_channels.real, flat_channels.imag)) * norm_factor

    # Create patches: one reshape replaces the per-patch copy loop
    n_patches = flat_channels_complex.shape[1] // patch_size
    usable = n_patches * patch_size
    patch = flat_channels_complex[:, :usable].reshape(len(idxs), n_patches, patch_size)
    return patch.astype(np.float64)


#%% Data Generation for Scenario Areas
def DeepMIMO_data_gen(scenario):
    """
    Generates the DeepMIMO data for a given scenario.

    Args:
        scenario (str): Scenario name (see `scenarios_list`).

    Returns:
        data (dict): Trimmed dataset of the first active base station, with
            the keys 'location' and 'user' (see `select_by_idx`).
    """
    # Imported here because the module never imports DeepMIMOv3 at file level,
    # which made this function fail with a NameError.
    import DeepMIMOv3

    parameters, row_column_users, *_ = get_parameters(scenario)
    deepMIMO_dataset = DeepMIMOv3.generate_data(parameters)
    # A step of 1 in both directions keeps every user of the grid
    uniform_idxs = uniform_sampling(
        deepMIMO_dataset,
        [1, 1],
        len(parameters['user_rows']),
        users_per_row=row_column_users[scenario]['n_per_row'],
    )
    data = select_by_idx(deepMIMO_dataset, uniform_idxs)[0]
    return data


#%%%
def get_parameters(scenario):
    """
    Builds the DeepMIMO parameter set for a scenario.

    Args:
        scenario (str): Scenario name; it must have an entry in the user
            layout table defined below.

    Returns:
        tuple: (parameters, row_column_users, n_ant_bs, n_ant_ue,
            n_subcarriers), where `parameters` is the DeepMIMO parameter dict
            and `row_column_users` is the full per-scenario layout table.

    Raises:
        KeyError: If `scenario` has no entry in the layout table.
    """
    # Imported here because the module never imports DeepMIMOv3 at file level.
    import DeepMIMOv3

    n_ant_bs = 32
    n_ant_ue = 1
    n_subcarriers = 32
    scs = 30e3

    row_column_users = {
        'city_18_denver': {'n_rows': 85, 'n_per_row': 82},
        'city_15_indianapolis': {'n_rows': 80, 'n_per_row': 79},
        'city_19_oklahoma': {'n_rows': 82, 'n_per_row': 75},
        'city_12_fortworth': {'n_rows': 86, 'n_per_row': 72},
        'city_11_santaclara': {'n_rows': 47, 'n_per_row': 114},
        'city_7_sandiego': {'n_rows': 71, 'n_per_row': 83},
    }
    if scenario not in row_column_users:
        raise KeyError(
            f"No user layout defined for scenario {scenario!r}; "
            f"known scenarios: {sorted(row_column_users)}"
        )
    n_rows = row_column_users[scenario]['n_rows']

    parameters = DeepMIMOv3.default_params()
    parameters['dataset_folder'] = './scenarios'
    parameters['scenario'] = scenario

    if scenario == 'O1_3p5':
        parameters['active_BS'] = np.array([4])
    elif scenario in ['city_18_denver', 'city_15_indianapolis']:
        parameters['active_BS'] = np.array([3])
    else:
        parameters['active_BS'] = np.array([1])

    # 'n_rows' is either a row count or a (start, stop) pair of row indices.
    # This replaces the former special case keyed on the 'Boston5G_3p5' name.
    if np.ndim(n_rows) == 0:
        parameters['user_rows'] = np.arange(n_rows)
    else:
        parameters['user_rows'] = np.arange(n_rows[0], n_rows[1])

    parameters['bs_antenna']['shape'] = np.array([n_ant_bs, 1])  # Horizontal, Vertical
    parameters['bs_antenna']['rotation'] = np.array([0, 0, -135])  # (x,y,z)
    parameters['ue_antenna']['shape'] = np.array([n_ant_ue, 1])
    parameters['enable_BS2BS'] = False
    parameters['OFDM']['subcarriers'] = n_subcarriers
    parameters['OFDM']['selected_subcarriers'] = np.arange(n_subcarriers)
    # Total bandwidth scs * n_subcarriers, scaled down by 1e9
    parameters['OFDM']['bandwidth'] = scs * n_subcarriers / 1e9
    parameters['num_paths'] = 20

    return parameters, row_column_users, n_ant_bs, n_ant_ue, n_subcarriers


#%% Sample Generation
def make_sample(user_idx, patch, word2id, n_patches, n_masks, patch_size, gen_raw=False):
    """
    Generates a sample for each user, including masking and tokenizing.

    `n_masks` positions are drawn without replacement from the first half of
    the patch sequence and mirrored into the second half (offset by
    n_patches // 2), so 2 * n_masks positions are selected in total. Unless
    `gen_raw` is True, each selected position is then replaced by a uniform
    random vector (probability 0.1), replaced by the [MASK] token
    (probability 0.8), or left unchanged (probability 0.1).

    Args:
        user_idx (int): Index of the user.
        patch (numpy array): Patches data, indexed by user along axis 0.
        word2id (dict): Dictionary for special tokens ('[CLS]' and '[MASK]').
        n_patches (int): Number of patches.
        n_masks (int): Number of positions drawn from the first half.
        patch_size (int): Size of each patch.
        gen_raw (bool): Whether to generate raw (unmasked) tokens.

    Returns:
        sample (list): `[input_ids, masked_tokens, masked_pos]`, where
            `input_ids` is the [CLS] token followed by the user's patches,
            `masked_tokens` is a list with the original content of every
            selected position, and `masked_pos` holds the selected row
            indices into `input_ids`.
    """
    tokens = patch[user_idx]
    input_ids = np.vstack((word2id['[CLS]'], tokens))

    real_tokens_size = int(n_patches / 2)
    masks_pos_real = np.random.choice(real_tokens_size, size=n_masks, replace=False)
    masks_pos_imag = masks_pos_real + real_tokens_size
    # +1 because row 0 of input_ids is the [CLS] token
    masked_pos = np.hstack((masks_pos_real, masks_pos_imag)) + 1

    # Positions are unique, so all originals can be captured before masking
    masked_tokens = [input_ids[pos].copy() for pos in masked_pos]

    if not gen_raw:
        for pos in masked_pos:
            rnd_num = np.random.rand()
            if rnd_num < 0.1:
                input_ids[pos] = np.random.rand(patch_size)
            elif rnd_num < 0.9:
                input_ids[pos] = word2id['[MASK]']
            # otherwise the original token is kept

    return [input_ids, masked_tokens, masked_pos]


#%% Sampling and Data Selection
def uniform_sampling(dataset, sampling_div, n_rows, users_per_row):
    """
    Performs uniform sampling on a row-major user grid.

    Args:
        dataset (dict): DeepMIMO dataset. Unused; kept so existing callers
            keep working.
        sampling_div (list): Step sizes along [x, y] dimensions, i.e. the step
            within a row followed by the step across rows.
        n_rows (int): Number of rows for user selection.
        users_per_row (int): Number of users per row.

    Returns:
        uniform_idxs (numpy array): Flat integer indices of the selected
            users, ordered row by row.
    """
    cols = np.arange(0, users_per_row, sampling_div[0])
    rows = np.arange(0, n_rows, sampling_div[1])
    # Flat index of grid cell (row i, column j) is j + i * users_per_row
    uniform_idxs = (rows[:, None] * users_per_row + cols[None, :]).ravel()
    return uniform_idxs


def select_by_idx(dataset, idxs):
    """
    Selects a subset of the dataset based on the provided indices.

    Args:
        dataset (list of dict): One entry per base station; each must provide
            the keys 'location' and 'user'.
        idxs (numpy array): Indices of users to select.

    Returns:
        dataset_t (list): Trimmed dataset. Each entry keeps 'location'
            unchanged and 'user' with every field indexed by `idxs`; any other
            key of the original entry is dropped.
    """
    return [
        {
            'location': bs_data['location'],
            'user': {k: v[idxs] for k, v in bs_data['user'].items()},
        }
        for bs_data in dataset
    ]


#%% Save and Load Utilities
def _pickle_path(path):
    """Returns `path` unchanged if it ends in '.p' or '.pickle', else appends '.pickle'."""
    return path if path.endswith(_PICKLE_SUFFIXES) else (path + '.pickle')


def save_var(var, path):
    """
    Saves a variable to a pickle file.

    Args:
        var (object): Variable to be saved.
        path (str): Path to save the file. '.pickle' is appended unless the
            path already ends in '.p' or '.pickle'.

    Returns:
        None
    """
    with open(_pickle_path(path), 'wb') as handle:
        pickle.dump(var, handle)


def load_var(path):
    """
    Loads a variable from a pickle file.

    Args:
        path (str): Path of the file to load, resolved as in `save_var`.

    Returns:
        var (object): Loaded variable.
    """
    path_full = _pickle_path(path)
    # Older versions of save_var stored 'x.pickle' as 'x.pickle.pickle';
    # keep such files loadable under the name they were saved with.
    legacy_path = path + '.pickle'
    if path.endswith('.pickle') and not os.path.exists(path_full) and os.path.exists(legacy_path):
        path_full = legacy_path

    # NOTE: pickle.load can execute arbitrary code; only load trusted files.
    with open(path_full, 'rb') as handle:
        var = pickle.load(handle)
    return var
#%%