|
import os,time,re,glob,pickle,itertools,json,hashlib,asyncio,threading,concurrent.futures,warnings,logging
import xml.etree.ElementTree as ET  # used by D._pf to parse the *.xml source files
|
from pathlib import Path;from collections import defaultdict,Counter;from concurrent.futures import * |
|
from typing import *;import numpy as np,torch,torch.nn as nn,torch.nn.functional as F,torch.optim as optim |
|
from torch.utils.data import *;from torch_geometric.nn import *;from torch.distributions import Categorical |
|
import nltk,networkx as nx,scipy.sparse as sp;from tqdm import tqdm;from queue import Queue |
|
from nltk.tokenize import *;from nltk.stem import *;from nltk.corpus import *;from nltk.tag import * |
|
from nltk.chunk import *;warnings.filterwarnings('ignore');P=print |
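
# Module overview (inferred from the code below): NLTK-based text preprocessing (Res), a
# transformer/GAT sequence classifier (TB/MA), an AMP training loop with AdamW + OneCycleLR
# (Opt/T), an XML-backed data module (D), and a driver pipeline that ties them together,
# with progress logging via Log.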
|
|
|
print("Starting module initialization...") |
|
|
|
class Cfg: |
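    # Hyperparameters; the names are single letters, so their meaning is inferred from usage below:
    # E=embedding dim, H=attention heads, N=memory slots, C=default class count, B=batch size,
    # S=max sequence length, V=vocab size, W=scheduler total steps, L=learning rate, D=dropout.
    # M is not referenced elsewhere in this module.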
|
    E,H,N,C,B=512,32,1024,256,128;M,S,V=20000,2048,100000;W,L,D=4000,2e-4,.15  # V must be an int: it is used as nn.Embedding's num_embeddings
|
@classmethod |
|
def d(cls): |
|
print(f"Retrieving configuration dictionary with {len([k for k,v in cls.__dict__.items() if not k.startswith('_')])} items") |
|
return{k:v for k,v in cls.__dict__.items()if not k.startswith('_')} |
|
|
|
class Log: |
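    # Thin logging wrapper: i()/e() forward to INFO/ERROR, s() logs "success" lines in ANSI green.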
|
def __init__(s,f='r.log'): |
|
print(f"Initializing logger with file: {f}") |
|
        s.l=logging.getLogger('R');s.l.setLevel(logging.INFO)
        if not s.l.handlers:  # Log() is constructed several times; guard against duplicate handlers and repeated log lines
            for h in[logging.FileHandler(f),logging.StreamHandler()]:
                h.setFormatter(logging.Formatter('%(asctime)s-%(name)s-%(levelname)s-%(message)s'))
                s.l.addHandler(h)
|
def i(s,m): |
|
print(f"INFO: {m}") |
|
s.l.info(m) |
|
def e(s,m): |
|
print(f"ERROR: {m}") |
|
s.l.error(m) |
|
def s(s,m): |
|
print(f"SUCCESS: {m}") |
|
s.l.info(f"\033[92m{m}\033[0m") |
|
|
|
class Res: |
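    # NLP resource manager: bundles an NLTK lemmatizer, stemmer, Toktok tokenizer, a Brown-trained
    # backoff POS tagger, and a RegexpParser chunker; p(text) returns tokens/POS tags/lemmas/stems/chunks.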
|
R={'t':{'p','a'},'g':{'u','a'},'c':{'s','w','b','t'}} |
|
def __init__(s): |
|
print("Initializing Resource Manager...") |
|
s.l=Log();s.c={};s._i() |
|
P('Resource manager initialized') |
|
|
|
def _i(s): |
|
print("Initializing NLP components...") |
|
s.d={'l':WordNetLemmatizer(),'s':PorterStemmer(),'t':ToktokTokenizer(), |
|
'p':s._p(),'r':RegexpParser(s.G)} |
|
P('Components initialized') |
|
|
|
def _p(s): |
|
print("Processing tagged sentences...") |
|
        # Requires the NLTK 'brown' corpus to be downloaded.
        raw_sents = nltk.corpus.brown.tagged_sents()[:10000]
        # NLTK taggers expect a list of sentences (each a list of (word, tag) pairs), so keep the
        # sentence boundaries instead of flattening everything into one long sequence.
        train = [[(w, tag) for w, tag in sent if w and tag] for sent in raw_sents if sent]
        return TrigramTagger(train, backoff=BigramTagger(train, backoff=UnigramTagger(train)))
|
|
|
def p(s,t): |
|
print(f"Processing text input of length: {len(t)}") |
|
k=s.d['t'].tokenize(t) |
|
f={'t':k,'p':s.d['p'].tag(k),'l':[s.d['l'].lemmatize(x)for x in k], |
|
's':[s.d['s'].stem(x)for x in k]} |
|
        # Submitting a single parse to a ThreadPoolExecutor and waiting on it immediately is
        # equivalent to a direct call, so call the chunker directly.
        f['r']=s.d['r'].parse(f['p'])
|
P(f'Processed text: {len(k)} tokens') |
|
return f |
|
|
|
G = """ |
|
NP: {<DT|PP\$>?<JJ>*<NN.*>+} |
|
VP: {<VB.*><NP|PP|CLAUSE>+} |
|
CLAUSE: {<NP><VP>} |
|
""" |
|
|
|
|
class TB(nn.Module): |
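    # Pre-norm transformer block: (memory- or self-)attention, a GELU feed-forward branch, an optional
    # GATConv pass over a token graph when an edge_index is supplied, and a learned sigmoid output gate.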
|
def __init__(s,d,h,r=4,p=Cfg.D): |
|
super().__init__() |
|
        # batch_first=True because inputs arrive as (batch, seq, dim) from the embedding layer in MA.
        s.a=nn.MultiheadAttention(d,h,dropout=p,batch_first=True)
        s.m=nn.Sequential(nn.Linear(d,int(d*r)),nn.GELU(),
            nn.Dropout(p),nn.Linear(int(d*r),d),nn.Dropout(p))
        s.n=nn.ModuleList([nn.LayerNorm(d)for _ in'123'])
        # GATConv's 4th positional arg is concat, not dropout; concat=False keeps the output at dim d
        # so the residual connection in forward() lines up.
        s.g=GATConv(d,d,heads=4,concat=False,dropout=p);s.f=nn.Sequential(nn.Linear(d,d),nn.Sigmoid())
        P(f'Transformer block initialized: dim={d}, heads={h}')
|
|
|
def forward(s,x,e=None,m=None): |
|
        q=s.n[0](x)
        # MultiheadAttention needs explicit query/key/value; attend over the learned memory slots
        # when provided, otherwise fall back to plain self-attention.
        kv=q if m is None else m.expand(x.size(0),-1,-1)
        x=x+s.a(q,kv,kv,need_weights=False)[0];x=x+s.m(s.n[1](x))
        if e is not None:x=x+s.g(s.n[2](x).view(-1,x.size(-1)),e).view(x.size())
        return x*s.f(x)
|
|
|
class MA(nn.Module): |
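    # Sequence classifier: token embedding plus learned positional embedding, a stack of TB blocks that
    # can attend a bank of learned memory slots (s.m), mean pooling, and a two-layer classification head.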
|
def __init__(s,nc=Cfg.C,vs=Cfg.V,ed=Cfg.E,d=12,h=Cfg.H): |
|
super().__init__() |
|
s.e=nn.Embedding(vs,ed);s.p=nn.Parameter(torch.zeros(1,Cfg.S,ed)) |
|
s.t=nn.ModuleList([TB(ed,h)for _ in range(d)]);s.m=nn.Parameter(torch.zeros(1,Cfg.N,ed)) |
|
s.o=nn.Sequential(nn.Linear(ed,ed),nn.Tanh());s.c=nn.Sequential(nn.Linear(ed,ed//2), |
|
nn.ReLU(),nn.Dropout(Cfg.D),nn.Linear(ed//2,nc));s._i() |
|
P(f'Model architecture initialized: classes={nc}, vocab={vs}, dim={ed}') |
|
|
|
def _i(s): |
|
def i(l): |
|
if isinstance(l,nn.Linear):nn.init.xavier_uniform_(l.weight) |
|
if getattr(l,'bias',None)is not None:nn.init.zeros_(l.bias) |
|
s.apply(i) |
|
|
|
def forward(s,x,e=None): |
|
B,N=x.shape;x=s.e(x)+s.p[:,:N] |
|
for b in s.t:x=b(x,e,s.m) |
|
return s.c(s.o(x.mean(1))) |
|
|
|
class Opt: |
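    # Optimizer wrapper: AdamW + OneCycleLR + AMP GradScaler; step(loss) does the scaled backward pass,
    # gradient clipping, optimizer/scheduler steps, and zeroes the gradients.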
|
def __init__(s,p,l=Cfg.L,w=Cfg.W,d=.01): |
|
        s.o=optim.AdamW(p,lr=l,betas=(.9,.999),eps=1e-8,weight_decay=d)
        # OneCycleLR options must be keyword arguments; passed positionally they land in the wrong
        # slots (epochs, steps_per_epoch, ...) and the scheduler fails at construction.
        s.s=optim.lr_scheduler.OneCycleLR(s.o,max_lr=l,total_steps=w,pct_start=.1,
            anneal_strategy='cos',cycle_momentum=True,div_factor=25,final_div_factor=1e4)
        s.c=torch.cuda.amp.GradScaler();s.g=1.0
|
P('Optimizer initialized with AdamW and OneCycleLR') |
|
|
|
def step(s,l): |
|
s.c.scale(l).backward();s.c.unscale_(s.o) |
|
torch.nn.utils.clip_grad_norm_(s.o.param_groups[0]['params'],s.g) |
|
s.c.step(s.o);s.c.update();s.s.step();s.o.zero_grad() |
|
class T: |
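    # Trainer: e() runs one mixed-precision training epoch, v() one validation pass, _c() computes
    # loss/accuracy/confidence/entropy per batch, t() the full loop with early stopping, _s() checkpoints.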
|
def __init__(s,m,t,v,d): |
|
        # Store the loaders under names that do not collide with the t()/v() methods below.
        s.m=m.to(d);s.tl=t;s.vl=v;s.d=d;s.o=Opt(m.parameters())
|
s.mt=defaultdict(list);s.c=nn.CrossEntropyLoss(label_smoothing=.1) |
|
s.l=Log();s.b=-float('inf');s._m={} |
|
P('Trainer initialized with device: '+str(d)) |
|
|
|
def e(s): |
|
s.m.train();m=defaultdict(float) |
|
        for i,(x,y)in enumerate(tqdm(s.tl,desc='Training')):
|
x,y=x.to(s.d),y.to(s.d) |
|
with torch.cuda.amp.autocast():o=s.m(x);l=s.c(o,y) |
|
s.o.step(l);b=s._c(o,y,l) |
|
for k,v in b.items():m[k]+=v |
|
if i%10==0:P(f'Batch {i}: Loss={l.item():.4f}') |
|
        return {k:v/len(s.tl)for k,v in m.items()}
|
|
|
def v(s): |
|
s.m.eval();m=defaultdict(float) |
|
with torch.no_grad(): |
|
            for x,y in tqdm(s.vl,desc='Validating'):
|
x,y=x.to(s.d),y.to(s.d);o=s.m(x) |
|
for k,v in s._c(o,y).items():m[k]+=v |
|
        r={k:v/len(s.vl)for k,v in m.items()}
        # There is no _u helper; keep a running history of validation metrics on s.mt instead.
        for k,val in r.items():s.mt[k].append(val)
|
P(f'Validation metrics: {r}') |
|
return r |
|
|
|
def _c(s,o,t,l=None): |
|
        m={};m['l']=l.item() if l is not None else 0.0
|
p=o.argmax(1);c=p.eq(t).sum().item();m['a']=c/t.size(0) |
|
with torch.no_grad(): |
|
pb=F.softmax(o,1);cf=pb.max(1)[0].mean().item() |
|
et=-torch.sum(pb*torch.log(pb+1e-10),1).mean().item() |
|
m.update({'c':cf,'e':et}) |
|
return m |
|
|
|
def t(s,e,p=None,es=5): |
|
b=-float('inf');pc=0 |
|
for i in range(e): |
|
tm=s.e();vm=s.v() |
|
s.l.i(f'E{i+1}/{e}-TL:{tm["l"]:.4f},VL:{vm["l"]:.4f},VA:{vm["a"]:.4f}') |
|
if vm['a']>b: |
|
b=vm['a'];pc=0 |
|
else: |
|
pc+=1 |
|
if p: |
|
s._s(p,i,vm) |
|
if pc>=es: |
|
s.l.i(f'Early stop after {i+1} epochs');break |
|
P(f'Epoch {i+1} completed') |
|
|
|
def _s(s,p,e,m): |
|
        Path(p).parent.mkdir(parents=True,exist_ok=True)  # the ckpt/ directory is not created elsewhere
        torch.save({'e':e,'m':s.m.state_dict(),'o':s.o.o.state_dict(),'s':s.o.s.state_dict(),
            'vm':m,  # validation metrics; the original dict reused the key 'm' and silently dropped the model weights
            'c':Cfg.d(),'t':time.strftime('%Y%m%d-%H%M%S')},p)
|
s.l.s(f'Checkpoint saved: {p}') |
|
class D: |
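    # Data module: s() loads *.xml files in parallel, builds a global label map, splits 80/10/10, and
    # encodes each document into a fixed-length token-id tensor; dl('t'|'v'|'e') returns a DataLoader.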
|
def __init__(s,p,b=Cfg.B,w=os.cpu_count()): |
|
s.p=Path(p);s.b=b;s.w=w;s.pr=Res();s.l=Log() |
|
s.t=s.v=s.e=None;P('DataModule initialized') |
|
|
|
def s(s): |
|
        d=s._l();t,v,e=s._sp(d)
        # Build one label->index map over the full corpus so the train/val/test splits agree.
        s.lm={x:i for i,x in enumerate(sorted(set(i['l']for i in d)))}
        s.t,s.v,s.e=map(s._c,[t,v,e])
        P(f'Datasets created: {len(s.t)}/{len(s.v)}/{len(s.e)} samples')
|
|
|
def _l(s): |
|
        d=[];files=list(s.p.rglob('*.xml'))
        with ProcessPoolExecutor(s.w)as ex:
            futs=[ex.submit(s._pf,fp)for fp in files]
            for fut in tqdm(as_completed(futs),total=len(futs)):
                if r:=fut.result():d.append(r)
        P(f'Loaded {len(d)} files')
        return d
|
|
|
def _pf(s,f): |
|
try: |
|
t=ET.parse(f);r=t.getroot() |
|
tx=' '.join(e.text for e in r.findall('.//text')if e.text) |
|
p=s.pr.p(tx);l=r.find('.//label') |
|
return{'f':p,'m':{'l':len(tx)},'l':l.text if l is not None else'UNK','p':str(f)} |
|
except Exception as e:s.l.e(f'Error:{f}-{str(e)}');return None |
|
|
|
def _sp(s,d): |
|
np.random.seed(42);i=np.random.permutation(len(d)) |
|
t,v=int(.8*len(d)),int(.9*len(d)) |
|
return [d[j]for j in i[:t]],[d[j]for j in i[t:v]],[d[j]for j in i[v:]] |
|
|
|
    def _c(s,d):
        # Res.p() returns a feature dict per document, but the model consumes token-id sequences; as a
        # simple stand-in featurization (an assumption, not specified elsewhere), hash each token into
        # the vocab with a stable digest and pad/truncate to Cfg.S.
        def enc(ws):i=[int(hashlib.md5(w.encode()).hexdigest(),16)%Cfg.V for w in ws[:Cfg.S]];return i+[0]*(Cfg.S-len(i))
        f=torch.tensor([enc(i['f']['t'])for i in d],dtype=torch.long)
        y=torch.tensor([s.lm[i['l']]for i in d])
        return TensorDataset(f,y)
|
|
|
def dl(s,t): |
|
        d=getattr(s,t);sh=(t=='t')
        # DataLoader options must be passed by keyword; positionally they would land on sampler/batch_sampler.
        return DataLoader(d,batch_size=s.b,shuffle=sh,num_workers=s.w,pin_memory=True,drop_last=(t=='t'))
|
|
|
class Pipe:  # renamed from "P", which shadowed the module-level alias P=print used throughout
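    # Driver: sets up the output directory, device, and data module; r() runs data prep, training,
    # held-out evaluation, and export of model weights plus metrics.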
|
def __init__(s, cfg_dict): |
|
|
|
s.c = cfg_dict if isinstance(cfg_dict, dict) else {'p': cfg_dict, 'o': 'r_out'} |
|
s.l = Log() |
|
s.o = Path(s.c['o'] if 'o' in s.c else 'r_out') |
|
s.o.mkdir(parents=True, exist_ok=True) |
|
s.d = D(s.c['p'], s.c.get('b', Cfg.B)) |
|
s.v = torch.device('cuda' if torch.cuda.is_available() else 'cpu') |
|
P('Pipeline initialized with configuration') |
|
|
|
|
|
def r(s): |
|
s.l.i('Init pipeline');s.d.s() |
|
m=s._im();t=T(m,s.d.dl('t'),s.d.dl('v'),s.v) |
|
t.t(s.c.get('e',50),s.o/'ckpt'/'best.pth') |
|
s._f(m,t);P('Pipeline completed') |
|
|
|
def _im(s): |
|
s.l.i('Init model') |
|
        # Use the data module's global label map so classes absent from the train split are still counted.
        return MA(len(s.d.lm),Cfg.V,Cfg.E,s.c.get('md',12),Cfg.H).to(s.v)
|
|
|
def _f(s,m,t): |
|
        s.l.i('Finalizing');r=s._em(m,s.d.dl('e'))
        # Export via the module-level _exp helper; the class defines no _ex method.
        _exp(s.o,dict(r['m']),m,Cfg.d());P('Results exported')
|
|
|
def _em(s,m,d): |
|
m.eval();p,t=[],[];mt=defaultdict(list) |
|
with torch.no_grad(): |
|
for x,y in tqdm(d,'Evaluating'): |
|
x,y=x.to(s.v),y.to(s.v);o=m(x) |
|
p.extend(o.argmax(1).cpu());t.extend(y.cpu()) |
|
                # No _cm helper is defined; track per-batch accuracy directly instead.
                mt['a'].append(o.argmax(1).eq(y).float().mean().item())
|
return{'p':p,'t':t,'m':mt} |
|
class M: |
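    # Standalone metric tracker (not wired into the pipeline above): u() appends to a history and keeps
    # the latest value per key, g() returns both.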
|
def __init__(s):s.h=defaultdict(list);s.c=defaultdict(float);P('Metrics initialized') |
|
|
|
def u(s,m): |
|
for k,v in m.items():s.h[k].append(v);s.c[k]=v |
|
if len(s.h['l'])%10==0:P(f'Metrics updated: {dict(s.c)}') |
|
|
|
def g(s):return{'h':dict(s.h),'c':dict(s.c)} |
|
|
|
def _ce(c,a,n=15):
    # Expected calibration error over n equal-width confidence bins; empty bins are skipped so the
    # mean of an empty slice cannot poison the sum with NaN.
    b=np.linspace(0,1,n+1);e=0.0
    for lo,hi in zip(b[:-1],b[1:]):
        m=np.logical_and(c>lo,c<=hi)
        if m.any():e+=abs(np.mean(c[m])-np.mean(a[m]))*np.mean(m)
    return float(e)
|
|
|
def _pd(p):return float(-torch.sum(p.mean(0)*torch.log(p.mean(0)+1e-10))) |
|
|
|
def _exp(p,d,m,c): |
|
p.mkdir(parents=True,exist_ok=True) |
|
    torch.save({'m':m.state_dict(),'c':c},p/'model.pt')
    # Write the metrics dict d; the original dumped the model m, which is not JSON-serializable.
    with open(p/'metrics.json','w')as f:json.dump(d,f,default=str)
|
P(f'Exported to {p}') |
|
def main(): |
|
|
|
cfg = { |
|
'p': 'data', |
|
'o': 'output', |
|
'm': Cfg.d(), |
|
'b': Cfg.B, |
|
'md': 12, |
|
'e': 50 |
|
} |
|
P("Starting pipeline with configuration...") |
|
    pipeline = Pipe(cfg)
|
pipeline.r() |
|
P("Pipeline completed successfully!") |
|
|
|
if __name__=='__main__': |
|
print("Starting main execution...") |
|
main() |
|
print("Main execution completed.") |