import os
import xml.etree.ElementTree as ET
import torch
import torch.nn as nn
from typing import List, Dict, Any
from collections import defaultdict
from accelerate import Accelerator


class DynamicModel(nn.Module):
    def __init__(self, sections: Dict[str, List[Dict[str, Any]]]):
        super().__init__()
        self.sections = nn.ModuleDict()
        # Default section if none provided
        if not sections:
            sections = {
                'default': [{
                    'input_size': 128,
                    'output_size': 256,
                    'activation': 'relu'
                }]
            }
        for section_name, layers in sections.items():
            self.sections[section_name] = nn.ModuleList()
            for layer_params in layers:
                self.sections[section_name].append(self.create_layer(layer_params))
    def create_layer(self, layer_params: Dict[str, Any]) -> nn.Module:
        layer = nn.Linear(layer_params['input_size'], layer_params['output_size'])
        activation = layer_params.get('activation', 'relu')
        if activation == 'relu':
            return nn.Sequential(layer, nn.ReLU())
        elif activation == 'tanh':
            return nn.Sequential(layer, nn.Tanh())
        elif activation == 'sigmoid':
            return nn.Sequential(layer, nn.Sigmoid())
        else:
            # Wrap the bare layer in Sequential too, so callers can always
            # reach the linear layer as layer[0] (main() relies on this).
            return nn.Sequential(layer)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Sections run in insertion order, each layer feeding the next, so
        # consecutive layers must have compatible input/output sizes.
        for layers in self.sections.values():
            for layer in layers:
                x = layer(x)
        return x
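
# A minimal sketch of direct construction (assumed usage; the section name and
# layer sizes below are illustrative, not required by the class):
#
#   sections = {
#       'encoder': [
#           {'input_size': 128, 'output_size': 256, 'activation': 'relu'},
#           {'input_size': 256, 'output_size': 64, 'activation': 'tanh'},
#       ],
#   }
#   model = DynamicModel(sections)
#   y = model(torch.randn(1, 128))  # -> shape (1, 64)
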
def parse_xml_file(file_path: str) -> List[Dict[str, Any]]:
    tree = ET.parse(file_path)
    root = tree.getroot()
    layers = []
    # Each <prov> element contributes one layer with default hyperparameters;
    # the element's attributes and text are currently ignored.
    for _ in root.findall('.//prov'):
        layer_params = {
            'input_size': 128,
            'output_size': 256,
            'activation': 'relu'
        }
        layers.append(layer_params)
    return layers
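
# A sketch of the XML layout this parser accepts (assumed; only the count of
# <prov> elements matters, since their content is not read):
#
#   <config>
#       <prov/>  <!-- one default 128 -> 256 ReLU layer per element -->
#       <prov/>
#   </config>
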
def create_model_from_folder(folder_path: str) -> DynamicModel:
    sections = defaultdict(list)
    if not os.path.exists(folder_path):
        print(f"Warning: Folder {folder_path} does not exist. Creating model with default configuration.")
        return DynamicModel({})
    xml_files_found = False
    for root, _, files in os.walk(folder_path):
        for file in files:
            if file.endswith('.xml'):
                xml_files_found = True
                file_path = os.path.join(root, file)
                try:
                    layers = parse_xml_file(file_path)
                    # Name the section after the directory containing the file
                    section_name = os.path.basename(root)
                    sections[section_name].extend(layers)
                except Exception as e:
                    print(f"Error processing {file_path}: {e}")
    if not xml_files_found:
        print("Warning: No XML files found. Creating model with default configuration.")
        return DynamicModel({})
    return DynamicModel(dict(sections))
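
# Example folder layout (assumed): each subdirectory becomes one section,
# named after the directory that holds the XML file:
#
#   data/
#       encoder/layers.xml  -> section "encoder"
#       decoder/layers.xml  -> section "decoder"
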
def main():
    folder_path = 'data/'
    model = create_model_from_folder(folder_path)
    print(f"Created dynamic PyTorch model with sections: {list(model.sections.keys())}")

    # Read the first section's first layer's input size dynamically
    first_section = next(iter(model.sections.keys()))
    first_layer = model.sections[first_section][0]
    input_features = first_layer[0].in_features

    # Create a sample input tensor matching the model's expected input size
    sample_input = torch.randn(1, input_features)
    output = model(sample_input)
    print(f"Sample output shape: {output.shape}")
    # Initialize the accelerator for (optionally distributed) training
    accelerator = Accelerator()

    # Set up optimization components
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    num_epochs = 10

    # Synthetic dataset for demonstration: the model ends in output_size
    # logits, so binary labels (0/1) are valid class indices for the loss
    dataset = torch.utils.data.TensorDataset(
        torch.randn(100, input_features),
        torch.randint(0, 2, (100,))
    )
    train_dataloader = torch.utils.data.DataLoader(
        dataset,
        batch_size=16,
        shuffle=True
    )

    # Let Accelerate move the model, optimizer, and data to the right device(s)
    model, optimizer, train_dataloader = accelerator.prepare(
        model,
        optimizer,
        train_dataloader
    )
    # Training loop
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for inputs, labels in train_dataloader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            accelerator.backward(loss)  # replaces loss.backward() under Accelerate
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(train_dataloader)
        print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}")
if __name__ == "__main__":
    main()