import io
import json
import os
import sys
import argparse
import re
import tarfile
from collections import defaultdict
import dataclasses
from datetime import datetime
from typing import Any, Dict, List, Tuple, Optional

import pandas as pd
import spacy
from nltk.corpus import framenet as fn
from nltk.corpus.reader.framenet import FramenetError
from spacy.tokens import Token

from sociofillmore.crashes.utils import is_a_dutch_text


ITALIAN_ACTIVE_AUX = ["avere", "ha", "ho", "hai", "avete", "hanno", "abbiamo"]
DUTCH_ACTIVE_AUX = ["heb", "hebben", "heeft"]

active_frames_df = pd.read_csv("resources/active_frames_full.csv")
ACTIVE_FRAMES = active_frames_df[active_frames_df["active"]]["frame"].tolist()

IGNORE_DEP_LABELS = ["punct"]

DEEP_FRAMES = [
    "Transitive_action",
    "Causation",
    "Transition_to_a_state",
    "Event",
    "State",
]

SYNTAX_ANALYSIS_CACHE_FILES = {
    "femicides/rai": "output/femicides/syntax_cache/rai_ALL",
    "femicides/rai_main": "output/femicides/syntax_cache/rai_main",
    "femicides/rai_ALL": "output/femicides/syntax_cache/rai_ALL",
    "femicides/olv": "output/femicides/syntax_cache/olv",
    "crashes/thecrashes": "output/crashes/syntax_cache/thecrashes",
    "migration/pavia": "output/migration/syntax_cache/pavia",
}

DEEP_FRAMES_CACHE_FILE = "resources/deep_frame_cache.json"

DEP_LABEL_CACHE_FILE = "resources/dep_labels.txt"

POSSIBLE_CONSTRUCTIONS = [
    "nonverbal",
    "verbal:active",
    "verbal:impersonal",
    "verbal:reflexive",
    "verbal:passive",
    "verbal:unaccusative",
    "other",
]

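# Load the frame -> (deep frame, FE mapping) cache from disk if it exists;
# otherwise start from an empty dict.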
def load_deep_frames_cache():
    if os.path.isfile(DEEP_FRAMES_CACHE_FILE):
        print("Loading deep frame cache...")
        with open(DEEP_FRAMES_CACHE_FILE, encoding="utf-8") as f:
            deep_frames_cache = json.load(f)
    else:
        deep_frames_cache = {}
    return deep_frames_cache


nlp = None

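# Containers for LOME frame-semantic annotations: an AnnotationSpan holds the
# token indices and strings of a span; a FrameStructure bundles a frame, its
# mapped deep frame, the target span, and the (deep) role spans.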
@dataclasses.dataclass
class AnnotationSpan:
    tokens_idx: List[int]
    tokens_str: List[str]


@dataclasses.dataclass
class FrameStructure:
    frame: str
    deep_frame: str
    target: Optional[AnnotationSpan]
    roles: List[Tuple[str, AnnotationSpan]]
    deep_roles: List[Tuple[str, AnnotationSpan]]

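# Pre-compute spaCy dependency analyses for every LOME prediction file in the
# corpus' tarball blocks and store them as JSON files (grouped by the first two
# digits of the document id) under SYNTAX_ANALYSIS_CACHE_FILES[dataset].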
def make_syntax_cache(dataset, skip_fn=None):
    print(f"make_syntax_cache({dataset})")

    if dataset == "femicides/rai":
        corpus_tarball = "output/femicides/lome/lome_0shot/multilabel_rai_ALL_blocks"
        corpus = "rai"
        spacy_model = "it_core_news_md"
    elif dataset == "femicides/rai_main":
        corpus_tarball = "output/femicides/lome/lome_0shot/multilabel_rai_main_blocks"
        corpus = "rai_main"
        spacy_model = "it_core_news_md"
    elif dataset == "femicides/rai_ALL":
        corpus_tarball = "output/femicides/lome/lome_0shot/multilabel_rai_ALL_blocks"
        corpus = "rai_ALL"
        spacy_model = "it_core_news_md"
    elif dataset == "femicides/olv":
        corpus_tarball = "output/femicides/lome/lome_0shot/multilabel_olv_blocks"
        corpus = "olv"
        spacy_model = "it_core_news_md"
    elif dataset == "crashes/thecrashes":
        corpus_tarball = "output/crashes/lome/lome_0shot/multilabel_thecrashes_blocks"
        corpus = "thecrashes"
        spacy_model = "nl_core_news_md"
    elif dataset == "migration/pavia":
        corpus_tarball = "output/migration/lome/lome_0shot/multilabel_pavia_blocks"
        corpus = "pavia"
        spacy_model = "it_core_news_md"
    else:
        raise ValueError("Unsupported dataset!")

    print("params:")
    print(f"\tcorpus_tarball: {corpus_tarball}")
    print(f"\tcorpus: {corpus}")
    print(f"\tspacy: {spacy_model}")

    print("processing files...")

    for block in os.listdir(corpus_tarball):
        print(block)

        with tarfile.open(os.path.join(corpus_tarball, block)) as tar_in:

            cache_location = SYNTAX_ANALYSIS_CACHE_FILES[dataset]
            if not os.path.isdir(cache_location):
                os.makedirs(cache_location)

            lome_files = [f for f in tar_in.getmembers()
                          if f.name.endswith(".comm.json")]

            lome_files.sort(key=lambda file: file.name)
            for file in lome_files:
                print(f"\tprocessing file {file}")
                doc_id = re.search(r"lome_(\d+)\.comm\.json", file.name).group(1)

                skipped = False
                if skip_fn is not None:
                    if skip_fn(doc_id):
                        print(f"\t\tskip_fn: skipping file {file}")
                        skipped = True

                if skipped:
                    syntax_analyses = None
                else:
                    file_obj = io.TextIOWrapper(tar_in.extractfile(file))
                    annotations = json.load(file_obj)

                    syntax_analyses = []
                    for sentence in annotations:
                        syntax_analyses.append(
                            syntax_analyze(sentence, spacy_model))

                file_key = doc_id[:2]
                cache_file = f"{cache_location}/{file_key}.json"
                if os.path.isfile(cache_file):
                    with open(cache_file, encoding="utf-8") as f:
                        key_cache = json.load(f)
                else:
                    key_cache = {}
                key_cache[doc_id] = syntax_analyses
                with open(cache_file, "w", encoding="utf-8") as f:
                    json.dump(key_cache, f)

def make_syntax_cache_key(filename):
    doc_id = re.search(r"/\d+/lome_(\d+)\.comm\.json", filename).group(1)
    return doc_id

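# Remove whitespace-only tokens from a LOME sentence in place, popping the same
# positions from every parallel annotation list in the sentence dict.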
def clean_sentence_(sentence):
    idx_to_remove = []

    for i, tok in enumerate(sentence["tokens"]):
        if not tok.strip():
            idx_to_remove.append(i)

    idx_to_remove.reverse()

    for idx in idx_to_remove:
        for annotation_list in sentence.values():
            annotation_list.pop(idx)

def process_prediction_file(
    filename: str,
    dataset_name: str,
    syntax_cache: str,
    deep_frames_cache: dict,
    tmp_cache: Optional[dict] = None,
    file_obj: Optional[io.TextIOBase] = None,
    syntax_cache_key: Optional[str] = None,
    deep_frames_list: Optional[List[str]] = None,
    spacy_model: str = "it_core_news_md",
    spacy_model_obj=None,
) -> Tuple[List, ...]:
    """
    Process a predictions JSON file.

    :param filename: path to the JSON file
    :param dataset_name: dataset identifier, e.g. "femicides/rai"
    :param syntax_cache: see `make_syntax_cache()`
    :param spacy_model: spaCy model to be used for syntactic analysis
    :param file_obj: already opened file object corresponding to `filename`. If given, `file_obj` will be
        used instead of loading it from `filename`. This is useful when reading the entire corpus from a
        tarball (which is what the SocioFillmore webapp does)
    :return: sentences, frame structures, syntax analyses, role analyses
    """

    print("Processing", filename)

    if file_obj is not None:
        annotations = json.load(file_obj)
    else:
        with open(filename, encoding="utf-8") as f:
            annotations = json.load(f)

    if syntax_cache is None:
        syntax_analyses = []
        for sentence in annotations:
            syntax_analyses.append(syntax_analyze(sentence, spacy_model, spacy_model_obj))

    else:
        if syntax_cache_key is None:
            syntax_cache_key = make_syntax_cache_key(filename)

        if tmp_cache is not None and syntax_cache_key in tmp_cache:
            syntax_analyses = tmp_cache[syntax_cache_key]

        else:
            with open(f"{syntax_cache}/{syntax_cache_key[:2]}.json", encoding="utf-8") as cache_file:
                grouped_analyses = json.load(cache_file)
            syntax_analyses = grouped_analyses[syntax_cache_key]
            if tmp_cache is not None:
                tmp_cache.clear()
                tmp_cache.update(grouped_analyses)

    fn_structures: List[Dict[int, FrameStructure]] = []
    sentences: List[List[str]] = []
    role_analyses: List[Dict[int, Dict[str, str]]] = []

    for sent_idx, sentence in enumerate(annotations):

        clean_sentence_(sentence)

        try:
            sent_structures = process_fn_sentence(
                sentence, deep_frames_cache, deep_frames_list=deep_frames_list
            )
        except AttributeError:
            print("Error processing FN annotations")
            sent_structures = {}
        syntax = syntax_analyses[sent_idx]

        for fs in sent_structures.values():
            target_idx = str(fs.target.tokens_idx[0])
            if target_idx not in syntax:
                print(
                    f"Prediction file {filename}: Cannot find syntactic information for target at idx={target_idx}")
                continue
            fs_syn = syntax[target_idx][-1]
            disambiguate_cxs_(fs, fs_syn)

        roles = process_syn_sem_roles(sent_structures, syntax)
        role_analyses.append(roles)
        sentences.append(sentence["tokens"])
        fn_structures.append(sent_structures)

    return sentences, fn_structures, syntax_analyses, role_analyses

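# Resolve the placeholder constructions produced by get_syn_construction()
# ("_verbal:ACTIVE", "_verbal:ADPOS", "_verbal:OTH_PART") into final labels,
# using the frame / deep frame to choose between active, impersonal, passive
# and unaccusative readings.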
def disambiguate_cxs_(struct: FrameStructure, tgt_syntax):

    cx = tgt_syntax["syn_construction"]
    if not cx.startswith("_"):
        return

    if struct.deep_frame in ["Transitive_action", "Causation", "Emotion_directed", "Quarreling", "Impact", "Committing_crime"]:
        frame_agentivity_type = "active"
    elif struct.frame in ACTIVE_FRAMES:
        frame_agentivity_type = "active"
    elif struct.frame == "Event":
        frame_agentivity_type = "impersonal"
    else:
        frame_agentivity_type = "unaccusative"

    if cx == "_verbal:ACTIVE":
        new_cx = f"verbal:{frame_agentivity_type}"
    elif cx in ["_verbal:ADPOS", "_verbal:OTH_PART"]:
        if frame_agentivity_type == "active":
            new_cx = "verbal:passive"
        else:
            new_cx = f"verbal:{frame_agentivity_type}"
    else:
        raise ValueError(f"Unknown construction placeholder {cx}")

    tgt_syntax["syn_construction"] = new_cx

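# Check which frame roles are realized on the given node or its direct children;
# the value stored is the dependency label of the child (marked "↓"), or None if
# the role sits on the node itself. Punctuation dependencies are ignored.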
def find_governed_roles(
    syn_self: Dict[str, Any],
    syn_children: List[Dict[str, Any]],
    roles: List[Tuple[str, AnnotationSpan]],
) -> Dict[str, str]:

    roles_found = {}

    for node in [syn_self] + syn_children:
        for role_name, role_span in roles:
            if node["lome_idx"] in role_span.tokens_idx:
                dep_label = node["dependency"]
                if role_name not in roles_found and dep_label not in IGNORE_DEP_LABELS:
                    if node == syn_self:
                        roles_found[role_name] = None
                    else:
                        roles_found[role_name] = dep_label + "↓"
    return roles_found

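# Starting from the frame target, walk the dependency tree downwards (children)
# and upwards (ancestors) to locate each role, recording the path of dependency
# labels ("↓"/"↑") and the depth at which the role was found.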
def analyze_role_dependencies(
    fn_struct,
    syntax,
    role_analysis=None,
    tgt_idx=None,
    min_depth=-10,
    max_depth=10,
    depth=0,
    label_prefix="",
):

    if role_analysis is None:
        role_analysis = {}

    if tgt_idx is None:
        tgt_idx = fn_struct.target.tokens_idx[0]

    if depth > max_depth:
        return role_analysis

    if depth < min_depth:
        return role_analysis

    new_analysis = {}
    new_analysis.update(role_analysis)
    token_syntax = syntax[str(tgt_idx)][0]

    def update_analysis(mapping):
        for role, dep in mapping.items():
            if role not in new_analysis:
                if label_prefix:
                    if dep is None:
                        label = label_prefix
                        depth_label = depth
                    else:
                        label = label_prefix + "--" + dep
                        depth_label = depth + 1 if depth > 0 else depth - 1
                else:
                    if dep is None:
                        label = "⋆"
                        depth_label = depth
                    else:
                        label = dep
                        depth_label = depth + 1 if depth > 0 else depth - 1
                new_analysis[role] = label, depth_label

    update_analysis(
        find_governed_roles(
            token_syntax, token_syntax["children"], fn_struct.roles)
    )

    if depth <= 0:
        for child in token_syntax["children"]:
            child_analysis = analyze_role_dependencies(
                fn_struct,
                syntax,
                role_analysis=new_analysis,
                tgt_idx=child["lome_idx"],
                max_depth=max_depth,
                min_depth=min_depth,
                depth=depth - 1,
                label_prefix=child["dependency"] + "↓"
            )
            new_analysis.update(child_analysis)

    if depth >= 0:
        if not token_syntax["ancestors"]:
            return new_analysis

        first_ancestor = token_syntax["ancestors"][0]
        return analyze_role_dependencies(
            fn_struct,
            syntax,
            role_analysis=new_analysis,
            tgt_idx=first_ancestor["lome_idx"],
            max_depth=max_depth,
            min_depth=min_depth,
            depth=depth + 1,
            label_prefix=token_syntax["dependency"] + "↑",
        )

    else:
        return new_analysis

def process_syn_sem_roles(
    sent_structures: Dict[int, FrameStructure], syntax: Dict[str, List[Dict[str, Any]]]
) -> Dict[int, Dict[str, str]]:

    analyses = defaultdict(dict)

    for struct in sent_structures.values():
        tgt_idx = struct.target.tokens_idx[0]
        role_deps = analyze_role_dependencies(struct, syntax, max_depth=10)
        analyses[tgt_idx] = clean_role_deps(role_deps)
    return analyses

def clean_role_deps(role_deps):
    res = {}
    for role, (dep_str, depth) in role_deps.items():
        dep_parts = dep_str.split("--")
        if len(dep_parts) == 1:
            res[role] = dep_str, depth
        else:
            res[role] = "--".join([dp[-1]
                                   for dp in dep_parts[:-1]] + [dep_parts[-1]]), depth
    return res

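# Return the deep frame (and FE mapping) for `frame`, using the JSON cache when
# possible and computing it with map_to_deep_frame() on a cache miss.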
def map_or_lookup_deep_frame(
    frame: str, deep_frames_cache, save_modified_cache=False, deep_frames_list=None
) -> Tuple[str, Dict[str, str]]:
    if frame in deep_frames_cache:
        return deep_frames_cache[frame]
    else:
        deep_frame, mapping = map_to_deep_frame(
            frame, deep_frames_list=deep_frames_list
        )
        deep_frames_cache[frame] = [deep_frame, mapping]
        if save_modified_cache:
            with open(DEEP_FRAMES_CACHE_FILE, "w", encoding="utf-8") as f:
                json.dump(deep_frames_cache, f)
        return deep_frames_cache[frame]

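# Recursively follow FrameNet Inheritance relations upwards until a frame from
# `deep_frames_list` (DEEP_FRAMES by default) is reached, threading through a
# mapping from the original frame's FEs to the ancestor's FEs; frames with no
# such ancestor are mapped to themselves.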
def map_to_deep_frame(
    frame: str,
    target: Optional[str] = None,
    mapping: Optional[Dict[str, str]] = None,
    self_mapping: Optional[Dict[str, str]] = None,
    deep_frames_list: Optional[List[str]] = None,
) -> Tuple[str, Dict[str, str]]:

    if deep_frames_list is None:
        deep_frames_list = DEEP_FRAMES

    try:
        fn_entry = fn.frame(frame)
    except FramenetError:
        return frame, {}
    except LookupError:
        return frame, {}

    if target is None:
        target = frame
    if mapping is None or self_mapping is None:
        mapping = self_mapping = {role: role for role in fn_entry.FE.keys()}

    if frame in deep_frames_list:
        return frame, mapping

    inh_relations = [
        fr
        for fr in fn_entry.frameRelations
        if fr.type.name == "Inheritance" and fr.Child == fn_entry
    ]
    parents = [fr.Parent for fr in inh_relations]

    if not inh_relations:
        return target, self_mapping

    if len(inh_relations) == 1:
        parent_rel = inh_relations[0]
        parent = parents[0]
        new_mapping = define_fe_mapping(mapping, parent_rel)
        return map_to_deep_frame(
            parent.name, target, new_mapping, self_mapping, deep_frames_list
        )

    deep_frames = []
    deep_mappings = []
    for parent_rel, parent in zip(inh_relations, parents):
        new_mapping = define_fe_mapping(mapping, parent_rel)
        final_frame, final_mapping = map_to_deep_frame(
            parent.name, target, new_mapping, self_mapping, deep_frames_list
        )
        if final_frame in deep_frames_list:
            deep_frames.append(final_frame)
            deep_mappings.append(final_mapping)

    for deep_frame in deep_frames_list:
        if deep_frame in deep_frames:
            idx = deep_frames.index(deep_frame)
            return deep_frame, deep_mappings[idx]

    return target, self_mapping

def define_fe_mapping(mapping, parent_rel):
    child_to_parent_mapping = {
        fer.subFEName: fer.superFEName for fer in parent_rel.feRelations
    }
    target_to_parent_mapping = {
        role: child_to_parent_mapping[mapping[role]]
        for role in mapping
        if mapping[role] in child_to_parent_mapping
    }
    return target_to_parent_mapping

def is_at_root(syntax_info):

    if syntax_info["dependency"] == "ROOT":
        return True

    if syntax_info["dependency"] == "nsubj" and syntax_info["ancestors"][0]["dependency"] == "ROOT":
        return True

    return False

def get_tarball_blocks(dataset, lome_model="lome_0shot"):
    if dataset == "femicides/rai":
        return f"output/femicides/lome/{lome_model}/multilabel_rai_ALL_blocks"
    elif dataset == "femicides/rai_main":
        return f"output/femicides/lome/{lome_model}/multilabel_rai_main_blocks"
    elif dataset == "femicides/olv":
        return f"output/femicides/lome/{lome_model}/multilabel_olv_blocks"
    elif dataset == "crashes/thecrashes":
        return f"output/crashes/lome/{lome_model}/multilabel_thecrashes_blocks"
    elif dataset == "migration/pavia":
        return f"output/migration/lome/{lome_model}/multilabel_pavia_blocks"
    else:
        raise ValueError("Unsupported dataset!")

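# Run the frame + syntax analysis for one document: locate its LOME prediction
# file in the right tarball block, process it, and return one record per
# sentence (tokens, frame structures, syntax, roles, and document metadata).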
def analyze_single_document(doc_id, event_id, lome_model, dataset, texts_df, deep_frames_cache):
    data_domain, data_corpus = dataset.split("/")

    syntax_cache = SYNTAX_ANALYSIS_CACHE_FILES[dataset]

    print(dataset)

    if dataset == "migration/pavia":
        pred_file_path = f"output/migration/lome/multilabel/{lome_model}/pavia/{event_id}/lome_{doc_id}.comm.json"
    elif dataset == "femicides/olv":
        pred_file_path = f"output/femicides/lome/lome_0shot/multilabel/olv/{event_id}/lome_{doc_id}.comm.json"
    elif dataset == "femicides/rai":
        pred_file_path = f"output/{data_domain}/lome/lome_0shot/multilabel/rai_ALL/{event_id}/lome_{doc_id}.comm.json"
    else:
        pred_file_path = f"output/{data_domain}/lome/lome_0shot/multilabel/{data_corpus}/{event_id}/lome_{doc_id}.comm.json"
    print(f"Analyzing file {pred_file_path}")

    doc_id = os.path.basename(pred_file_path).split(".")[0].split("_")[1]
    doc_key = doc_id[:2]
    tarball = get_tarball_blocks(dataset, lome_model) + f"/block_{doc_key}.tar"
    with tarfile.open(tarball, "r") as tar_f:
        pred_file = io.TextIOWrapper(tar_f.extractfile(pred_file_path))

        (
            sents,
            pred_structures,
            syntax_analyses,
            role_analyses,
        ) = process_prediction_file(
            filename=pred_file_path,
            dataset_name=dataset,
            file_obj=pred_file,
            syntax_cache=syntax_cache,
            deep_frames_cache=deep_frames_cache
        )

    output = []
    for sent, structs, syntax, roles in zip(
        sents, pred_structures, syntax_analyses, role_analyses
    ):
        output.append(
            {
                "sentence": sent,
                "fn_structures": [
                    dataclasses.asdict(fs) for fs in structs.values()
                ],
                "syntax": syntax,
                "roles": roles,
                "meta": {
                    "event_id": event_id,
                    "doc_id": doc_id,
                    "text_meta": get_text_meta(doc_id, texts_df),
                },
            }
        )
    return output

def get_text_meta(doc_id, texts_df):
    row = texts_df[texts_df["text_id"] == int(doc_id)].iloc[0]
    if "pubdate" in row:
        pubdate = row["pubdate"] if not pd.isna(row["pubdate"]) else None
    elif "pubyear" in row:
        pubdate = int(row["pubyear"])
    else:
        pubdate = None
    return {
        "url": row["url"] if "url" in row else None,
        "pubdate": pubdate,
        "provider": row["provider"],
        "title": row["title"] if not pd.isna(row["title"]) else None,
        "days_after_event": int(row["days_after_event"]) if "days_after_event" in row and not pd.isna(row["days_after_event"]) else 0
    }

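# Decode the token-level LOME annotations of one sentence (targets "T:<frame>",
# role spans "B:<frame>:<role>" / "I:<frame>:<role>", each suffixed with
# "@<structure id>") into FrameStructure objects, mapping frames and roles to
# their deep-frame counterparts along the way.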
def process_fn_sentence(
    sentence, deep_frames_cache, post_process=True, deep_frames_list=None
):

    sent_structures: Dict[int, FrameStructure] = {}

    cur_spans: Dict[Tuple[int, str], AnnotationSpan] = {}
    for token_idx, (token_str, frame_annos) in enumerate(
        zip(sentence["tokens"], sentence["frame_list"])
    ):
        for fa in frame_annos:

            if "@@VIRTUAL_ROOT@@" in fa:
                continue
            fa = fa.split("@@")[0]
            anno, struct_id_str = fa.split("@")
            struct_id = int(struct_id_str)
            frame_name = anno.split(":")[1]
            deep_frame, deep_frame_mapping = map_or_lookup_deep_frame(
                frame_name, deep_frames_cache, deep_frames_list=deep_frames_list
            )
            if struct_id not in sent_structures:
                sent_structures[struct_id] = FrameStructure(
                    frame=frame_name,
                    deep_frame=deep_frame,
                    target=None,
                    roles=[],
                    deep_roles=[],
                )
            cur_struct = sent_structures[struct_id]

            anno = anno.replace("I::", "I:")
            anno = anno.replace("B::", "B:")

            if anno.split(":")[0] == "T":
                if cur_struct.target is None:
                    cur_struct.target = AnnotationSpan(
                        [token_idx], [token_str])
                else:
                    cur_struct.target.tokens_idx.append(token_idx)
                    cur_struct.target.tokens_str.append(token_str)
            elif anno.split(":")[0] == "B":
                role_name = anno.split(":")[2]
                role_span = AnnotationSpan([token_idx], [token_str])
                cur_struct.roles.append((role_name, role_span))
                if role_name in deep_frame_mapping:
                    cur_struct.deep_roles.append(
                        (deep_frame_mapping[role_name], role_span)
                    )
                cur_spans[(struct_id, role_name)] = role_span
            elif anno.split(":")[0] == "I":
                role_name = anno.split(":")[2]
                role_span = cur_spans[(struct_id, role_name)]
                role_span.tokens_str.append(token_str)
                role_span.tokens_idx.append(token_idx)

    if post_process:
        for fs in sent_structures.values():
            if len(fs.target.tokens_str) > 1:
                target_tok_str_to_remove = []
                target_tok_idx_to_remove = []
                for tok_str, tok_idx in zip(fs.target.tokens_str, fs.target.tokens_idx):
                    if tok_str in ["``", "''", "`", "'", ".", ",", ";", ":"]:
                        target_tok_str_to_remove.append(tok_str)
                        target_tok_idx_to_remove.append(tok_idx)
                for tok_str, tok_idx in zip(
                    target_tok_str_to_remove, target_tok_idx_to_remove
                ):
                    fs.target.tokens_str.remove(tok_str)
                    fs.target.tokens_idx.remove(tok_idx)

    return sent_structures

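# spaCy may split a LOME token into several tokens; map every spaCy token index
# back to the LOME token it belongs to by advancing the LOME counter only when
# a spaCy token is followed by whitespace.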
def map_back_spacy_lome_tokens(spacy_doc, lome_tokens):
    if len(lome_tokens) > len(spacy_doc):
        raise ValueError(
            f"Cannot re-tokenize (#lome={len(lome_tokens)} // #spacy={len(spacy_doc)})"
        )

    spacy_to_lome = {}
    lome_idx = 0
    for spacy_idx, spacy_token in enumerate(spacy_doc):
        spacy_to_lome[spacy_idx] = lome_idx

        if spacy_token.whitespace_:
            lome_idx += 1
    return spacy_to_lome

def get_syn_category(spacy_token):
    if spacy_token.pos_ == "NOUN":
        return "n"
    if spacy_token.pos_ == "ADJ":
        return "adj"
    if spacy_token.pos_ == "ADV":
        return "adv"
    if spacy_token.pos_ == "ADP":
        return "p"
    if spacy_token.pos_ == "VERB":
        if spacy_token.morph.get("VerbForm") == ["Fin"]:
            return "v:fin"
        if spacy_token.morph.get("VerbForm") == ["Part"]:
            return "v:part"
        if spacy_token.morph.get("VerbForm") == ["Ger"]:
            return "v:ger"
        if spacy_token.morph.get("VerbForm") == ["Inf"]:
            return "v:inf"
    return "other"

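# Parse a LOME-tokenized sentence with spaCy and return, per LOME token index
# (as string key), a list of analyses containing the dependency label, syntactic
# category, construction, and the token's children and ancestors.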
def syntax_analyze(sentence, spacy_model_name, spacy_model_obj=None) -> Dict[str, List[Dict[str, Any]]]:
    lome_tokens = sentence["tokens"]

    if spacy_model_obj is not None:
        nlp = spacy_model_obj
    else:
        nlp = spacy.load(spacy_model_name)

    spacy_doc = nlp(" ".join(lome_tokens))
    analysis = defaultdict(list)
    spacy_to_lome_tokens = map_back_spacy_lome_tokens(spacy_doc, lome_tokens)
    for spacy_idx, token in enumerate(spacy_doc):
        lome_idx = spacy_to_lome_tokens[spacy_idx]
        syn_category = get_syn_category(token)
        syn_construction = get_syn_construction(token, syn_category)
        children = []
        for c in token.children:
            children.append(
                {
                    "token": c.text,
                    "spacy_idx": c.i,
                    "lome_idx": spacy_to_lome_tokens[c.i],
                    "syn_category": get_syn_category(c),
                    "dependency": c.dep_,
                }
            )
        ancestors = []
        for a in token.ancestors:
            ancestors.append(
                {
                    "token": a.text,
                    "spacy_idx": a.i,
                    "lome_idx": spacy_to_lome_tokens[a.i],
                    "syn_category": get_syn_category(a),
                    "dependency": a.dep_,
                }
            )

        lome_key = str(lome_idx)
        analysis[lome_key].append(
            {
                "token": token.text,
                "dependency": token.dep_,
                "spacy_idx": spacy_idx,
                "lome_idx": lome_idx,
                "syn_category": syn_category,
                "syn_construction": syn_construction,
                "children": children,
                "ancestors": ancestors,
            }
        )
    return analysis

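# Derive a coarse construction label from the token's syntactic category and its
# dependents. Finite/gerund/infinitive verbs and participles may first receive
# the placeholders "_verbal:ACTIVE", "_verbal:ADPOS" or "_verbal:OTH_PART",
# which disambiguate_cxs_() later resolves using frame information.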
def get_syn_construction(token: Token, syn_category: str) -> str:
    if syn_category in ["n", "adj", "adv", "p"]:
        return "nonverbal"

    if syn_category.startswith("v:"):

        for c in token.children:
            if c.lemma_.lower() in ["si", "zich", "zichzelf"]:
                return "verbal:reflexive"

        for c in token.children:
            if c.dep_ == "expl":
                return "verbal:impersonal"

        if syn_category in ["v:fin", "v:ger", "v:inf"]:
            return "_verbal:ACTIVE"

        if syn_category == "v:part":

            if token.dep_ == "acl":
                return "_verbal:ADPOS"

            for c in token.children:

                if c.dep_ in ["nsubj:pass", "aux:pass"]:
                    return "verbal:passive"

                if (
                    c.dep_ == "aux"
                    and c.lemma_.lower() in ITALIAN_ACTIVE_AUX + DUTCH_ACTIVE_AUX
                ):
                    return "verbal:active"

            return "_verbal:OTH_PART"

    return "other"

def get_syntax_info(struct: FrameStructure, syntax: Dict) -> Dict:
    target_idx = str(struct.target.tokens_idx[0])

    syntax_for_target = syntax[target_idx]
    return syntax_for_target[-1]

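# Add a "days_after_event" column to the texts dataframe by comparing each
# text's publication date to the date of the event it is linked to.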
def enrich_texts_df(texts_df: pd.DataFrame, events_df: pd.DataFrame):
    time_delta_rows: List[Optional[int]] = []
    for idx, text_row in texts_df.iterrows():
        try:
            event_row = events_df[events_df["event:id"]
                                  == text_row["event_id"]].iloc[0]
        except IndexError:
            print(f"Skipping {idx} (IndexError)")
            time_delta_rows.append(None)
            # no matching event row: skip this text
            continue
        if "pubdate" not in text_row or pd.isna(text_row["pubdate"]) or pd.isna(event_row["event:date"]):
            time_delta_rows.append(None)
        else:
            try:
                pub_date = datetime.strptime(
                    text_row["pubdate"], "%Y-%m-%d %H:%M:%S")
                event_date = datetime.strptime(
                    event_row["event:date"], "%Y-%m-%d")
                time_delta = pub_date - event_date
                time_delta_days = time_delta.days
                time_delta_rows.append(time_delta_days)
            except ValueError as e:
                print(
                    f"\t\terror parsing dates, see below for more info:\n\t\t{e}")
                time_delta_rows.append(None)

    return texts_df.assign(days_after_event=time_delta_rows)

def read_frames_of_interest(dataset) -> List[str]:
    if dataset in ["femicides/rai", "femicides/olv"]:
        file = "resources/femicide_frame_list.txt"
    elif dataset == "crashes/thecrashes":
        file = "resources/crashes_frame_list.txt"
    elif dataset == "migration/pavia":
        file = "resources/migration_frame_list.txt"
    else:
        raise ValueError("Unsupported dataset")

    frames = set()
    with open(file, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line.startswith("#") or not line:
                continue
            frames.add(line[0].upper() + line[1:].lower())
    return sorted(frames)

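# Collect all dependency labels occurring in the role analyses of the listed
# datasets and write them, one per line, to DEP_LABEL_CACHE_FILE.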
def make_dep_label_cache():

    labels = set()

    for dataset in ["femicides/rai", "crashes/thecrashes", "migration/pavia"]:

        if dataset == "femicides/rai":
            tarball = "output/femicides/lome/lome_0shot/multilabel_rai.tar.gz"
        elif dataset == "crashes/thecrashes":
            tarball = "output/crashes/lome/lome_0shot/multilabel_thecrashes.tar.gz"
        else:
            tarball = "output/migration/lome/lome_0shot/multilabel_pavia.tar.gz"

        spacy_model = (
            "it_core_news_md" if dataset in ["femicides/rai",
                                             "migration/pavia"] else "nl_core_news_md"
        )

        deep_frames_cache = load_deep_frames_cache()
        syntax_cache = SYNTAX_ANALYSIS_CACHE_FILES[dataset]

        with tarfile.open(tarball, "r:gz") as tar_f:
            for mem in [
                m.name for m in tar_f.getmembers() if m.name.endswith(".comm.json")
            ]:
                if mem is None:
                    continue

                print(mem)
                mem_obj = io.TextIOWrapper(tar_f.extractfile(mem))
                (_, _, _, role_analyses,) = process_prediction_file(
                    filename=mem,
                    dataset_name=dataset,
                    file_obj=mem_obj,
                    syntax_cache=syntax_cache,
                    deep_frames_cache=deep_frames_cache,
                    spacy_model=spacy_model,
                )
                if role_analyses is None:
                    print(f"\tSkipping file {mem}, no role analyses found")
                    continue
                for sent_ra in role_analyses:
                    for ra in sent_ra.values():
                        for dep, _ in ra.values():
                            labels.add(dep)

    with open(DEP_LABEL_CACHE_FILE, "w", encoding="utf-8") as f_out:
        for label in sorted(labels):
            f_out.write(label + os.linesep)

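# Analyze a LOME prediction file that is not part of one of the predefined
# corpora (no syntax cache, no dataset metadata) and dump the per-sentence
# analyses to a JSON file.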
def analyze_external_file(file_in, file_out, spacy_model=None):
    deep_frames_cache = load_deep_frames_cache()
    # if no spaCy model object is passed, syntax_analyze() falls back to loading
    # the default model itself
    (
        sents,
        pred_structures,
        syntax_analyses,
        role_analyses,
    ) = process_prediction_file(file_in, "", None, deep_frames_cache, spacy_model_obj=spacy_model)
    output = []
    for sent, structs, syntax, roles in zip(
        sents, pred_structures, syntax_analyses, role_analyses
    ):
        output.append(
            {
                "sentence": sent,
                "fn_structures": [
                    dataclasses.asdict(fs) for fs in structs.values()
                ],
                "syntax": syntax,
                "roles": roles
            }
        )
    with open(file_out, "w", encoding="utf-8") as f_out:
        json.dump(output, f_out, indent=4)

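# CLI entry point; illustrative invocations (script and file names below are
# assumptions, not part of the original module):
#   python analyze_text.py make_syntax_cache femicides/rai
#   python analyze_text.py analyze_file "*" --input_file predictions.json --output_file analyses.json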
if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    ap.add_argument("command", choices=[
        "make_syntax_cache", "make_dep_label_cache", "analyze_file"
    ])
    ap.add_argument("dataset", choices=["femicides/rai", "femicides/rai_main", "femicides/rai_ALL",
                                        "femicides/olv", "crashes/thecrashes", "migration/pavia", "*"])
    ap.add_argument("--input_file", type=str, default="")
    ap.add_argument("--output_file", type=str, default="")
    args = ap.parse_args()

    if args.command == "make_syntax_cache":

        if args.dataset == "*":
            raise ValueError(
                "Please specify a dataset for `make_syntax_cache`")

        if args.dataset == "crashes/thecrashes":
            make_syntax_cache(
                "crashes/thecrashes", skip_fn=lambda f: not is_a_dutch_text(f)
            )
        elif args.dataset == "femicides/rai":
            make_syntax_cache("femicides/rai")
        elif args.dataset == "femicides/rai_main":
            make_syntax_cache("femicides/rai_main")
        elif args.dataset == "femicides/rai_ALL":
            make_syntax_cache("femicides/rai_ALL")
        elif args.dataset == "femicides/olv":
            make_syntax_cache("femicides/olv")
        else:
            make_syntax_cache("migration/pavia")

    elif args.command == "make_dep_label_cache":
        make_dep_label_cache()

    elif args.command == "analyze_file":
        analyze_external_file(args.input_file, args.output_file)