"""
Learn to classify the manually annotated CDA attributes (frames, 'riferimento', orientation).
"""

import sys

import torch

from allennlp.data.vocabulary import Vocabulary
from allennlp.data import DatasetReader, TokenIndexer, Instance, Token
from allennlp.data.fields import TextField, LabelField
from allennlp.data.token_indexers.pretrained_transformer_indexer import (
    PretrainedTransformerIndexer,
)
from allennlp.data.tokenizers.pretrained_transformer_tokenizer import (
    PretrainedTransformerTokenizer,
)
from allennlp.models import BasicClassifier
from allennlp.modules.text_field_embedders.basic_text_field_embedder import (
    BasicTextFieldEmbedder,
)
from allennlp.modules.token_embedders.pretrained_transformer_embedder import (
    PretrainedTransformerEmbedder,
)
from allennlp.modules.seq2vec_encoders.bert_pooler import BertPooler
from allennlp.training.checkpointer import Checkpointer
from allennlp.training.gradient_descent_trainer import GradientDescentTrainer
from allennlp.data.data_loaders.simple_data_loader import SimpleDataLoader
from allennlp.training.optimizers import AdamOptimizer
from allennlp.predictors.text_classifier import TextClassifierPredictor

from sklearn.svm import SVC
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import precision_recall_fscore_support
from sklearn.tree import DecisionTreeClassifier
from sklearn.dummy import DummyClassifier

import pandas as pd
import numpy as np
import spacy

import json
import os
from typing import Dict, Iterable


class MigrationReader(DatasetReader):
    """Dataset reader turning annotated headlines (and optional labels) into AllenNLP instances."""

    def __init__(self, token_indexers: Dict[str, TokenIndexer], tokenizer):
        super().__init__()
        self.token_indexers = token_indexers
        self.tokenizer = tokenizer

    def text_to_instance(self, sentence, label=None) -> Instance:
        text_field = TextField(self.tokenizer.tokenize(sentence), self.token_indexers)
        fields = {"tokens": text_field}
        if label is not None:
            label_field = LabelField(label)
            fields["label"] = label_field
        return Instance(fields)

    def read_instances(self, text: pd.Series, labels: pd.Series) -> Iterable[Instance]:
        for sentence, label in zip(text, labels):
            instance = self.text_to_instance(sentence, label)
            yield instance


def train(attrib, use_gpu=False):
    assert attrib in ["cda_frame", "riferimento", "orientation", "fake"]

    print("Loading data...")
    x_train, y_train, x_dev, y_dev = load_data(attrib)
    print(f"\t\ttrain size: {len(x_train)}")
    print(f"\t\tdev size: {len(x_dev)}")

    print("Running training setups...")
    scores = []
    setups = [
        (
            {},
            {},
            {
                "type": "bert",
                "options": {"transformer": "Musixmatch/umberto-commoncrawl-cased-v1"},
            },
        ),
    ]
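
    # Illustrative sketch (not part of the original setups): entries for the
    # non-BERT branch below would follow the same
    # (text_options, vect_options, model_info) layout, e.g.:
    #
    #     (
    #         {"lemmatize": True, "remove_stop": True},   # keyword args for process_text
    #         {"embed": False, "ngram_range": (1, 2)},    # keyword args for extract_features
    #         {"type": "svm", "options": {"kernel": "linear"}},
    #     ),
    #     (
    #         {},
    #         {"embed": False},
    #         {"type": "dummy", "options": {}},
    #     ),
    #
    # The keys in text_options / vect_options must match the parameters of
    # process_text and extract_features defined below.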

    nlp = spacy.load("it_core_news_md")

    for s_idx, (text_options, vect_options, model_info) in enumerate(setups):

        if model_info["type"] == "bert":
            print("\t\tPreparing BERT model...")

            # Use GPU 0 only when it is requested and actually available.
            cuda_device = 0 if use_gpu and torch.cuda.is_available() else -1

            transformer = model_info["options"]["transformer"]
            token_indexers = {"tokens": PretrainedTransformerIndexer(transformer)}
            tokenizer = PretrainedTransformerTokenizer(transformer)

            reader = MigrationReader(token_indexers, tokenizer)
            train_instances = list(reader.read_instances(x_train, y_train))
            dev_instances = list(reader.read_instances(x_dev, y_dev))
            vocab = Vocabulary.from_instances(train_instances + dev_instances)
            print(f"\t\t\tnum labels: {vocab.get_vocab_size('labels')}")

            embedder = BasicTextFieldEmbedder(
                {"tokens": PretrainedTransformerEmbedder(transformer)}
            )
            seq2vec = BertPooler(transformer)
            model = BasicClassifier(vocab, embedder, seq2vec, namespace="tags")
            if cuda_device >= 0:
                model = model.cuda(cuda_device)

checkpoint_dir = f"/scratch/p289731/cda_classify/model_{attrib}/checkpoints/" |
|
serialization_dir = f"/scratch/p289731/cda_classify/model_{attrib}/serialize/" |
|
os.makedirs(checkpoint_dir) |
|
os.makedirs(serialization_dir) |
|
checkpointer = Checkpointer(checkpoint_dir) |
|
optimizer = AdamOptimizer( |
|
[(n, p) for n, p in model.named_parameters() if p.requires_grad], |
|
lr=1e-6 |
|
) |
|
train_loader = SimpleDataLoader(train_instances, batch_size=8, shuffle=True) |
|
dev_loader = SimpleDataLoader(dev_instances, batch_size=8, shuffle=False) |
|
train_loader.index_with(vocab) |
|
dev_loader.index_with(vocab) |
|
|
|
print("\t\tTraining BERT model") |
|
trainer = GradientDescentTrainer( |
|
model, |
|
optimizer, |
|
train_loader, |
|
validation_data_loader=dev_loader, |
|
patience=32, |
|
checkpointer=checkpointer, |
|
cuda_device=cuda_device, |
|
serialization_dir=serialization_dir |
|
) |
|
trainer.train() |
|
|
|
print("\t\tProducing predictions...") |
|
predictor = TextClassifierPredictor(model, reader) |
|
predictions = [predictor.predict(sentence) for sentence in x_dev] |
|
y_dev_pred = [p["label"] for p in predictions] |
|
class_labels = list(vocab.get_token_to_index_vocabulary("labels").keys()) |
|
|
|
elif model_info["type"] in ["svm", "tree", "dummy"]: |
|
|
|
print("\t\tExtracting features...") |
|
x_train_fts, vectorizer = extract_features( |
|
x_train, nlp, text_options, **vect_options |
|
) |
|
x_dev_fts, _ = extract_features( |
|
x_dev, nlp, text_options, **vect_options, vectorizer=vectorizer |
|
) |
|
|
|
if not vect_options["embed"]: |
|
print(f"\t\t\tnum features: {len(vectorizer.vocabulary_)}") |
|
else: |
|
assert model_info["type"] != "tree", "Decision tree does not support embedding input" |
|
|
|
print("\t\tTraining the model...") |
|
if model_info["type"] == "svm": |
|
model = SVC(**model_info["options"]) |
|
elif model_info["type"] == "tree": |
|
model = DecisionTreeClassifier() |
|
else: |
|
model = DummyClassifier() |
|
model.fit(x_train_fts, y_train) |
|
|
|
|
|
print("\t\tValidating the model...") |
|
y_dev_pred = model.predict(x_dev_fts) |
|
class_labels = model.classes_ |
|
|
|
        p_micro, r_micro, f_micro, _ = precision_recall_fscore_support(
            y_dev, y_dev_pred, average="micro"
        )
        p_classes, r_classes, f_classes, _ = precision_recall_fscore_support(
            y_dev, y_dev_pred, average=None, labels=class_labels, zero_division=0
        )
        print(
            f"\t\t\tOverall scores (micro-averaged):\tP={p_micro}\tR={r_micro}\tF={f_micro}"
        )

        scores.append(
            {
                "micro": {"p": p_micro, "r": r_micro, "f": f_micro},
                "classes": {
                    "p": list(zip(class_labels, p_classes)),
                    "r": list(zip(class_labels, r_classes)),
                    "f": list(zip(class_labels, f_classes)),
                },
            }
        )

        prediction_df = pd.DataFrame(
            zip(x_dev, y_dev, y_dev_pred), columns=["headline", "gold", "prediction"]
        )
        prediction_df.to_csv(
            f"output/migration/cda_classify/predictions_{attrib}_{s_idx:02}.csv"
        )

        with open(
            f"output/migration/cda_classify/scores_{attrib}.json", "w", encoding="utf-8"
        ) as f_scores:
            json.dump(scores, f_scores, indent=4)


def load_data(attrib):
    train_data = pd.read_csv("output/migration/preprocess/annotations_train.csv")
    dev_data = pd.read_csv("output/migration/preprocess/annotations_dev.csv")

    x_train = train_data["Titolo"]
    x_dev = dev_data["Titolo"]

    if attrib == "cda_frame":
        y_train = train_data["frame"]
        y_dev = dev_data["frame"]
    elif attrib == "riferimento":
        y_train = train_data["riferimento"]
        y_dev = dev_data["riferimento"]
    elif attrib == "orientation":
        y_train = train_data["orientation"]
        y_dev = dev_data["orientation"]
    else:
        # "fake" attribute: sanity-check labels based on whether the headline
        # contains the word "rifugiato" ("refugee").
        y_train = pd.Series(["true" if "rifugiato" in exa else "false" for exa in x_train])
        y_dev = pd.Series(["true" if "rifugiato" in exa else "false" for exa in x_dev])

    return x_train, y_train, x_dev, y_dev


def extract_features(
    headlines,
    nlp,
    text_options,
    embed=False,
    min_freq=1,
    max_freq=1.0,
    ngram_range=(1, 1),
    vectorizer=None,
):
    if embed:
        # Represent each headline as the mean of its token vectors.
        vectorized = np.array(
            [vec for vec in process_text(headlines, nlp, embed=True, **text_options)]
        )
    else:
        # Bag-of-words / bag-of-ngrams counts; reuse the fitted vectorizer for the
        # dev split so that train and dev share the same feature space.
        tokenized = [
            " ".join(sent) for sent in process_text(headlines, nlp, **text_options)
        ]
        if vectorizer is None:
            vectorizer = CountVectorizer(
                lowercase=False,
                analyzer="word",
                min_df=min_freq,
                max_df=max_freq,
                ngram_range=ngram_range,
            )
            vectorized = vectorizer.fit_transform(tokenized)
        else:
            vectorized = vectorizer.transform(tokenized)
    return vectorized, vectorizer


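# Illustrative standalone usage of extract_features (a sketch mirroring the calls in
# train(); the option values are arbitrary examples, not settings from the experiments):
#
#     text_opts = {"lemmatize": True, "remove_stop": True}
#     x_train_fts, vectorizer = extract_features(x_train, nlp, text_opts, ngram_range=(1, 2))
#     x_dev_fts, _ = extract_features(
#         x_dev, nlp, text_opts, ngram_range=(1, 2), vectorizer=vectorizer
#     )
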
def process_text(
    headlines,
    nlp,
    embed=False,
    remove_punct=True,
    lowercase=True,
    lemmatize=False,
    remove_stop=False,
):
    for sent in headlines:
        doc = nlp(sent)
        tokens = (
            t
            for t in doc
            if (not remove_stop or not t.is_stop)
            and (not remove_punct or t.pos_ not in ["PUNCT", "SYM", "X"])
        )
        if embed:
            if lemmatize:
                tokens = (t.vocab[t.lemma].vector for t in tokens)
            else:
                tokens = (t.vector for t in tokens if t.has_vector)
        else:
            if lemmatize:
                tokens = (t.lemma_ for t in tokens)
            else:
                tokens = (t.text for t in tokens)

        # Lowercasing only applies to string tokens, not to embedding vectors.
        if lowercase and not embed:
            tokens = (t.lower() for t in tokens)

        if embed:
            token_arr = np.array([t for t in tokens])
            if len(token_arr) == 0:
                # Fall back to a random vector if no token survived filtering.
                yield np.random.rand(300)
            else:
                # Mean-pool the token vectors into a single headline vector.
                yield np.mean(token_arr, axis=0)
        else:
            yield list(tokens)


if __name__ == "__main__":
    use_gpu = len(sys.argv) > 1 and sys.argv[1] == "gpu"

    train(attrib="cda_frame", use_gpu=use_gpu)
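
# Example invocation (the script filename here is illustrative):
#     python classify_cda_attributes.py gpu    # use the GPU if one is available
#     python classify_cda_attributes.py        # no / other argument: train on CPU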