python / model.py
Princess3's picture
Upload model.py
9a38883 verified
raw
history blame
13.6 kB
import os
import xml.etree.ElementTree as ET
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import List, Dict, Any, Optional
from collections import defaultdict
from accelerate import Accelerator
class DynamicModel(nn.Module):
def __init__(self, sections: Dict[str, List[Dict[str, Any]]]):
"""
Initialize the DynamicModel with configurable neural network sections.
Args:
sections (Dict[str, List[Dict[str, Any]]]): Dictionary mapping section names to lists of layer configurations.
Each layer configuration is a dictionary containing:
- input_size (int): Size of input features
- output_size (int): Size of output features
- activation (str, optional): Activation function name ('relu', 'tanh', 'sigmoid', etc.)
- dropout (float, optional): Dropout rate
- batch_norm (bool, optional): Whether to use batch normalization
- hidden_layers (List[Dict[str, Any]], optional): List of hidden layer configurations
- memory_augmentation (bool, optional): Whether to add a memory augmentation layer
- hybrid_attention (bool, optional): Whether to add a hybrid attention layer
- dynamic_flash_attention (bool, optional): Whether to add a dynamic flash attention layer
Example:
sections = {
'encoder': [
{'input_size': 128, 'output_size': 256, 'activation': 'relu', 'batch_norm': True},
{'input_size': 256, 'output_size': 512, 'activation': 'leaky_relu', 'dropout': 0.1}
],
'decoder': [
{'input_size': 512, 'output_size': 256, 'activation': 'elu'},
{'input_size': 256, 'output_size': 128, 'activation': 'tanh'}
]
}
"""
super(DynamicModel, self).__init__()
self.sections = nn.ModuleDict()
# Default section configuration if none provided
if not sections:
sections = {
'default': [{
'input_size': 128,
'output_size': 256,
'activation': 'relu',
'batch_norm': True,
'dropout': 0.1
}]
}
# Initialize each section with its layer configurations
for section_name, layers in sections.items():
self.sections[section_name] = nn.ModuleList()
for layer_params in layers:
self.sections[section_name].append(self.create_layer(layer_params))
def create_layer(self, layer_params: Dict[str, Any]) -> nn.Module:
"""
Creates a neural network layer based on provided parameters.
Args:
layer_params (Dict[str, Any]): Dictionary containing layer configuration
Required keys:
- input_size (int): Size of input features
- output_size (int): Size of output features
Optional keys:
- activation (str): Activation function name ('relu', 'tanh', 'sigmoid', None)
- dropout (float): Dropout rate if needed
- batch_norm (bool): Whether to use batch normalization
- hidden_layers (List[Dict[str, Any]]): List of hidden layer configurations
- memory_augmentation (bool): Whether to add a memory augmentation layer
- hybrid_attention (bool): Whether to add a hybrid attention layer
- dynamic_flash_attention (bool): Whether to add a dynamic flash attention layer
Returns:
nn.Module: Configured neural network layer with activation
Raises:
KeyError: If required parameters are missing
ValueError: If activation function is not supported
"""
layers = []
# Add linear layer
layers.append(nn.Linear(layer_params['input_size'], layer_params['output_size']))
# Add batch normalization if specified
if layer_params.get('batch_norm', False):
layers.append(nn.BatchNorm1d(layer_params['output_size']))
# Add activation function
activation = layer_params.get('activation', 'relu')
if activation == 'relu':
layers.append(nn.ReLU(inplace=True))
elif activation == 'tanh':
layers.append(nn.Tanh())
elif activation == 'sigmoid':
layers.append(nn.Sigmoid())
elif activation == 'leaky_relu':
layers.append(nn.LeakyReLU(negative_slope=0.01, inplace=True))
elif activation == 'elu':
layers.append(nn.ELU(alpha=1.0, inplace=True))
elif activation is not None:
raise ValueError(f"Unsupported activation function: {activation}")
# Add dropout if specified
if dropout_rate := layer_params.get('dropout', 0.0):
layers.append(nn.Dropout(p=dropout_rate))
# Add hidden layers if specified
if hidden_layers := layer_params.get('hidden_layers', []):
for hidden_layer_params in hidden_layers:
layers.append(self.create_layer(hidden_layer_params))
# Add memory augmentation layer if specified
if layer_params.get('memory_augmentation', False):
layers.append(MemoryAugmentationLayer(layer_params['output_size']))
# Add hybrid attention layer if specified
if layer_params.get('hybrid_attention', False):
layers.append(HybridAttentionLayer(layer_params['output_size']))
# Add dynamic flash attention layer if specified
if layer_params.get('dynamic_flash_attention', False):
layers.append(DynamicFlashAttentionLayer(layer_params['output_size']))
return nn.Sequential(*layers)
def forward(self, x: torch.Tensor, section_name: Optional[str] = None) -> torch.Tensor:
"""
Forward pass through the dynamic model architecture.
Args:
x (torch.Tensor): Input tensor to process
section_name (Optional[str]): Specific section to process. If None, processes all sections
Returns:
torch.Tensor: Processed output tensor
Raises:
KeyError: If specified section_name doesn't exist
"""
if section_name is not None:
if section_name not in self.sections:
raise KeyError(f"Section '{section_name}' not found in model")
for layer in self.sections[section_name]:
x = layer(x)
else:
for section_name, layers in self.sections.items():
for layer in layers:
x = layer(x)
return x
class MemoryAugmentationLayer(nn.Module):
def __init__(self, size: int):
super(MemoryAugmentationLayer, self).__init__()
self.memory = nn.Parameter(torch.randn(size))
def forward(self, x: torch.Tensor) -> torch.Tensor:
return x + self.memory
class HybridAttentionLayer(nn.Module):
def __init__(self, size: int):
super(HybridAttentionLayer, self).__init__()
self.attention = nn.MultiheadAttention(size, num_heads=8)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = x.unsqueeze(1) # Add sequence dimension
attn_output, _ = self.attention(x, x, x)
return attn_output.squeeze(1)
class DynamicFlashAttentionLayer(nn.Module):
def __init__(self, size: int):
super(DynamicFlashAttentionLayer, self).__init__()
self.attention = nn.MultiheadAttention(size, num_heads=8)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = x.unsqueeze(1) # Add sequence dimension
attn_output, _ = self.attention(x, x, x)
return attn_output.squeeze(1)
def parse_xml_file(file_path: str) -> List[Dict[str, Any]]:
"""
Parses an XML configuration file to extract layer parameters for neural network construction.
Args:
file_path (str): Path to the XML configuration file
Returns:
List[Dict[str, Any]]: List of dictionaries containing layer configurations
Raises:
ET.ParseError: If XML file is malformed
KeyError: If required attributes are missing in XML
"""
tree = ET.parse(file_path)
root = tree.getroot()
layers = []
for layer in root.findall('.//layer'):
layer_params = {}
layer_params['input_size'] = int(layer.get('input_size', 128))
layer_params['output_size'] = int(layer.get('output_size', 256))
layer_params['activation'] = layer.get('activation', 'relu').lower()
# Validate activation function
if layer_params['activation'] not in ['relu', 'tanh', 'sigmoid', 'none']:
raise ValueError(f"Unsupported activation function: {layer_params['activation']}")
# Validate dimensions
if layer_params['input_size'] <= 0 or layer_params['output_size'] <= 0:
raise ValueError("Layer dimensions must be positive integers")
layers.append(layer_params)
if not layers:
# Fallback to default configuration if no layers found
layers.append({
'input_size': 128,
'output_size': 256,
'activation': 'relu'
})
return layers
def create_model_from_folder(folder_path: str) -> DynamicModel:
"""
Creates a DynamicModel instance by parsing XML files in the specified folder structure.
Each subfolder represents a model section, and XML files within contain layer configurations.
The function recursively walks through the folder structure, processing all XML files to build
the model architecture.
Args:
folder_path (str): Path to the root folder containing XML configuration files
Returns:
DynamicModel: A configured neural network model based on the XML specifications
Raises:
FileNotFoundError: If the specified folder path doesn't exist
ET.ParseError: If XML parsing fails for any configuration file
"""
sections = defaultdict(list)
if not os.path.exists(folder_path):
print(f"Warning: Folder {folder_path} does not exist. Creating model with default configuration.")
return DynamicModel({})
xml_files_found = False
for root, dirs, files in os.walk(folder_path):
for file in files:
if file.endswith('.xml'):
xml_files_found = True
file_path = os.path.join(root, file)
try:
layers = parse_xml_file(file_path)
section_name = os.path.basename(root)
sections[section_name].extend(layers)
except Exception as e:
print(f"Error processing {file_path}: {str(e)}")
if not xml_files_found:
print("Warning: No XML files found. Creating model with default configuration.")
return DynamicModel({})
return DynamicModel(dict(sections))
def main():
"""
Main function that demonstrates the creation and training of a dynamic PyTorch model.
This function:
1. Creates a dynamic model from XML configurations
2. Sets up distributed training environment using Accelerator
3. Configures optimization components (optimizer, loss function)
4. Creates synthetic dataset for demonstration
5. Implements distributed training loop with loss tracking
The model architecture is determined by XML files in the 'Xml_Data' folder,
where each subfolder represents a model section containing layer configurations.
"""
folder_path = 'Xml_Data'
model = create_model_from_folder(folder_path)
print(f"Created dynamic PyTorch model with sections: {list(model.sections.keys())}")
# Dynamically determine input size from first layer configuration
first_section = next(iter(model.sections.keys()))
first_layer = model.sections[first_section][0]
input_features = first_layer[0].in_features
# Validate model with sample input
sample_input = torch.randn(1, input_features)
output = model(sample_input)
print(f"Sample output shape: {output.shape}")
# Initialize distributed training components
accelerator = Accelerator()
# Configure training parameters and optimization components
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()
num_epochs = 10
# Generate synthetic dataset for demonstration purposes
dataset = torch.utils.data.TensorDataset(
torch.randn(100, input_features),
torch.randint(0, 2, (100,))
)
train_dataloader = torch.utils.data.DataLoader(
dataset,
batch_size=16,
shuffle=True
)
# Prepare model, optimizer, and dataloader for distributed training
model, optimizer, train_dataloader = accelerator.prepare(
model,
optimizer,
train_dataloader
)
# Execute training loop with distributed processing
for epoch in range(num_epochs):
model.train()
total_loss = 0
for batch_idx, (inputs, labels) in enumerate(train_dataloader):
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
accelerator.backward(loss)
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / len(train_dataloader)
print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}")
if __name__ == "__main__":
main()