transcript-analysis / handler.py
ikarasz's picture
feat/extend-math-words-list (#1)
4e928fa
from typing import Dict, List, Any
from scipy.special import softmax
import numpy as np
import weakref
import re
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')
from utils import clean_str, clean_str_nopunct
import torch
from utils import MultiHeadModel, BertInputBuilder, get_num_words, MATH_PREFIXES, MATH_WORDS
import transformers
from transformers import BertTokenizer, BertForSequenceClassification
from transformers.utils import logging
transformers.logging.set_verbosity_debug()
UPTAKE_MODEL = 'ddemszky/uptake-model'
REASONING_MODEL = 'ddemszky/student-reasoning'
QUESTION_MODEL = 'ddemszky/question-detection'
FOCUSING_QUESTION_MODEL = 'ddemszky/focusing-questions'
class Utterance:
def __init__(self, speaker, text, uid=None,
transcript=None, starttime=None, endtime=None, **kwargs):
self.speaker = speaker
self.text = text
self.uid = uid
self.starttime = starttime
self.endtime = endtime
self.transcript = weakref.ref(transcript) if transcript else None
self.props = kwargs
self.role = None
self.word_count = self.get_num_words()
self.timestamp = [starttime, endtime]
if starttime is not None and endtime is not None:
self.unit_measure = endtime - starttime
else:
self.unit_measure = None
self.aggregate_unit_measure = endtime
self.num_math_terms = None
self.math_terms = None
# moments
self.uptake = None
self.reasoning = None
self.question = None
self.focusing_question = None
def get_clean_text(self, remove_punct=False):
if remove_punct:
return clean_str_nopunct(self.text)
return clean_str(self.text)
def get_num_words(self):
return get_num_words(self.text)
def to_dict(self):
return {
'speaker': self.speaker,
'text': self.text,
'uid': self.uid,
'starttime': self.starttime,
'endtime': self.endtime,
'uptake': self.uptake,
'reasoning': self.reasoning,
'question': self.question,
'focusingQuestion': self.focusing_question,
'numMathTerms': self.num_math_terms,
'mathTerms': self.math_terms,
**self.props
}
def to_talk_timeline_dict(self):
return{
'speaker': self.speaker,
'text': self.text,
'uid': self.uid,
'role': self.role,
'timestamp': self.timestamp,
'moments': {'reasoning': True if self.reasoning else False, 'questioning': True if self.question else False, 'uptake': True if self.uptake else False, 'focusingQuestion': True if self.focusing_question else False},
'unitMeasure': self.unit_measure,
'aggregateUnitMeasure': self.aggregate_unit_measure,
'wordCount': self.word_count,
'numMathTerms': self.num_math_terms,
'mathTerms': self.math_terms,
}
def __repr__(self):
return f"Utterance(speaker='{self.speaker}'," \
f"text='{self.text}', uid={self.uid}," \
f"starttime={self.starttime}, endtime={self.endtime}, props={self.props})"
class Transcript:
def __init__(self, **kwargs):
self.utterances = []
self.params = kwargs
def add_utterance(self, utterance):
utterance.transcript = weakref.ref(self)
self.utterances.append(utterance)
def get_idx(self, idx):
if idx >= len(self.utterances):
return None
return self.utterances[idx]
def get_uid(self, uid):
for utt in self.utterances:
if utt.uid == uid:
return utt
return None
def length(self):
return len(self.utterances)
def update_utterance_roles(self, uptake_speaker):
for utt in self.utterances:
if (utt.speaker == uptake_speaker):
utt.role = 'teacher'
else:
utt.role = 'student'
def get_talk_distribution_and_length(self, uptake_speaker):
if ((uptake_speaker is None)):
return None
teacher_words = 0
teacher_utt_count = 0
student_words = 0
student_utt_count = 0
for utt in self.utterances:
if (utt.speaker == uptake_speaker):
utt.role = 'teacher'
teacher_words += utt.get_num_words()
teacher_utt_count += 1
else:
utt.role = 'student'
student_words += utt.get_num_words()
student_utt_count += 1
if teacher_words + student_words > 0:
teacher_percentage = round(
(teacher_words / (teacher_words + student_words)) * 100)
student_percentage = 100 - teacher_percentage
else:
teacher_percentage = student_percentage = 0
avg_teacher_length = teacher_words / teacher_utt_count if teacher_utt_count > 0 else 0
avg_student_length = student_words / student_utt_count if student_utt_count > 0 else 0
return {'teacher': teacher_percentage, 'student': student_percentage}, {'teacher': avg_teacher_length, 'student': avg_student_length}
def get_word_clouds(self):
teacher_dict = {}
student_dict = {}
uptake_teacher_dict = {}
stop_words = stopwords.words('english')
for utt in self.utterances:
words = (utt.get_clean_text(remove_punct=True)).split(' ')
for word in words:
if word in stop_words or word in ['inaudible', 'crosstalk']: continue
# handle uptake case
if utt.role == 'teacher':
if utt.uptake == 1:
if word not in uptake_teacher_dict:
uptake_teacher_dict[word] = 0
uptake_teacher_dict[word] += 1
# ignore math words so they don't get tagged as general
if any(math_word in word for math_word in utt.math_terms): continue
if utt.role == 'teacher':
if word not in teacher_dict:
teacher_dict[word] = 0
teacher_dict[word] += 1
else:
if word not in student_dict:
student_dict[word] = 0
student_dict[word] += 1
dict_list = []
uptake_dict_list = []
teacher_dict_list = []
student_dict_list = []
for word in uptake_teacher_dict.keys():
uptake_dict_list.append({'text': word, 'value': uptake_teacher_dict[word], 'category': 'teacher'})
for word in teacher_dict.keys():
teacher_dict_list.append(
{'text': word, 'value': teacher_dict[word], 'category': 'general'})
dict_list.append({'text': word, 'value': teacher_dict[word], 'category': 'general'})
for word in student_dict.keys():
student_dict_list.append(
{'text': word, 'value': student_dict[word], 'category': 'general'})
dict_list.append({'text': word, 'value': student_dict[word], 'category': 'general'})
sorted_dict_list = sorted(dict_list, key=lambda x: x['value'], reverse=True)
sorted_uptake_dict_list = sorted(uptake_dict_list, key=lambda x: x['value'], reverse=True)
sorted_teacher_dict_list = sorted(teacher_dict_list, key=lambda x: x['value'], reverse=True)
sorted_student_dict_list = sorted(student_dict_list, key=lambda x: x['value'], reverse=True)
return sorted_dict_list[:50], sorted_uptake_dict_list[:50], sorted_teacher_dict_list[:50], sorted_student_dict_list[:50]
def get_talk_timeline(self):
return [utterance.to_talk_timeline_dict() for utterance in self.utterances]
def calculate_aggregate_word_count(self):
unit_measures = [utt.unit_measure for utt in self.utterances]
if None in unit_measures:
aggregate_word_count = 0
for utt in self.utterances:
aggregate_word_count += utt.get_num_words()
utt.unit_measure = utt.get_num_words()
utt.aggregate_unit_measure = aggregate_word_count
def to_dict(self):
return {
'utterances': [utterance.to_dict() for utterance in self.utterances],
**self.params
}
def __repr__(self):
return f"Transcript(utterances={self.utterances}, custom_params={self.params})"
class QuestionModel:
def __init__(self, device, tokenizer, input_builder, max_length=300, path=QUESTION_MODEL):
print("Loading models...")
self.device = device
self.tokenizer = tokenizer
self.input_builder = input_builder
self.max_length = max_length
self.model = MultiHeadModel.from_pretrained(
path, head2size={"is_question": 2})
self.model.to(self.device)
def run_inference(self, transcript):
self.model.eval()
with torch.no_grad():
for i, utt in enumerate(transcript.utterances):
if "?" in utt.text:
utt.question = 1
else:
text = utt.get_clean_text(remove_punct=True)
instance = self.input_builder.build_inputs([], text,
max_length=self.max_length,
input_str=True)
output = self.get_prediction(instance)
# print(output)
utt.question = np.argmax(
output["is_question_logits"][0].tolist())
def get_prediction(self, instance):
instance["attention_mask"] = [[1] * len(instance["input_ids"])]
for key in ["input_ids", "token_type_ids", "attention_mask"]:
instance[key] = torch.tensor(
instance[key]).unsqueeze(0) # Batch size = 1
instance[key].to(self.device)
output = self.model(input_ids=instance["input_ids"],
attention_mask=instance["attention_mask"],
token_type_ids=instance["token_type_ids"],
return_pooler_output=False)
return output
class ReasoningModel:
def __init__(self, device, tokenizer, input_builder, max_length=128, path=REASONING_MODEL):
print("Loading models...")
self.device = device
self.tokenizer = tokenizer
self.input_builder = input_builder
self.max_length = max_length
self.model = BertForSequenceClassification.from_pretrained(path)
self.model.to(self.device)
def run_inference(self, transcript, min_num_words=8, uptake_speaker=None):
self.model.eval()
with torch.no_grad():
for i, utt in enumerate(transcript.utterances):
if utt.get_num_words() >= min_num_words and utt.speaker != uptake_speaker:
instance = self.input_builder.build_inputs([], utt.text,
max_length=self.max_length,
input_str=True)
output = self.get_prediction(instance)
utt.reasoning = np.argmax(output["logits"][0].tolist())
def get_prediction(self, instance):
instance["attention_mask"] = [[1] * len(instance["input_ids"])]
for key in ["input_ids", "token_type_ids", "attention_mask"]:
instance[key] = torch.tensor(
instance[key]).unsqueeze(0) # Batch size = 1
instance[key].to(self.device)
output = self.model(input_ids=instance["input_ids"],
attention_mask=instance["attention_mask"],
token_type_ids=instance["token_type_ids"])
return output
class UptakeModel:
def __init__(self, device, tokenizer, input_builder, max_length=120, path=UPTAKE_MODEL):
print("Loading models...")
self.device = device
self.tokenizer = tokenizer
self.input_builder = input_builder
self.max_length = max_length
self.model = MultiHeadModel.from_pretrained(path, head2size={"nsp": 2})
self.model.to(self.device)
def run_inference(self, transcript, min_prev_words, uptake_speaker=None):
self.model.eval()
prev_num_words = 0
prev_utt = None
with torch.no_grad():
for i, utt in enumerate(transcript.utterances):
if ((uptake_speaker is None) or (utt.speaker == uptake_speaker)) and (prev_num_words >= min_prev_words):
textA = prev_utt.get_clean_text(remove_punct=False)
textB = utt.get_clean_text(remove_punct=False)
instance = self.input_builder.build_inputs([textA], textB,
max_length=self.max_length,
input_str=True)
output = self.get_prediction(instance)
utt.uptake = int(
softmax(output["nsp_logits"][0].tolist())[1] > .8)
prev_num_words = utt.get_num_words()
prev_utt = utt
def get_prediction(self, instance):
instance["attention_mask"] = [[1] * len(instance["input_ids"])]
for key in ["input_ids", "token_type_ids", "attention_mask"]:
instance[key] = torch.tensor(
instance[key]).unsqueeze(0) # Batch size = 1
instance[key].to(self.device)
output = self.model(input_ids=instance["input_ids"],
attention_mask=instance["attention_mask"],
token_type_ids=instance["token_type_ids"],
return_pooler_output=False)
return output
class FocusingQuestionModel:
def __init__(self, device, tokenizer, input_builder, max_length=128, path=FOCUSING_QUESTION_MODEL):
print("Loading models...")
self.device = device
self.tokenizer = tokenizer
self.input_builder = input_builder
self.model = BertForSequenceClassification.from_pretrained(path)
self.model.to(self.device)
self.max_length = max_length
def run_inference(self, transcript, min_focusing_words=0, uptake_speaker=None):
self.model.eval()
with torch.no_grad():
for i, utt in enumerate(transcript.utterances):
if utt.speaker != uptake_speaker or uptake_speaker is None:
utt.focusing_question = None
continue
if utt.get_num_words() < min_focusing_words:
utt.focusing_question = None
continue
instance = self.input_builder.build_inputs([], utt.text, max_length=self.max_length, input_str=True)
output = self.get_prediction(instance)
utt.focusing_question = np.argmax(output["logits"][0].tolist())
def get_prediction(self, instance):
instance["attention_mask"] = [[1] * len(instance["input_ids"])]
for key in ["input_ids", "token_type_ids", "attention_mask"]:
instance[key] = torch.tensor(
instance[key]).unsqueeze(0) # Batch size = 1
instance[key].to(self.device)
output = self.model(input_ids=instance["input_ids"],
attention_mask=instance["attention_mask"],
token_type_ids=instance["token_type_ids"])
return output
def load_math_terms():
math_terms = []
math_terms_dict = {}
for term in MATH_WORDS:
if term in MATH_PREFIXES:
math_terms_dict[f"(^|[^a-zA-Z]){term}(s|es|d|ed)?([^a-zA-Z]|$)"] = term
math_terms.append(f"(^|[^a-zA-Z]){term}(s|es|d|ed)?([^a-zA-Z]|$)")
else:
math_terms.append(term)
math_terms_dict[term] = term
return math_terms, math_terms_dict
def run_math_density(transcript):
math_terms, math_terms_dict = load_math_terms()
sorted_terms = sorted(math_terms, key=len, reverse=True)
teacher_math_word_cloud = {}
student_math_word_cloud = {}
for i, utt in enumerate(transcript.utterances):
text = utt.get_clean_text(remove_punct=True)
num_matches = 0
matched_positions = set()
match_list = []
for term in sorted_terms:
matches = list(re.finditer(term, text, re.IGNORECASE))
# Filter out matches that share positions with longer terms
matches = [match for match in matches if not any(match.start() in range(existing[0], existing[1]) for existing in matched_positions)]
# matched_text = [match.group(0) for match in matches]
if len(matches) > 0:
if utt.role == "teacher":
if math_terms_dict[term] not in teacher_math_word_cloud:
teacher_math_word_cloud[math_terms_dict[term]] = 0
teacher_math_word_cloud[math_terms_dict[term]] += len(matches)
else:
if math_terms_dict[term] not in student_math_word_cloud:
student_math_word_cloud[math_terms_dict[term]] = 0
student_math_word_cloud[math_terms_dict[term]] += len(matches)
match_list.append(math_terms_dict[term])
# Update matched positions
matched_positions.update((match.start(), match.end()) for match in matches)
num_matches += len(matches)
# print("match group list: ", [match.group(0) for match in matches])
utt.num_math_terms = num_matches
utt.math_terms = match_list
# utt.math_match_positions = list(matched_positions)
# utt.math_terms_raw = [text[start:end] for start, end in matched_positions]
teacher_dict_list = []
student_dict_list = []
dict_list = []
for word in teacher_math_word_cloud.keys():
teacher_dict_list.append(
{'text': word, 'value': teacher_math_word_cloud[word], 'category': "math"})
dict_list.append({'text': word, 'value': teacher_math_word_cloud[word], 'category': "math"})
for word in student_math_word_cloud.keys():
student_dict_list.append(
{'text': word, 'value': student_math_word_cloud[word], 'category': "math"})
dict_list.append({'text': word, 'value': student_math_word_cloud[word], 'category': "math"})
sorted_dict_list = sorted(dict_list, key=lambda x: x['value'], reverse=True)
sorted_teacher_dict_list = sorted(teacher_dict_list, key=lambda x: x['value'], reverse=True)
sorted_student_dict_list = sorted(student_dict_list, key=lambda x: x['value'], reverse=True)
# return sorted_dict_list[:50]
return sorted_dict_list[:50], sorted_teacher_dict_list[:50], sorted_student_dict_list[:50]
class EndpointHandler():
def __init__(self, path="."):
print("Loading models...")
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
self.input_builder = BertInputBuilder(tokenizer=self.tokenizer)
def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
data args:
inputs (:obj: `list`):
List of dicts, where each dict represents an utterance; each utterance object must have a `speaker`,
`text` and `uid`and can include list of custom properties
parameters (:obj: `dict`)
Return:
A :obj:`list` | `dict`: will be serialized and returned
"""
# get inputs
utterances = data.pop("inputs", data)
params = data.pop("parameters", None)
transcript = Transcript(filename=params.pop("filename", None))
for utt in utterances:
transcript.add_utterance(Utterance(**utt))
print("Running inference on %d examples..." % transcript.length())
logging.set_verbosity_info()
# Uptake
uptake_model = UptakeModel(
self.device, self.tokenizer, self.input_builder)
uptake_speaker = params.pop("uptake_speaker", None)
uptake_model.run_inference(transcript, min_prev_words=params['uptake_min_num_words'],
uptake_speaker=uptake_speaker)
del uptake_model
# Reasoning
reasoning_model = ReasoningModel(
self.device, self.tokenizer, self.input_builder)
reasoning_model.run_inference(transcript, uptake_speaker=uptake_speaker)
del reasoning_model
# Question
question_model = QuestionModel(
self.device, self.tokenizer, self.input_builder)
question_model.run_inference(transcript)
del question_model
# Focusing Question
focusing_question_model = FocusingQuestionModel(
self.device, self.tokenizer, self.input_builder)
focusing_question_model.run_inference(transcript, uptake_speaker=uptake_speaker)
del focusing_question_model
transcript.update_utterance_roles(uptake_speaker)
sorted_math_cloud, teacher_math_cloud, student_math_cloud = run_math_density(transcript)
transcript.calculate_aggregate_word_count()
return_dict = {'talkDistribution': None, 'talkLength': None, 'talkMoments': None, 'studentTopWords': None, 'teacherTopWords': None}
talk_dist, talk_len = transcript.get_talk_distribution_and_length(uptake_speaker)
return_dict['talkDistribution'] = talk_dist
return_dict['talkLength'] = talk_len
talk_moments = transcript.get_talk_timeline()
return_dict['talkMoments'] = talk_moments
word_cloud, uptake_word_cloud, teacher_general_cloud, student_general_cloud = transcript.get_word_clouds()
teacher_cloud = teacher_math_cloud + teacher_general_cloud
student_cloud = student_math_cloud + student_general_cloud
return_dict['teacherTopWords'] = teacher_cloud
return_dict['studentTopWords'] = student_cloud
return return_dict