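# Gradio demo script for the LWM model (the "Spaces: Running" page header that preceded this line was scrape residue, not program text).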
import gradio as gr | |
import os | |
from PIL import Image | |
import numpy as np | |
import pickle | |
import io | |
import sys | |
import torch | |
import subprocess | |
import h5py | |
from sklearn.metrics import confusion_matrix | |
import matplotlib.pyplot as plt | |
import pandas as pd | |
from sklearn.metrics import f1_score | |
import seaborn as sns | |
#################### BEAM PREDICTION #########################
def beam_prediction_task(data_percentage, task_complexity, theme='Dark'):
    """Build the raw-channel and LWM-embedding confusion-matrix images.

    For each input kind, the prediction CSVs found in
    ``images/<kind>_<data_percentage/100:.1f>_<task_complexity>`` are averaged
    into one confusion matrix, plotted, and saved as a PNG in that folder.

    Args:
        data_percentage: Training-data percentage; divided by 100 and
            formatted with one decimal to form the folder name.
        task_complexity: Value interpolated into the folder name and the plot
            title (the UI passes the number of beams).
        theme: Forwarded unchanged to ``plot_confusion_matrix_beamPred``.

    Returns:
        ``(raw_img, embeddings_img)``. An entry is ``None`` when its folder
        does not exist or yields no confusion matrix.
    """
    fraction_tag = f"{data_percentage/100:.1f}"
    # (folder prefix, label used in the title, output file name)
    panels = (
        ("raw", "Raw Channels", "confusion_matrix_raw.png"),
        ("embedding", "LWM Embeddings", "confusion_matrix_embeddings.png"),
    )

    images = []
    for folder_prefix, panel_label, png_name in panels:
        folder = f"images/{folder_prefix}_{fraction_tag}_{task_complexity}"
        # A missing folder means "nothing to show" rather than a crash.
        avg_cm = compute_average_confusion_matrix(folder) if os.path.isdir(folder) else None
        if avg_cm is None:
            images.append(None)
            continue
        # The plotting helper already returns the saved image, so it is not
        # opened a second time here.
        images.append(plot_confusion_matrix_beamPred(
            avg_cm,
            classes=np.arange(avg_cm.shape[0]),
            title=f"Confusion Matrix ({panel_label})\n{data_percentage}% data, {task_complexity} beams",
            save_path=os.path.join(folder, png_name),
            theme=theme,
        ))

    raw_img, embeddings_img = images
    return raw_img, embeddings_img
# Function to compute the F1-score based on the confusion matrix
def compute_f1_score(cm):
    """Return the mean per-class F1 score derived from a confusion matrix.

    Rows are treated as true labels and columns as predicted labels: the
    diagonal is TP, column sums minus TP are FP, row sums minus TP are FN.

    Args:
        cm: Square confusion matrix (array-like of counts or averaged counts).

    Returns:
        The unweighted mean of the per-class F1 scores. A class with no true
        and no predicted samples contributes 0.
    """
    cm = np.asarray(cm, dtype=float)
    true_pos = np.diag(cm)
    false_pos = cm.sum(axis=0) - true_pos
    false_neg = cm.sum(axis=1) - true_pos

    # F1 = 2PR / (P + R) simplifies to 2TP / (2TP + FP + FN); dividing only
    # where the denominator is positive avoids 0/0 warnings and NaNs.
    denominator = 2 * true_pos + false_pos + false_neg
    per_class_f1 = np.divide(
        2 * true_pos,
        denominator,
        out=np.zeros_like(denominator),
        where=denominator > 0,
    )
    return np.mean(per_class_f1)  # Mean F1-score across all classes
def plot_confusion_matrix_beamPred(cm, classes, title, save_path, theme='Dark'):
    """Plot a confusion-matrix heatmap, save it as a PNG and return the image.

    Args:
        cm: Confusion matrix to draw (also used to compute the F1 score shown
            in the title).
        classes: Tick labels, one per row/column of ``cm``.
        title: Title text; the F1 score is appended on a new line.
        save_path: Destination path of the PNG.
        theme: ``'Dark'`` selects the dark matplotlib style and the
            ``coolwarm`` colormap; any other value selects the default style
            and ``Blues``.

    Returns:
        The saved plot as a PIL image, fully loaded into memory.
    """
    # Compute the average F1-score
    avg_f1 = compute_f1_score(cm)

    # Choose the color scheme based on the user's mode
    if theme == 'Dark':
        style_name, cmap = 'dark_background', 'coolwarm'
    else:
        style_name, cmap = 'default', 'Blues'
    text_color = 'gray'  # readable on both light and dark backgrounds
    plt.style.use(style_name)

    fig = plt.figure(figsize=(10, 10))
    try:
        ax = sns.heatmap(cm, cmap=cmap, cbar=True)
        cbar = ax.collections[0].colorbar
        cbar.ax.yaxis.set_tick_params(color=text_color, labelcolor=text_color)

        # Add F1-score to the title
        plt.title(f"{title}\nF1 Score: {avg_f1:.3f}", color=text_color, fontsize=23)

        # Heatmap cell i spans [i, i + 1], so labels belong at the cell
        # centres (i + 0.5), not at the cell edges.
        tick_positions = np.arange(len(classes)) + 0.5
        plt.xticks(tick_positions, classes, color=text_color, fontsize=14)
        plt.yticks(tick_positions, classes, color=text_color, fontsize=14)
        plt.ylabel('True label', color=text_color, fontsize=20)
        plt.xlabel('Predicted label', color=text_color, fontsize=20)
        plt.tight_layout()
        plt.savefig(save_path, transparent=True)  # Transparent to blend with the site background
    finally:
        # Release the figure even if plotting or saving fails.
        plt.close(fig)

    # Read the pixels now so the returned image does not keep the file open.
    image = Image.open(save_path)
    image.load()
    return image
def compute_average_confusion_matrix(folder):
    """Average the confusion matrices of every prediction CSV in ``folder``.

    Each ``*.csv`` file must provide the columns ``"Target"`` and
    ``"Top-1 Prediction"``. All matrices are built over the same label range
    ``0 .. max_label`` so they can be averaged element-wise.

    Args:
        folder: Directory containing the prediction CSV files.

    Returns:
        The element-wise mean confusion matrix, or ``None`` when the folder
        does not exist or holds no non-empty CSV file.
    """
    if not os.path.isdir(folder):
        return None

    # Read every CSV once; sorted() only makes the processing order stable.
    runs = []
    for file_name in sorted(os.listdir(folder)):
        if not file_name.endswith(".csv"):
            continue
        data = pd.read_csv(os.path.join(folder, file_name))
        if data.empty:
            # A file without predictions carries no information to average.
            continue
        runs.append((data["Target"], data["Top-1 Prediction"]))

    if not runs:
        return None

    # Size the matrices by the largest label seen (true or predicted), not by
    # the number of distinct targets: when some classes are absent from a
    # file, counting unique values would silently drop the highest labels.
    # Assumes labels are non-negative integers starting at 0 -- TODO confirm.
    num_labels = 1 + int(max(max(y_true.max(), y_pred.max()) for y_true, y_pred in runs))
    label_range = np.arange(num_labels)

    confusion_matrices = [
        confusion_matrix(y_true, y_pred, labels=label_range)
        for y_true, y_pred in runs
    ]
    return np.mean(confusion_matrices, axis=0)
########################## LOS/NLOS CLASSIFICATION #############################
# Path to the folder holding the predefined LoS/NLoS prediction results
LOS_PATH = "images_LoS"
# Training-data percentages offered by the LoS/NLoS slider:
# 20 evenly spaced values from 0.1 to 100
percentage_values_los = np.linspace(0.001, 1, 20) * 100
from sklearn.metrics import f1_score | |
import seaborn as sns | |
# Function to compute confusion matrix, F1-score and plot it with dark mode style
def plot_confusion_matrix_from_csv(csv_file_path, title, save_path, light_mode=False):
    """Plot the LoS/NLoS confusion matrix of a prediction CSV and return it.

    Args:
        csv_file_path: CSV file with the columns ``"Target"`` and
            ``"Top-1 Prediction"``.
        title: Title text; the macro-averaged F1 score is appended.
        save_path: Destination path of the PNG.
        light_mode: Use the default (light) matplotlib style when true,
            otherwise the dark style.

    Returns:
        The saved plot as a PIL image, fully loaded into memory.
    """
    # Load CSV file and extract ground truth and predictions
    data = pd.read_csv(csv_file_path)
    y_true = data['Target']
    y_pred = data['Top-1 Prediction']

    # Pin the matrix to the two classes named on the axes (0 = NLoS,
    # 1 = LoS) so it stays 2x2 even if one class is absent from the file.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    f1 = f1_score(y_true, y_pred, average='macro')  # Macro-average F1-score

    # Set styling based on light or dark mode
    if light_mode:
        style_name, text_color, cmap = 'default', 'black', 'Blues'
    else:
        style_name, text_color, cmap = 'dark_background', 'gray', 'coolwarm'
    plt.style.use(style_name)

    fig = plt.figure(figsize=(5, 5))
    try:
        sns.heatmap(cm, annot=True, fmt="d", cmap=cmap, cbar=False,
                    annot_kws={"size": 12}, linewidths=0.5, linecolor='white')

        # Add F1-score to the title
        plt.title(f"{title}\nF1 Score: {f1:.3f}", color=text_color, fontsize=18)

        # Tick labels sit at the centres of the two heatmap cells
        plt.xticks([0.5, 1.5], labels=['NLoS', 'LoS'], color=text_color, fontsize=12)
        plt.yticks([0.5, 1.5], labels=['NLoS', 'LoS'], color=text_color, fontsize=12)
        plt.ylabel('True label', color=text_color, fontsize=14)
        plt.xlabel('Predicted label', color=text_color, fontsize=14)
        plt.tight_layout()

        # Transparent background to blend with the website
        plt.savefig(save_path, transparent=True)
    finally:
        # Release the figure even if plotting or saving fails.
        plt.close(fig)

    # Read the pixels now so the returned image does not keep the file open.
    image = Image.open(save_path)
    image.load()
    return image
# Function to load confusion matrix based on percentage and input_type
def display_confusion_matrices_los(percentage):
    """Render the predefined LoS/NLoS confusion matrices for one percentage.

    Folder and file names are derived from ``percentage / 100`` formatted
    with three decimals, below ``LOS_PATH``.

    Args:
        percentage: Training-data percentage selected in the UI.

    Returns:
        ``(raw_img, embeddings_img)`` as produced by
        ``plot_confusion_matrix_from_csv``.
    """
    fraction_tag = f"{percentage/100:.3f}"

    def render(kind, panel_label, png_name):
        # One result folder per input kind; the PNG is written next to its CSV.
        folder = os.path.join(LOS_PATH, f"{kind}_{fraction_tag}_los_noTraining")
        csv_path = os.path.join(folder, f"test_predictions_{kind}_{fraction_tag}_los.csv")
        return plot_confusion_matrix_from_csv(
            csv_path,
            f"Confusion Matrix ({panel_label})\n{percentage:.1f}% data",
            os.path.join(folder, png_name),
        )

    raw_img = render("raw", "Raw Channels", "confusion_matrix_raw.png")
    embeddings_img = render("embedding", "LWM Embeddings", "confusion_matrix_embeddings.png")
    return raw_img, embeddings_img
# Main function to handle user choice
def handle_user_choice(choice, percentage=None, uploaded_file=None, emb_type='CLS Embedding'):
    """Dispatch the LoS/NLoS tab request to the predefined or uploaded data.

    Args:
        choice: ``"Use Default Dataset"`` or ``"Upload Dataset"``.
        percentage: Training-data percentage from the slider.
        uploaded_file: Uploaded HDF5 file, or ``None`` when nothing is uploaded.
        emb_type: Embedding type forwarded to ``process_hdf5_file``.

    Returns:
        ``(raw_img, embeddings_img, console_output)``. When the request cannot
        be served, both images are ``None`` and the explanation is returned
        as the console text, because the first two values feed image outputs
        that cannot display a message string.
    """
    if choice == "Use Default Dataset":
        raw_img, embeddings_img = display_confusion_matrices_los(percentage)
        return raw_img, embeddings_img, ""  # No console output for predefined results

    if choice == "Upload Dataset":
        if uploaded_file is None:
            return None, None, "Please upload a dataset"
        return process_hdf5_file(uploaded_file, percentage, emb_type)

    return None, None, "Invalid choice"
# Custom class to capture print output
class PrintCapture(io.StringIO):
    """In-memory text stream that also records every written chunk.

    Attributes are documented inline; ``get_output`` returns the
    concatenation of all chunks written so far.
    """

    def __init__(self):
        super().__init__()
        # Chunks in the order they were written.
        self.output = []

    def write(self, txt):
        """Record ``txt`` and write it to the underlying buffer.

        Returns:
            The number of characters written, as text streams are expected
            to report.
        """
        self.output.append(txt)
        return super().write(txt)

    def get_output(self):
        """Return everything written so far as a single string."""
        return ''.join(self.output)
# Function to load and display predefined images based on user selection
def display_predefined_images(percentage):
    """Load the predefined raw/embedding images for ``percentage``.

    NOTE(review): ``RAW_PATH`` and ``EMBEDDINGS_PATH`` are not defined in the
    visible part of this file -- confirm they exist before relying on this
    function.

    Args:
        percentage: Value interpolated into the image file name.

    Returns:
        ``(raw_image, embeddings_image)``; a randomly generated image stands
        in for any file that does not exist.
    """
    file_name = f"percentage_{percentage}_complexity_16.png"

    def open_or_random(image_path):
        # Fall back to a random image when the predefined one is missing.
        if os.path.exists(image_path):
            return Image.open(image_path)
        return create_random_image()

    raw_image = open_or_random(os.path.join(RAW_PATH, file_name))
    embeddings_image = open_or_random(os.path.join(EMBEDDINGS_PATH, file_name))
    return raw_image, embeddings_image
def los_nlos_classification(file, percentage):
    """Return two result images plus console text for the LoS/NLoS task.

    Args:
        file: Uploaded dataset file, or ``None`` to use predefined images.
        percentage: Training-data percentage.

    Returns:
        ``(raw_image, embeddings_image, console_output)``; the console text
        is empty when no file was uploaded.
    """
    if file is None:
        # No upload: show the predefined images, nothing to print.
        raw_image, embeddings_image = display_predefined_images(percentage)
        return raw_image, embeddings_image, ""

    # Uploaded dataset: two confusion-matrix images and the captured output.
    raw_cm_image, emb_cm_image, console_output = process_hdf5_file(file, percentage)
    return raw_cm_image, emb_cm_image, console_output
# Function to create random images for LoS/NLoS classification results
def create_random_image(size=(300, 300)):
    """Return an RGB image filled with uniform random noise.

    Args:
        size: Leading two dimensions of the generated pixel array.

    Returns:
        A PIL image built from a ``size + (3,)`` array of 8-bit values.
    """
    noise = np.random.rand(*size, 3) * 255
    pixels = noise.astype('uint8')
    return Image.fromarray(pixels)
import importlib.util | |
# Function to dynamically load a Python module from a given file path
def load_module_from_path(module_name, file_path):
    """Execute the Python file at ``file_path`` and return it as a module.

    The module is not registered in ``sys.modules``; every call executes the
    file again.

    Args:
        module_name: Name given to the created module.
        file_path: Path of the source file to load.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no loader can be determined for ``file_path`` (for
            example because of an unsupported file extension).
    """
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    if spec is None or spec.loader is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise ImportError(f"Cannot load module {module_name!r} from {file_path!r}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
# Function to split dataset into training and test sets based on user selection
def split_dataset(channels, labels, percentage):
    """Randomly split samples into training and test subsets.

    Args:
        channels: Array whose first axis indexes the samples.
        labels: Array of per-sample labels, indexed like ``channels``.
        percentage: Share of the samples (0-100) assigned to the training set.

    Returns:
        ``(train_data, test_data, train_labels, test_labels)``.

    Raises:
        ValueError: If ``percentage`` is outside ``[0, 100]``; such values
            would otherwise produce a silently wrong split.
    """
    if not 0 <= percentage <= 100:
        raise ValueError(f"percentage must be between 0 and 100, got {percentage!r}")

    num_samples = channels.shape[0]
    train_size = int(num_samples * percentage / 100)
    print(f'Number of Training Samples: {train_size}')

    # One shared random order keeps data and labels aligned.
    shuffled = np.random.permutation(num_samples)
    train_idx, test_idx = shuffled[:train_size], shuffled[train_size:]

    return channels[train_idx], channels[test_idx], labels[train_idx], labels[test_idx]
# Function to calculate Euclidean distance between a point and a centroid
def euclidean_distance(x, centroid):
    """Return the Euclidean (L2) distance between ``x`` and ``centroid``.

    Inputs may be NumPy arrays, array-likes, or tensor objects exposing
    ``detach()``, ``cpu()`` and ``numpy()`` (e.g. PyTorch tensors). Tensors
    are detached and copied to host memory first, because NumPy cannot
    convert tensors that track gradients or live on another device.

    Args:
        x: The point.
        centroid: The reference point; must broadcast against ``x``.

    Returns:
        The norm of ``x - centroid`` as a NumPy floating-point scalar.
    """
    def to_numpy(value):
        if hasattr(value, "detach"):
            return value.detach().cpu().numpy()
        return np.asarray(value)

    return np.linalg.norm(to_numpy(x) - to_numpy(centroid))
import torch | |
def classify_based_on_distance(train_data, train_labels, test_data):
    """Label each test sample by its nearest class centroid.

    The centroids are the means of the training samples labelled 0 and 1.
    The whole computation is done with vectorised torch operations, with no
    per-sample Python loop and no detour through NumPy.

    Args:
        train_data: Floating-point tensor with one training sample per row.
        train_labels: Per-sample labels (tensor or array-like) using the
            values 0 and 1.
        test_data: Tensor (or array-like) with one test sample per row.

    Returns:
        A 1-D ``int64`` CPU tensor holding 0 where the class-0 centroid is
        strictly closer and 1 otherwise (ties, or distances that are NaN
        because a class has no training samples, yield 1).
    """
    labels = torch.as_tensor(train_labels).reshape(-1).to(train_data.device)
    test_data = torch.as_tensor(test_data).to(train_data.device)

    # Compute the centroids for the two classes
    centroid_0 = train_data[labels == 0].mean(dim=0)
    centroid_1 = train_data[labels == 1].mean(dim=0)

    # One flat feature vector per test sample so the norm covers all features.
    if test_data.dim() > 1:
        flat_test = test_data.flatten(start_dim=1)
    else:
        flat_test = test_data.unsqueeze(1)

    dist_0 = (flat_test - centroid_0.reshape(1, -1)).norm(dim=1)
    dist_1 = (flat_test - centroid_1.reshape(1, -1)).norm(dim=1)

    # Predict 0 only when class 0 is strictly closer.
    predictions = (~(dist_0 < dist_1)).to(torch.int64)
    return predictions.cpu()
def plot_confusion_matrix(y_true, y_pred, title, light_mode=False):
    """Plot a LoS/NLoS confusion matrix and return it as an image.

    The figure is rendered to an in-memory PNG, so no file named after
    ``title`` is written to the current working directory and titles with
    characters that are invalid in file names are safe.

    Args:
        y_true: Ground-truth labels (0 = NLoS, 1 = LoS).
        y_pred: Predicted labels.
        title: Title text; the weighted F1 score is appended.
        light_mode: Use the default (light) matplotlib style when true,
            otherwise the dark style.

    Returns:
        The rendered plot as a PIL image, fully loaded into memory.
    """
    # Pin the matrix to the two classes named on the axes so it stays 2x2
    # even when one class is missing from the test split.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])

    # Calculate F1 Score
    f1 = f1_score(y_true, y_pred, average='weighted')

    # Set styling based on light or dark mode
    if light_mode:
        style_name, text_color, cmap = 'default', 'black', 'Blues'
    else:
        style_name, text_color, cmap = 'dark_background', 'gray', 'coolwarm'
    plt.style.use(style_name)

    fig = plt.figure(figsize=(5, 5))
    png_buffer = io.BytesIO()
    try:
        sns.heatmap(cm, annot=True, fmt="d", cmap=cmap, cbar=False,
                    annot_kws={"size": 12}, linewidths=0.5, linecolor='white')

        # Add F1-score to the title
        plt.title(f"{title}\nF1 Score: {f1:.3f}", color=text_color, fontsize=18)

        # Tick labels sit at the centres of the two heatmap cells
        plt.xticks([0.5, 1.5], labels=['NLoS', 'LoS'], color=text_color, fontsize=12)
        plt.yticks([0.5, 1.5], labels=['NLoS', 'LoS'], color=text_color, fontsize=12)
        plt.ylabel('True label', color=text_color, fontsize=14)
        plt.xlabel('Predicted label', color=text_color, fontsize=14)
        plt.tight_layout()

        # Transparent background to blend with the website
        plt.savefig(png_buffer, format="png", transparent=True)
    finally:
        # Release the figure even if plotting or saving fails.
        plt.close(fig)

    png_buffer.seek(0)
    image = Image.open(png_buffer)
    image.load()  # decode now; the image no longer depends on the buffer
    return image
def identical_train_test_split(output_emb, output_raw, labels, train_percentage):
    """Split embeddings, raw channels and labels with one shared permutation.

    20% of the samples form the test set. ``train_percentage`` percent of the
    remaining samples form the training set. The permutation comes from a
    private generator seeded with 42, so the split is reproducible without
    resetting PyTorch's global random state (a CPU generator seeded this way
    is expected to yield the same order as ``torch.manual_seed(42)`` did).

    Args:
        output_emb: Embeddings, one sample per row.
        output_raw: Raw-channel features, indexed like ``output_emb``.
        labels: Per-sample labels, indexed like ``output_emb``.
        train_percentage: Share (0-100) of the non-test samples used for
            training.

    Returns:
        ``(train_emb, test_emb, train_raw, test_raw, train_labels,
        test_labels)``.
    """
    generator = torch.Generator()
    generator.manual_seed(42)

    num_samples = output_emb.shape[0]
    order = torch.randperm(num_samples, generator=generator)

    test_size = int(num_samples * 0.20)
    test_indices = order[:test_size]
    remaining_indices = order[test_size:]

    train_size = int(len(remaining_indices) * train_percentage / 100)
    print(f'Training Size: {train_size} out of remaining {len(remaining_indices)}')
    print(f'Test Size: {test_size}')
    train_indices = remaining_indices[:train_size]

    return (
        output_emb[train_indices],
        output_emb[test_indices],
        output_raw[train_indices],
        output_raw[test_indices],
        labels[train_indices],
        labels[test_indices],
    )
# Store the original working directory when the app starts
original_dir = os.getcwd()


def process_hdf5_file(uploaded_file, percentage, emb_type='CLS Embedding'):
    """Run the LoS/NLoS demo pipeline on an uploaded HDF5 dataset.

    Steps: clone the model repository if needed, load its modules, load the
    ``channels`` and ``labels`` datasets from the file, compute LWM embeddings
    and the raw representation, classify the test split by nearest class
    centroid, and plot one confusion matrix per representation.

    While running, ``print`` output is captured and the process working
    directory is switched to the model repository; both are restored on exit.

    NOTE(review): this downloads and executes Python code from the remote
    model repository; it should only ever point at a trusted source.

    Args:
        uploaded_file: Path of the HDF5 file, or an object whose ``name``
            attribute is that path.
        percentage: Share (0-100) of the non-test samples used for training.
        emb_type: ``'CLS Embedding'`` or ``'Channel Embedding'``.

    Returns:
        ``(raw_cm_image, emb_cm_image, console_output)``. On any failure both
        images are ``None`` and the console text ends with the error message.
    """
    capture = PrintCapture()
    previous_stdout = sys.stdout
    sys.stdout = capture  # Redirect print statements to capture
    try:
        model_repo_url = "https://huggingface.co/wi-lab/lwm"
        model_repo_dir = "./LWM"
        # Anchor the repository at the start-up directory so the location
        # does not depend on the current working directory.
        repo_work_dir = os.path.join(original_dir, model_repo_dir)

        # Step 1: Clone the repository if not already done
        if not os.path.exists(repo_work_dir):
            print(f"Cloning model repository from {model_repo_url}...")
            subprocess.run(["git", "clone", model_repo_url, repo_work_dir], check=True)

        # Step 2: Verify the repository was cloned and change the working directory
        if not os.path.exists(repo_work_dir):
            print(f"Directory {repo_work_dir} does not exist.")
            return None, None, capture.get_output()
        os.chdir(repo_work_dir)
        print(f"Changed working directory to {os.getcwd()}")

        # Step 3: Dynamically load lwm_model.py, input_preprocess.py, and inference.py
        lwm_model = load_module_from_path("lwm_model", os.path.join(os.getcwd(), 'lwm_model.py'))
        input_preprocess = load_module_from_path("input_preprocess", os.path.join(os.getcwd(), 'input_preprocess.py'))
        inference = load_module_from_path("inference", os.path.join(os.getcwd(), 'inference.py'))

        # Step 4: Load the model from lwm_model module
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
        print(f"Loading the LWM model on {device}...")
        model = lwm_model.lwm.from_pretrained(device=device).float()

        # Step 5: Load the HDF5 file and extract the channels and labels
        file_path = getattr(uploaded_file, "name", uploaded_file)
        with h5py.File(file_path, 'r') as f:
            channels = np.array(f['channels']).astype(np.complex64)
            labels = np.array(f['labels']).astype(np.int32)
        print(f"Loaded dataset with {channels.shape[0]} samples.")

        # Step 6: Rescale the channels by a power of ten derived from the
        # magnitude of the real part of their mean. Skipped when that
        # magnitude is zero or undefined, where log10 would give an
        # infinite or NaN scale.
        mean_real_magnitude = np.abs(np.mean(channels).real)
        if mean_real_magnitude > 0:
            channels = channels * (10**(-3-np.floor(np.log10(mean_real_magnitude))))
        else:
            print("Warning: channel mean has no usable magnitude; skipping normalization.")

        # Step 7: Tokenize the data using the tokenizer from input_preprocess
        preprocessed_chs = input_preprocess.tokenizer(manual_data=channels)

        # Step 8: Perform inference using the functions from inference.py
        embedding_types = {
            'Channel Embedding': 'channel_emb',
            'CLS Embedding': 'cls_emb',
        }
        if emb_type not in embedding_types:
            raise ValueError(f"Unsupported embedding type: {emb_type!r}")
        embedding_type = embedding_types[emb_type]

        output_emb = inference.lwm_inference(preprocessed_chs, embedding_type, model, device)
        output_raw = inference.create_raw_dataset(preprocessed_chs, device)

        print(f"Output Embeddings Shape: {output_emb.shape}")
        print(f"Output Raw Shape: {output_raw.shape}")
        print(f'percentage_value: {percentage}')

        (train_data_emb, test_data_emb,
         train_data_raw, test_data_raw,
         train_labels, test_labels) = identical_train_test_split(
            output_emb.view(len(output_emb), -1),
            output_raw.view(len(output_raw), -1),
            labels,
            percentage)

        # Step 9: Perform classification using the Euclidean distance for both raw and embeddings
        pred_raw = classify_based_on_distance(train_data_raw, train_labels, test_data_raw)
        pred_emb = classify_based_on_distance(train_data_emb, train_labels, test_data_emb)

        # Step 10: Generate confusion matrices for both raw and embeddings
        raw_cm_image = plot_confusion_matrix(test_labels, pred_raw, title="Confusion Matrix (Raw Channels)")
        emb_cm_image = plot_confusion_matrix(test_labels, pred_emb, title="Confusion Matrix (Embeddings)")

        return raw_cm_image, emb_cm_image, capture.get_output()

    except Exception as e:
        # UI boundary: report the failure as console text. The first two
        # return values feed image outputs, which cannot show a message.
        return None, None, f"{capture.get_output()}\nError: {e}"

    finally:
        # Always return to the original working directory after processing
        os.chdir(original_dir)
        # Restore whichever stream was active before the capture started.
        sys.stdout = previous_stdout
######################## Define the Gradio interface ############################### | |
# JavaScript snippet that reports the browser's preferred colour scheme.
# NOTE(review): not referenced anywhere in the visible part of this file.
js_code = """
() => {
    const isDarkMode = window.matchMedia && window.matchMedia('(prefers-color-scheme: dark)').matches;
    return isDarkMode ? 'dark' : 'light';
}
"""

# NOTE(review): the nesting of the layout blocks below was reconstructed from
# a listing whose indentation was lost -- confirm it against the intended UI.
# Garbled emoji were restored where the surviving bytes identify them and
# dropped where they do not.
with gr.Blocks(css="""
    .slider-container {
        display: inline-block;
        margin-right: 50px;
        text-align: center;
    }
    .explanation-box {
        font-size: 16px;
        font-style: italic;
        color: #4a4a4a;
        padding: 15px;
        background-color: #f0f0f0;
        border-radius: 10px;
        margin-bottom: 20px;
    }
    .bold-highlight {
        font-weight: bold;
        color: #2c3e50;
        font-size: 18px;
        text-align: center;
        margin-bottom: 20px;
    }
    #console-output {
        background-color: #ffffff; /* Light background for light mode */
        color: #000000; /* Dark text color for contrast */
        padding: 10px;
        border-radius: 5px;
    }
    .plot-title {
        font-weight: bold;
        color: #2c3e50;
    }
""") as demo:

    # Contact Section
    # NOTE(review): the e-mail address below reads "[email protected]", which
    # looks like a redaction placeholder -- restore the real address.
    gr.Markdown("""
    <div style="text-align: center;">
    <a target="_blank" href="https://www.wi-lab.net">
    <img src="https://www.wi-lab.net/wp-content/uploads/2021/08/WI-name.png" alt="Wireless Model" style="height: 30px;">
    </a>
    <a target="_blank" href="mailto:[email protected]" style="margin-left: 10px;">
    <img src="https://img.shields.io/badge/[email protected]?logo=gmail" alt="Email">
    </a>
    </div>
    """)

    gr.Markdown("""
    <div class="bold-highlight">
    Explore the pre-trained <b>LWM Model</b> here:
    <a target="_blank" href="https://huggingface.co/wi-lab/lwm">https://huggingface.co/wi-lab/lwm</a>
    </div>
    """)

    # Tab for Beam Prediction Task
    with gr.Tab("Beam Prediction Task"):
        #gr.Markdown("### Beam Prediction Task")

        # Explanation section with creative spacing and minimal design
        gr.Markdown("""
        <div style="background-color: var(--primary-background); padding: 15px; border-radius: 10px; color: var(--text-primary);">
        <h3 style="color: var(--text-primary);">📡 <b>Sub-6GHz to mmWave Beam Prediction Task</b></h3>
        <ul style="padding-left: 20px;">
        <li><b>🎯 Goal</b>: Predict the strongest <b>mmWave beam</b> from a predefined codebook using Sub-6 GHz channels.</li>
        <li><b>⚙️ Adjust Settings</b>: Use the sliders to control the training data percentage and task complexity (beam count) to explore model performance.</li>
        <li><b>🧠 Inferences</b>:
        <ul>
        <li>First, the LWM model extracts features.</li>
        <li>🤖 Then, the downstream residual 1D-CNN model (500K parameters) makes beam predictions.</li>
        </ul>
        </li>
        <li><b>🗺️ Dataset</b>: A combination of six scenarios from the DeepMIMO dataset (excluded from LWM pre-training) highlights the model's strong generalization abilities.</li>
        </ul>
        </div>
        """)

        with gr.Row():
            with gr.Column():
                data_percentage_slider = gr.Slider(label="Data Percentage for Training", minimum=10, maximum=100, step=10, value=10)
                task_complexity_dropdown = gr.Dropdown(label="Task Complexity (Number of Beams)", choices=[16, 32, 64, 128, 256], value=16)
                #theme_dropdown = gr.Dropdown(label="Select Theme", choices=['Light', 'Dark'], value='Light')

        with gr.Row():
            raw_img_bp = gr.Image(label="Raw Channels", type="pil", width=300, height=500)
            embeddings_img_bp = gr.Image(label="Embeddings", type="pil", width=300, height=500)

        # Placeholder for the disabled theme dropdown; it is not passed to any
        # callback, so beam_prediction_task uses its own default theme.
        theme_dropdown = 'Dark'

        # Update the confusion matrices whenever sliders change
        data_percentage_slider.change(fn=beam_prediction_task, inputs=[data_percentage_slider, task_complexity_dropdown], outputs=[raw_img_bp, embeddings_img_bp])
        task_complexity_dropdown.change(fn=beam_prediction_task, inputs=[data_percentage_slider, task_complexity_dropdown], outputs=[raw_img_bp, embeddings_img_bp])
        #theme_dropdown.change(fn=beam_prediction_task, inputs=[data_percentage_slider, task_complexity_dropdown, theme_dropdown], outputs=[raw_img_bp, embeddings_img_bp])

        # Add a conclusion section at the bottom
        gr.Markdown("""
        <div class="explanation-box">
        The LWM embeddings demonstrate remarkable generalization capabilities, enabling impressive performance even with minimal training samples. This highlights their ability to effectively handle diverse tasks with limited data.
        </div>
        """)

    # Separate Tab for LoS/NLoS Classification Task
    with gr.Tab("LoS/NLoS Classification Task"):
        #gr.Markdown("### LoS/NLoS Classification Task")

        # Explanation section with creative spacing
        gr.Markdown("""
        <div style="background-color: var(--primary-background); padding: 15px; border-radius: 10px; color: var(--text-primary);">
        <h3 style="color: var(--text-primary);"><b>LoS/NLoS Classification Task</b></h3>
        <ul style="padding-left: 20px;">
        <li><b>🎯 Goal</b>: Classify whether a channel is <b>LoS</b> (Line-of-Sight) or <b>NLoS</b> (Non-Line-of-Sight) with very small LWM CLS embeddings.</li>
        <li><b>Dataset</b>: Use the default dataset (a combination of six scenarios from the DeepMIMO dataset) or upload your own dataset in <b>H5</b> format.</li>
        <li><b>Custom Dataset Requirements:</b>
        <ul>
        <li>📡 <b>channels</b> array: Shape (N,32,32), rows: 32 antennas at BS, columns: 32 subcarriers</li>
        <li>🏷️ <b>labels</b> array: Binary LoS/NLoS values (1/0)</li>
        </ul>
        </li>
        <li><b>Tip 1</b>: Instructions for organizing your dataset are available at the bottom of the page.</li>
        <li><b>Tip 2</b>: As the computations and inference are performed on HuggingFace CPUs, please use small datasets for faster demo experience (say &lt;400 samples). Clone the model from <a href="https://huggingface.co/wi-lab/lwm" target="_blank">here</a> and use any number of samples locally.</li>
        <li><b>Tip 3</b>: Your dataset will be normalized automatically based on outdoor environments. </li>
        <li><b>No Downstream Model</b>: Instead of a complex downstream model, we classify each sample based on its distance to the centroid of training samples from each class (LoS/NLoS).</li>
        </ul>
        </div>
        """)

        # Radio button for user choice: predefined data or upload dataset
        choice_radio = gr.Radio(choices=["Use Default Dataset", "Upload Dataset"],
                                label="Choose how to proceed",
                                value="Use Default Dataset")

        percentage_slider_los = gr.Slider(minimum=float(percentage_values_los[0]),
                                          maximum=float(percentage_values_los[-1]),
                                          step=float((percentage_values_los[-1] - percentage_values_los[0]) / (len(percentage_values_los) - 1)),
                                          value=float(percentage_values_los[0]),
                                          label="Percentage of Data for Training",
                                          interactive=True)

        # File uploader for dataset (only visible if user chooses to upload a dataset)
        file_input = gr.File(label="Upload HDF5 Dataset", file_types=[".h5"], visible=False)

        # Dropdown for embedding type, also only visible when "Upload Dataset" is selected
        emb_type = gr.Dropdown(choices=["Channel Embedding", "CLS Embedding"],
                               value="CLS Embedding",
                               label="Embedding Type",
                               visible=False)

        # Confusion matrices display
        with gr.Row():
            raw_img_los = gr.Image(label="Raw Channels", type="pil", width=300, height=300)
            embeddings_img_los = gr.Image(label="Embeddings", type="pil", width=300, height=300)

        output_textbox = gr.Textbox(label="Console Output", lines=10, elem_id="console-output")

        # Update the visibility of file_input and emb_type based on user choice
        def toggle_file_input_and_emb_type(choice):
            visible = (choice == "Upload Dataset")
            return gr.update(visible=visible), gr.update(visible=visible)

        # Change visibility of file input and embedding type dropdown based on choice
        choice_radio.change(fn=toggle_file_input_and_emb_type,
                            inputs=[choice_radio],
                            outputs=[file_input, emb_type])

        # When percentage slider changes (for predefined data)
        percentage_slider_los.change(fn=handle_user_choice,
                                     inputs=[choice_radio, percentage_slider_los, file_input, emb_type],
                                     outputs=[raw_img_los, embeddings_img_los, output_textbox])

        # file_input and emb_type are rendered where they are created above.
        # The former extra row that merely named them created an empty row
        # and has been removed.

        # Add a conclusion section at the bottom
        gr.Markdown("""
        <div class="explanation-box">
        Despite their compact size (1/32 of the raw channels), LWM CLS embeddings capture rich, holistic information about the channels. This makes them exceptionally well-suited for tasks like LoS/NLoS classification, especially when working with very limited data.
        </div>
        """)

        gr.Markdown("""
        <div class="explanation-box">
        To create a custom dataset, you'll need to structure your data with 32x32 channel matrices, where the rows correspond to antennas at the base station and the columns represent subcarriers. Here’s how to organize and store the channels and labels in an H5 file format for the demo:
        </div>

        ```python
        # How to pack channels and labels in a h5 file format as a custom dataset for the demo:
        import h5py
        with h5py.File('dataset.h5', 'w') as hdf:
            hdf.create_dataset('channels', data=channels)
            hdf.create_dataset('labels', data=labels)
        print("Dataset saved!")
        ```
        """)

        gr.Markdown("""
        <div class="explanation-box">
        To use your preferred DeepMIMO scenarios for the custom dataset, please
        <a href="https://huggingface.co/wi-lab/lwm" target="_blank">clone the model and datasets</a>
        and follow the instructions below:
        </div>

        ```python
        from input_preprocess import DeepMIMO_data_gen, deepmimo_data_cleaning, label_gen  # Import required modules from the model repository
        import numpy as np
        scenario_names = np.array([
            "city_18_denver", "city_15_indianapolis", "city_19_oklahoma",
            "city_12_fortworth", "city_11_santaclara", "city_7_sandiego"
        ])
        scenario_name = scenario_names[0]  # Select the scenario by choosing its index.
        deepmimo_data = DeepMIMO_data_gen(scenario_name)  # Generates ray-traced wireless channels for the selected scenario.
        cleaned_deepmimo_data = deepmimo_data_cleaning(deepmimo_data)  # Filters out users with no direct path to the base station (i.e., users with zero-valued channels).
        channels = np.squeeze(np.array(cleaned_deepmimo_data), axis=1)  # The "channels" array is now prepared for packing into the custom dataset in H5 format.
        labels = label_gen('LoS/NLoS Classification', deepmimo_data, scenario_name)  # Generates labels for each user, classifying them as Line-of-Sight (LoS) or Non-Line-of-Sight (NLoS), and prepares the "labels" array for inclusion in the custom dataset H5 file.
        ```
        """)

    #with gr.Tab("LWM Model and Framework"):
    #    gr.Image("images/lwm_model_v2.png")
    #    gr.Markdown("This figure depicts the offline pre-training and online embedding generation process for LWM. The channel is divided into fixed-size patches, which are linearly embedded and combined with positional encodings before being passed through a Transformer encoder. During self-supervised pre-training, some embeddings are masked, and LWM leverages self-attention to extract deep features, allowing the decoder to reconstruct the masked values. For downstream tasks, the generated LWM embeddings enhance performance.")

# Launch the app
if __name__ == "__main__":
    demo.launch()