cs772_bert / app.py
shambhavi3's picture
update app.py
0d59a98 verified
import gradio as gr
import transformers
import torch
#import neptune
#from knockknock import slack_sender
from transformers import *
#import glob
from transformers import BertTokenizer
from transformers import BertForSequenceClassification, AdamW, BertConfig
import random
import pandas as pd
from transformers import BertTokenizer
#from Models.utils import masked_cross_entropy,fix_the_random,format_time,save_normal_model,save_bert_model
from sklearn.metrics import accuracy_score,f1_score
from tqdm import tqdm
'''from TensorDataset.datsetSplitter import createDatasetSplit
from TensorDataset.dataLoader import combine_features
from Preprocess.dataCollect import collect_data,set_name'''
from sklearn.metrics import accuracy_score,f1_score,roc_auc_score,recall_score,precision_score
import matplotlib.pyplot as plt
import time
import os
from transformers import BertTokenizer
#import GPUtil
from sklearn.utils import class_weight
#import json
#from Models.bertModels import *
#from Models.otherModels import *
import sys
#import time
#from waiting import wait
from sklearn.preprocessing import LabelEncoder
import numpy as np
#import threading
#import argparse
#import ast
#from manual_training_inference import select_model
#from Models.utils import save_normal_model,save_bert_model,load_model
#from Models.utils import return_params
from transformers import DistilBertTokenizer
#from TensorDataset.dataLoader import custom_att_masks
#from keras.preprocessing.sequence import pad_sequences
#import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import PIL.Image as Image
from torch import nn
from pyvene import embed_to_distrib, top_vals, format_token
from pyvene import (
IntervenableModel,
VanillaIntervention, Intervention,
RepresentationConfig,
IntervenableConfig,
ConstantSourceIntervention,
LocalistRepresentationIntervention
)
from pyvene import create_gpt2
#%config InlineBackend.figure_formats = ['svg']
from plotnine import (
ggplot,
geom_tile,
aes,
facet_wrap,
theme,
element_text,
geom_bar,
geom_hline,
scale_y_log10,
xlab, ylab, ylim,
scale_y_discrete, scale_y_continuous, ggsave
)
from plotnine.scales import scale_y_reverse, scale_fill_cmap
from tqdm import tqdm
global device
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
def create_bert(cache_dir=None):
"""Creates a GPT2 model, config, and tokenizer from the given name and revision"""
from transformers import BertConfig
config = BertConfig.from_pretrained("./cs772_proj/bert_base/checkpoint-3848/config.json")
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
gpt = AutoModelForSequenceClassification.from_pretrained("./cs772_proj/bert_base/checkpoint-3848", config=config, cache_dir=cache_dir)
print("loaded model")
return config, tokenizer, gpt
def interpret(text,label):
titles={
"block_output": "single restored layer in BERT",
"mlp_activation": "center of interval of 5 patched mlp layer",
"attention_output": "center of interval of 5 patched attn layer"
}
colors={
"block_output": "Purples",
"mlp_activation": "Greens",
"attention_output": "Reds"
}
device = "cuda:0" if torch.cuda.is_available() else "cpu"
#config, tokenizer, gpt = pv.create_llama(name="sharpbai/alpaca-7b-merged")
config, tokenizer, gpt = create_bert()
#config, tokenizer, gpt = create_gpt2(name="gpt2-xl")
gpt.to(device)
base = text
inputs = [
tokenizer(base, return_tensors="pt").to(device),
]
#print(base)
base_token = tokenizer.convert_ids_to_tokens(inputs[0]['input_ids'][0])
res = gpt(**inputs[0])
probabilities = nn.functional.softmax(res[0], dim=-1)
if label=="hate":
l = 0
elif label=="normal":
l=1
else:l=2
#print(probabilities)
#print(res[0][0][0].item())
#print(res)
#distrib = embed_to_distrib(gpt, res.last_hidden_state, logits=False)
#top_vals(tokenizer, distrib[0][-1], n=20)
base = tokenizer(text, return_tensors="pt").to(device)
config = corrupted_config(type(gpt))
intervenable = IntervenableModel(config, gpt)
_, counterfactual_outputs = intervenable(
base, unit_locations={"base": ([[[0,1,2,3]]])}
)
#probabilities = nn.functional.softmax(counterfactual_outputs[0], dim=-1)
#print(probabilities)
for stream in ["block_output", "mlp_activation", "attention_output"]:
data = []
for layer_i in tqdm(range(gpt.config.num_hidden_layers)):
for pos_i in range(len(base_token)):
config = restore_corrupted_with_interval_config(
layer_i, stream,
window=1 if stream == "block_output" else 5
)
n_restores = len(config.representations) - 1
intervenable = IntervenableModel(config, gpt)
_, counterfactual_outputs = intervenable(
base,
[None] + [base]*n_restores,
{
"sources->base": (
[None] + [[[pos_i]]]*n_restores,
[[[0,1,2,3]]] + [[[pos_i]]]*n_restores,
)
},
)
#distrib = embed_to_distrib(
#gpt, counterfactual_outputs.last_hidden_state, logits=False
#)
#prob = distrib[0][-1][token].detach().cpu().item()
logits = counterfactual_outputs[0]
probabilities = nn.functional.softmax(logits, dim=-1)
prob_offense = probabilities[0][l].item()
data.append({"layer": layer_i, "pos": pos_i, "prob": prob_offense})
df = pd.DataFrame(data)
df.to_csv(f"./cs772_proj/tutorial_data/pyvene_rome_{stream}.csv")
for stream in ["block_output", "mlp_activation", "attention_output"]:
df = pd.read_csv(f"./cs772_proj/tutorial_data/pyvene_rome_{stream}.csv")
df["layer"] = df["layer"].astype(int)
df["pos"] = df["pos"].astype(int)
prob_type = "p"+"("+label+")"
df[prob_type] = df["prob"].astype(float)
#custom_labels = ["imagine*","the*", "riots*", "if", "people", "actually", "got" ,"food" ,"boxes" ,"instead", "of" ,"ebt", "cards", "every", "ghetto", "in", "america", "would" ,"look", "like", "ferguson"]
custom_labels = base_token #["what*", "sort*", "of*", "white*","man" ,"or", "woman", "would", "vote", "for", "this", "nigger"]
#custom_labels = ["no*", "liberal*","congratulated*", "hindu*", "refugees", "post", "cab", "because", "they", "hate", "hindus"]
breaks = list(range(len(custom_labels)))#[0, 1, 2, 3, 4, 5, 6,7,8,9,10,11]
plot = (
ggplot(df, aes(x="layer", y="pos"))
+ geom_tile(aes(fill=prob_type))
+ scale_fill_cmap(colors[stream]) + xlab(titles[stream])
+ scale_y_reverse(
limits = (-0.5, len(custom_labels)),
breaks=breaks, labels=custom_labels)
+ theme(figure_size=(6,9)) + ylab("")
+ theme(axis_text_y = element_text(angle = 90, hjust = 1))
)
ggsave(
plot, filename=f"./cs772_proj/tutorial_data/pyvene_rome_{stream}.png", dpi=200
)
if stream == "mlp_activation":
mlp_img_path = f"./cs772_proj/tutorial_data/pyvene_rome_{stream}.png"
elif stream=="block_output":
bo_path = f"./cs772_proj/tutorial_data/pyvene_rome_{stream}.png"
else:attention_path = f"./cs772_proj/tutorial_data/pyvene_rome_{stream}.png"
return mlp_img_path,bo_path,attention_path
def restore_corrupted_with_interval_config(
layer, stream="mlp_activation", window=5, num_layers=12):
start = max(0, layer - window // 2)
end = min(num_layers, layer - (-window // 2))
config = IntervenableConfig(
representations=[
RepresentationConfig(
0, # layer
"block_input", # intervention type
),
] + [
RepresentationConfig(
i, # layer
stream, # intervention type
) for i in range(start, end)],
intervention_types=\
[NoiseIntervention]+[VanillaIntervention]*(end-start)
)
return config
class NoiseIntervention(ConstantSourceIntervention, LocalistRepresentationIntervention):
def __init__(self, embed_dim, **kwargs):
super().__init__()
self.interchange_dim = embed_dim
rs = np.random.RandomState(1)
prng = lambda *shape: rs.randn(*shape)
self.noise = torch.from_numpy(
prng(1, 4, embed_dim)).to(device)
self.noise_level = 0.7462981581687927 #0.3462981581687927
def forward(self, base, source=None, subspaces=None):
base[..., : self.interchange_dim] += self.noise * self.noise_level
return base
def __str__(self):
return f"NoiseIntervention(embed_dim={self.embed_dim})"
def corrupted_config(model_type):
config = IntervenableConfig(
model_type=model_type,
representations=[
RepresentationConfig(
0, # layer
"block_input", # intervention type
),
],
intervention_types=NoiseIntervention,
)
return config
def create_bert(cache_dir=None):
"""Creates a GPT2 model, config, and tokenizer from the given name and revision"""
from transformers import BertConfig
config = BertConfig.from_pretrained("./cs772_proj/bert_base/checkpoint-3848/config.json")
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
gpt = AutoModelForSequenceClassification.from_pretrained("./cs772_proj/bert_base/checkpoint-3848", config=config, cache_dir=cache_dir)
print("loaded model")
return config, tokenizer, gpt
# params = return_params('best_model_json/distilbert.json', 0.001 )
#params = return_params('best_model_json/distilbert.json', 1 )
'''embeddings=None
if(params['bert_tokens']):
train,val,test=createDatasetSplit(params) #update
else:
train,val,test,vocab_own=createDatasetSplit(params)
params['embed_size']=vocab_own.embeddings.shape[1]
params['vocab_size']=vocab_own.embeddings.shape[0]
embeddings=vocab_own.embeddings
if(params['auto_weights']):
y_test = [ele[2] for ele in test]
# print(y_test)
encoder = LabelEncoder()
encoder.classes_ = np.load(params['class_names'],allow_pickle=True)
params['weights']=class_weight.compute_class_weight('balanced',np.unique(y_test),y_test).astype('float32')
#params['weights']=np.array([len(y_test)/y_test.count(encoder.classes_[0]),len(y_test)/y_test.count(encoder.classes_[1]),len(y_test)/y_test.count(encoder.classes_[2])]).astype('float32')
model=select_model(params,embeddings)
model = model.eval()
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
classes_ = np.load('Data/classes.npy')
'''
def main_function(text,label):
'''tokens = tokenizer.encode_plus(text)
input_ids = pad_sequences(torch.tensor(tokens['input_ids']).unsqueeze(0),maxlen=int(params['max_length']),\
dtype="long",
value=0, truncating="post", padding="post")
# att_vals = pad_sequences(att_vals,maxlen=int(params['max_length']), dtype="float",
# value=0.0, truncating="post", padding="post")
att_masks=custom_att_masks(input_ids)
outs = model(torch.tensor(input_ids),
attention_mask=torch.tensor(att_masks, dtype=bool),
labels=None,
device='cuda')
text_tokens = tokenizer.convert_ids_to_tokens(input_ids.squeeze())
text_tokens_ = text_tokens[:len(tokens['input_ids'])]
print ('xyz')
print (outs[1][5].shape)
avg_attn = torch.mean(outs[1][5], dim=1)
avg_attn_np = avg_attn[0,0,:len(tokens['input_ids'])].detach().squeeze().numpy()
logits = outs[0]
print (logits)
print (np.sum(avg_attn_np))
print (avg_attn_np)
pred = torch.argmax(logits)
pred_label = classes_[pred]
'''
ml_img_path,bo_img_path,atten_img_path = interpret(text,label)
ml_im = Image.open(ml_img_path)
bo_im = Image.open(bo_img_path)
atten_im = Image.open(atten_img_path)
yield ml_im, bo_im, atten_im
'''
sns.set_theme(rc={'figure.figsize':(30,1)})
# creating subplot
fig, ax = plt.subplots()
# drawing heatmap on current axes
ax = sns.heatmap(np.expand_dims(avg_attn_np,0), annot= np.expand_dims(np.array(text_tokens_),0), \
fmt="", annot_kws={'size': 10}, cmap="magma")
fig = ax.get_figure()
fig.savefig("out.png" ,bbox_inches='tight')
im = Image.open("out.png")
yield im
'''
#return list(zip(text_tokens_ , avg_attn_np)), pred_label
# return list(zip(text_tokens_[1:-1] , avg_attn_np[1:-1]))
demo = gr.Interface(main_function,
inputs="textbox",
outputs="image",
theme = 'compact')
with gr.Blocks() as demo:
with gr.Tab("Text Input"):
text_input = gr.Textbox()
label_input = gr.Textbox()
text_button = gr.Button("Show")
with gr.Tab("Interpretability"):
with gr.Row():
image_output1 = gr.Image()
image_output2 = gr.Image()
image_output3 = gr.Image()
text_button.click(main_function, inputs=[text_input,label_input], outputs=[image_output1,image_output2,image_output3])
if __name__ == "__main__":
demo.launch()