Spaces:
Runtime error
Runtime error
File size: 7,424 Bytes
2457015 ad52c1a 2457015 ad52c1a 2457015 ad52c1a 2457015 ad52c1a 2457015 ad52c1a 2457015 ad52c1a 2457015 ad52c1a 2457015 ad52c1a 2457015 ad52c1a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
import os, xml.etree.ElementTree as ET, torch, torch.nn as nn, torch.nn.functional as F, numpy as np
from typing import List, Dict, Any, Optional
from collections import defaultdict
from accelerate import Accelerator
from transformers import AutoTokenizer, AutoModel
from termcolor import colored
from sklearn.metrics.pairwise import cosine_similarity
class DM(nn.Module):
def __init__(self, s: Dict[str, List[Dict[str, Any]]]):
super(DM, self).__init__()
self.s = nn.ModuleDict()
if not s: s = {'default': [{'input_size': 128, 'output_size': 256, 'activation': 'relu', 'batch_norm': True, 'dropout': 0.1}]}
for sn, l in s.items():
self.s[sn] = nn.ModuleList()
for lp in l:
print(colored(f"Creating layer in section '{sn}' with params: {lp}", 'cyan'))
self.s[sn].append(self.cl(lp))
def cl(self, lp: Dict[str, Any]) -> nn.Module:
l = [nn.Linear(lp['input_size'], lp['output_size'])]
if lp.get('batch_norm', True): l.append(nn.BatchNorm1d(lp['output_size']))
a = lp.get('activation', 'relu')
if a == 'relu': l.append(nn.ReLU(inplace=True))
elif a == 'tanh': l.append(nn.Tanh())
elif a == 'sigmoid': l.append(nn.Sigmoid())
elif a == 'leaky_relu': l.append(nn.LeakyReLU(negative_slope=0.01, inplace=True))
elif a == 'elu': l.append(nn.ELU(alpha=1.0, inplace=True))
elif a is not None: raise ValueError(f"Unsupported activation function: {a}")
if dr := lp.get('dropout', 0.0): l.append(nn.Dropout(p=dr))
if hl := lp.get('hidden_layers', []):
for hlp in hl: l.append(self.cl(hlp))
if lp.get('memory_augmentation', True): l.append(MAL(lp['output_size']))
if lp.get('hybrid_attention', True): l.append(HAL(lp['output_size']))
if lp.get('dynamic_flash_attention', True): l.append(DFAL(lp['output_size']))
return nn.Sequential(*l)
def forward(self, x: torch.Tensor, sn: Optional[str] = None) -> torch.Tensor:
if sn is not None:
if sn not in self.s: raise KeyError(f"Section '{sn}' not found in model")
for l in self.s[sn]: x = l(x)
else:
for sn, l in self.s.items():
for l in l: x = l(x)
return x
class MAL(nn.Module):
def __init__(self, s: int):
super(MAL, self).__init__()
self.m = nn.Parameter(torch.randn(s))
def forward(self, x: torch.Tensor) -> torch.Tensor:
return x + self.m
class HAL(nn.Module):
def __init__(self, s: int):
super(HAL, self).__init__()
self.a = nn.MultiheadAttention(s, num_heads=8)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = x.unsqueeze(1)
ao, _ = self.a(x, x, x)
return ao.squeeze(1)
class DFAL(nn.Module):
def __init__(self, s: int):
super(DFAL, self).__init__()
self.a = nn.MultiheadAttention(s, num_heads=8)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = x.unsqueeze(1)
ao, _ = self.a(x, x, x)
return ao.squeeze(1)
def px(file_path: str) -> List[Dict[str, Any]]:
t = ET.parse(file_path)
r = t.getroot()
l = []
for ly in r.findall('.//layer'):
lp = {'input_size': int(ly.get('input_size', 128)), 'output_size': int(ly.get('output_size', 256)), 'activation': ly.get('activation', 'relu').lower()}
if lp['activation'] not in ['relu', 'tanh', 'sigmoid', 'none']: raise ValueError(f"Unsupported activation function: {lp['activation']}")
if lp['input_size'] <= 0 or lp['output_size'] <= 0: raise ValueError("Layer dimensions must be positive integers")
l.append(lp)
if not l: l.append({'input_size': 128, 'output_size': 256, 'activation': 'relu'})
return l
def cmf(folder_path: str) -> DM:
s = defaultdict(list)
if not os.path.exists(folder_path):
print(colored(f"Warning: Folder {folder_path} does not exist. Creating model with default configuration.", 'yellow'))
return DM({})
xf = True
for r, d, f in os.walk(folder_path):
for file in f:
if file.endswith('.xml'):
xf = True
fp = os.path.join(r, file)
try:
l = px(fp)
sn = os.path.basename(r).replace('.', '_')
s[sn].extend(l)
except Exception as e:
print(colored(f"Error processing {fp}: {str(e)}", 'red'))
if not xf:
print(colored("Warning: No XML files found. Creating model with default configuration.", 'yellow'))
return DM({})
return DM(dict(s))
def ceas(folder_path: str, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
t = AutoTokenizer.from_pretrained(model_name)
m = AutoModel.from_pretrained(model_name)
embeddings = []
ds = []
for r, d, f in os.walk(folder_path):
for file in f:
if file.endswith('.xml'):
fp = os.path.join(r, file)
try:
tree = ET.parse(fp)
root = tree.getroot()
for e in root.iter():
if e.text:
text = e.text.strip()
i = t(text, return_tensors="pt", truncation=True, padding=True)
with torch.no_grad():
emb = m(**i).last_hidden_state.mean(dim=1).numpy()
embeddings.append(emb)
ds.append(text)
except Exception as e:
print(colored(f"Error processing {fp}: {str(e)}", 'red'))
embeddings = np.vstack(embeddings)
return embeddings, ds
def qvs(query: str, embeddings, ds, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
t = AutoTokenizer.from_pretrained(model_name)
m = AutoModel.from_pretrained(model_name)
i = t(query, return_tensors="pt", truncation=True, padding=True)
with torch.no_grad():
qe = m(**i).last_hidden_state.mean(dim=1).numpy()
similarities = cosine_similarity(qe, embeddings)
top_k_indices = similarities[0].argsort()[-5:][::-1]
return [ds[i] for i in top_k_indices]
def main():
fp = 'data'
m = cmf(fp)
print(colored(f"Created dynamic PyTorch model with sections: {list(m.s.keys())}", 'green'))
fs = next(iter(m.s.keys()))
fl = m.s[fs][0]
ife = fl[0].in_features
si = torch.randn(1, ife)
o = m(si)
print(colored(f"Sample output shape: {o.shape}", 'green'))
embeddings, ds = ceas(fp)
a = Accelerator()
o = torch.optim.Adam(m.parameters(), lr=0.001)
c = nn.CrossEntropyLoss()
ne = 10
d = torch.utils.data.TensorDataset(torch.randn(100, ife), torch.randint(0, 2, (100,)))
td = torch.utils.data.DataLoader(d, batch_size=16, shuffle=True)
m, o, td = a.prepare(m, o, td)
for e in range(ne):
m.train()
tl = 0
for bi, (i, l) in enumerate(td):
o.zero_grad()
o = m(i)
l = c(o, l)
a.backward(l)
o.step()
tl += l.item()
al = tl / len(td)
print(colored(f"Epoch {e+1}/{ne}, Average Loss: {al:.4f}", 'blue'))
uq = "example query text"
r = qvs(uq, embeddings, ds)
print(colored(f"Query results: {r}", 'magenta'))
if __name__ == "__main__":
main() |