import os
import stat
import xml.etree.ElementTree as ET
import torch
import torch.nn as nn
import torch.nn.functional as F
import logging
import requests
from collections import defaultdict
from typing import List, Dict, Any
from colorama import Fore, Style, init
from accelerate import Accelerator
from torch.utils.data import DataLoader, TensorDataset
from transformers import AutoTokenizer, AutoModel
from sentence_transformers import SentenceTransformer
import numpy as np

# Initialize colorama
init(autoreset=True)

logging.basicConfig(level=logging.INFO, format='\033[92m%(asctime)s - %(levelname)s - %(message)s\033[0m')

file_path = 'data/'
output_path = 'output/'

# Create output path if it doesn't exist
if not os.path.exists(output_path):
    try:
        os.makedirs(output_path)
        os.chmod(output_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # Set full r/w permissions
    except PermissionError:
        print(f"Permission denied: '{output_path}'")
        # Handle the error or try a different path

# Ensure necessary files are created with full r/w permissions
def ensure_file(file_path):
    if not os.path.exists(file_path):
        with open(file_path, 'w') as f:
            pass
        os.chmod(file_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # Set full r/w permissions

class MagicStateLayer(nn.Module):
    def __init__(self, size):
        super().__init__()
        self.state = nn.Parameter(torch.randn(size))

    def forward(self, x):
        return x + self.state

class MemoryAugmentationLayer(nn.Module):
    def __init__(self, size):
        super().__init__()
        self.memory = nn.Parameter(torch.randn(size))

    def forward(self, x):
        return x + self.memory

class HybridAttentionLayer(nn.Module):
    def __init__(self, size):
        super().__init__()
        # batch_first=True so the (batch, seq, embed) layout produced by unsqueeze(1) below is interpreted correctly
        self.attention = nn.MultiheadAttention(size, num_heads=8, batch_first=True)

    def forward(self, x):
        x = x.unsqueeze(1)  # (batch, 1, size): each sample attends over a single-token sequence
        attn_output, _ = self.attention(x, x, x)
        return attn_output.squeeze(1)

class DynamicFlashAttentionLayer(nn.Module):
    def __init__(self, size):
        super().__init__()
        # batch_first=True for the same reason as in HybridAttentionLayer
        self.attention = nn.MultiheadAttention(size, num_heads=8, batch_first=True)

    def forward(self, x):
        x = x.unsqueeze(1)
        attn_output, _ = self.attention(x, x, x)
        return attn_output.squeeze(1)
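
# Example usage of the custom layers (illustrative only; the feature size of 256 and the batch of 4
# are assumptions, not values read from any config in this repo):
#
#     layer = HybridAttentionLayer(256)        # 256 is divisible by num_heads=8
#     out = layer(torch.randn(4, 256))         # out.shape == (4, 256)
#     out = MagicStateLayer(256)(out)          # adds a learned per-feature offset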

class DynamicModel(nn.Module):
    def __init__(self, sections: Dict[str, List[Dict[str, Any]]]):
        super().__init__()
        self.sections = nn.ModuleDict({
            section_name: nn.ModuleList([self.create_layer(lp) for lp in layers])
            for section_name, layers in sections.items()
        })

    def create_layer(self, lp):
        layers = [nn.Linear(lp['input_size'], lp['output_size'])]
        if lp.get('batch_norm', True):
            layers.append(nn.BatchNorm1d(lp['output_size']))
        activation = lp.get('activation', 'relu')
        if activation == 'relu':
            layers.append(nn.ReLU(inplace=True))
        elif activation == 'tanh':
            layers.append(nn.Tanh())
        elif activation == 'sigmoid':
            layers.append(nn.Sigmoid())
        elif activation == 'leaky_relu':
            layers.append(nn.LeakyReLU(negative_slope=0.01, inplace=True))
        elif activation == 'elu':
            layers.append(nn.ELU(alpha=1.0, inplace=True))
        if dropout := lp.get('dropout', 0.1):
            layers.append(nn.Dropout(p=dropout))
        if lp.get('memory_augmentation', True):
            layers.append(MemoryAugmentationLayer(lp['output_size']))
        if lp.get('hybrid_attention', True):
            layers.append(HybridAttentionLayer(lp['output_size']))
        if lp.get('dynamic_flash_attention', True):
            layers.append(DynamicFlashAttentionLayer(lp['output_size']))
        if lp.get('magic_state', True):
            layers.append(MagicStateLayer(lp['output_size']))
        return nn.Sequential(*layers)

    def forward(self, x, section_name=None):
        if section_name:
            for layer in self.sections[section_name]:
                x = layer(x)
        else:
            for layers in self.sections.values():
                for layer in layers:
                    x = layer(x)
        return x
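
# Example usage of DynamicModel (a minimal sketch; the section name and layer spec below are made up,
# not taken from the data folder):
#
#     sections = {"example_section": [{"input_size": 128, "output_size": 256, "activation": "relu"}]}
#     model = DynamicModel(sections)
#     y = model(torch.randn(4, 128))                                   # y.shape == (4, 256)
#     y = model(torch.randn(4, 128), section_name="example_section")   # run a single section only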

def parse_xml_file(file_path):
    tree = ET.parse(file_path)  # parse once and reuse the tree
    root = tree.getroot()
    layers = []
    for layer in root.findall('.//label'):
        lp = {
            'input_size': int(layer.get('input_size', 128)),
            'output_size': int(layer.get('output_size', 256)),
            'activation': layer.get('activation', 'relu').lower()
        }
        # Keep this list in sync with the activations handled in DynamicModel.create_layer
        if lp['activation'] not in ['relu', 'tanh', 'sigmoid', 'leaky_relu', 'elu', 'none']:
            raise ValueError(f"Unsupported activation function: {lp['activation']}")
        if lp['input_size'] <= 0 or lp['output_size'] <= 0:
            raise ValueError("Layer dimensions must be positive integers")
        layers.append(lp)
    if not layers:
        layers.append({'input_size': 128, 'output_size': 256, 'activation': 'relu'})
    return layers
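
# Example of an XML file this parser accepts (element and attribute names follow the findall/get calls
# above; the surrounding <model> wrapper element name is an assumption, since any ancestor tag works):
#
#     <model>
#         <label input_size="128" output_size="256" activation="relu"/>
#         <label input_size="256" output_size="64" activation="tanh"/>
#     </model>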

def create_model_from_folder(folder_path):
    sections = defaultdict(list)
    if not os.path.exists(folder_path):
        logging.warning(f"Folder {folder_path} does not exist. Creating model with default configuration.")
        return DynamicModel({})
    xml_files_found = False
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            if file.endswith('.xml'):
                xml_files_found = True
                file_path = os.path.join(root, file)
                try:
                    sections[os.path.basename(root).replace('.', '_')].extend(parse_xml_file(file_path))
                except Exception as e:
                    logging.error(f"Error processing {file_path}: {str(e)}")
    if not xml_files_found:
        logging.warning("No XML files found. Creating model with default configuration.")
        return DynamicModel({})
    return DynamicModel(dict(sections))
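
# Example (assuming a folder layout like data/example_section/layers.xml; the containing folder's
# name becomes the section name):
#
#     model = create_model_from_folder('data')
#     print(list(model.sections.keys()))   # e.g. ['example_section']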

def create_embeddings_and_stores(folder_path, model_name="sentence-transformers/all-MiniLM-L6-v2"):
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)
    doc_store = []
    embeddings_list = []
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            if file.endswith('.xml'):
                file_path = os.path.join(root, file)
                try:
                    # Parse once, and use a separate name so the os.walk 'root' directory is not clobbered
                    xml_root = ET.parse(file_path).getroot()
                    for elem in xml_root.iter():
                        if elem.text and elem.text.strip():  # skip empty and whitespace-only nodes
                            text = elem.text.strip()
                            inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
                            with torch.no_grad():
                                embeddings = model(**inputs).last_hidden_state.mean(dim=1).cpu().numpy()
                            embeddings_list.append(embeddings)
                            doc_store.append(text)
                except Exception as e:
                    logging.error(f"Error processing {file_path}: {str(e)}")
    return embeddings_list, doc_store
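
# Example (sketch; the shapes assume the default all-MiniLM-L6-v2 encoder, whose hidden size is 384):
#
#     embeddings_list, doc_store = create_embeddings_and_stores('data')
#     print(len(doc_store))             # number of non-empty XML text nodes found
#     print(embeddings_list[0].shape)   # (1, 384) per stored text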

def query_embeddings(query, embeddings_list, doc_store, model_name="sentence-transformers/all-MiniLM-L6-v2"):
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)
    inputs = tokenizer(query, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        query_embedding = model(**inputs).last_hidden_state.mean(dim=1).cpu().numpy()
    if not embeddings_list:
        return []
    # Reduce each (1, 1) dot-product matrix to a scalar so argsort yields usable document indices
    similarities = np.array([float(np.dot(query_embedding, emb.T)) for emb in embeddings_list])
    top_k_indices = np.argsort(similarities)[-5:][::-1]
    return [doc_store[i] for i in top_k_indices]
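
# Example (the query string is a placeholder, not drawn from the data folder; up to the five texts
# with the highest dot-product similarity are returned, best first):
#
#     top_docs = query_embeddings("example query text", embeddings_list, doc_store)
#     for doc in top_docs:
#         print(doc)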

def fetch_nzlii_data(query):
    # Note: the original name referenced CourtListener, but the request below goes to the NZLII search endpoint
    base_url = "https://nzlii.org/cgi-bin/sinosrch.cgi"
    params = {"method": "auto", "query": query, "meta": "/nz", "results": "50", "format": "json"}
    try:
        response = requests.get(base_url, params=params, headers={"Accept": "application/json"}, timeout=10)
        response.raise_for_status()
        return [{"title": r.get("title", ""), "citation": r.get("citation", ""), "date": r.get("date", ""),
                 "court": r.get("court", ""), "summary": r.get("summary", ""), "url": r.get("url", "")}
                for r in response.json().get("results", [])]
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to fetch data from NZLII API: {str(e)}")
        return []
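
# Example (sketch; which fields are actually populated depends on what the NZLII endpoint returns,
# and the "format": "json" behaviour is assumed from the params above rather than documented):
#
#     cases = fetch_nzlii_data("negligence")
#     # each entry looks like {"title": ..., "citation": ..., "date": ..., "court": ..., "summary": ..., "url": ...}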

class CustomModel(nn.Module):
    def __init__(self, model_name="distilbert-base-uncased"):
        super().__init__()
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.encoder = AutoModel.from_pretrained(model_name)
        self.hidden_size = self.encoder.config.hidden_size
        self.dropout = nn.Dropout(p=0.3)
        self.fc1 = nn.Linear(self.hidden_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, 16)
        # Memory branch (currently defined but not used in forward)
        self.memory = nn.LSTM(self.hidden_size, 64, bidirectional=True, batch_first=True)
        self.memory_fc1 = nn.Linear(64 * 2, 32)
        self.memory_fc2 = nn.Linear(32, 16)

    def forward(self, data):
        tokens = self.tokenizer(data, return_tensors="pt", truncation=True, padding=True)
        outputs = self.encoder(**tokens)
        x = outputs.last_hidden_state.mean(dim=1)  # mean-pool token embeddings
        x = self.dropout(F.relu(self.fc1(x)))
        x = self.dropout(F.relu(self.fc2(x)))
        x = self.dropout(F.relu(self.fc3(x)))
        x = self.fc4(x)
        return x

    def training_step(self, data, labels, optimizer, criterion):
        optimizer.zero_grad()
        outputs = self.forward(data)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        return loss.item()

    def validation_step(self, data, labels, criterion):
        with torch.no_grad():
            outputs = self.forward(data)
            loss = criterion(outputs, labels)
        return loss.item()

    def predict(self, data):
        self.eval()
        with torch.no_grad():
            return self.forward(data)
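
# Example usage of CustomModel (a minimal sketch; the sentences and the label are placeholders, and
# distilbert-base-uncased has hidden size 768, so the head produces 16-dimensional logits):
#
#     clf = CustomModel()
#     logits = clf.predict(["first example sentence", "second example sentence"])   # shape (2, 16)
#     optimizer = torch.optim.Adam(clf.parameters(), lr=1e-4)
#     loss = clf.training_step(["first example sentence"], torch.tensor([3]), optimizer, nn.CrossEntropyLoss())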

def main():
    folder_path = 'data'
    model = create_model_from_folder(folder_path)
    logging.info(f"Created dynamic PyTorch model with sections: {list(model.sections.keys())}")
    embeddings_list, doc_store = create_embeddings_and_stores(folder_path)
    accelerator = Accelerator()
    # Note: training requires at least one configured section; Adam raises on an empty parameter list
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    num_epochs = 10
    # Synthetic placeholder data: 100 random 128-dim samples with binary labels
    dataset = TensorDataset(torch.randn(100, 128), torch.randint(0, 2, (100,)))
    dataloader = DataLoader(dataset, batch_size=16, shuffle=True)
    model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for batch_data, batch_labels in dataloader:
            optimizer.zero_grad()
            outputs = model(batch_data)
            loss = criterion(outputs, batch_labels)
            accelerator.backward(loss)
            optimizer.step()
            total_loss += loss.item()
        avg_loss = total_loss / len(dataloader)
        logging.info(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}")
    query = "example query text"
    results = query_embeddings(query, embeddings_list, doc_store)
    logging.info(f"Query results: {results}")
    nzlii_data = fetch_nzlii_data(query)
    logging.info(f"NZLII API results: {nzlii_data}")

if __name__ == "__main__":
    main()