from transformers import pipeline, AutoTokenizer, ElectraForPreTraining
import pandas as pd
import numpy as np
import torch
import streamlit as st
from annotated_text import annotated_text

USE_GPU = True

if USE_GPU and torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

MODEL_NAME_CHINESE = "IDEA-CCNL/Erlangshen-DeBERTa-v2-710M-Chinese"
RTD_MODEL_NAME_CHINESE = "hfl/chinese-electra-180g-large-discriminator"

WORD_PROBABILITY_THRESHOLD = 0.05
TOP_K_WORDS = 10


@st.cache_resource
def get_model_chinese():
    return pipeline("fill-mask", MODEL_NAME_CHINESE, device=device)


@st.cache_resource
def get_rtd_tokenizer_chinese():
    return AutoTokenizer.from_pretrained(RTD_MODEL_NAME_CHINESE)


@st.cache_resource
def get_rtd_model_chinese():
    return ElectraForPreTraining.from_pretrained(RTD_MODEL_NAME_CHINESE)


@st.cache_resource
def get_wordlist_chinese():
    df = pd.read_csv('wordlist_chinese_v2.csv')
    wordlist = df[df.assess == True]
    return wordlist['Chinese'].tolist()


@st.cache_resource
def get_allowed_words():
    df = pd.read_csv('allowed_words.csv')
    return set(df['word'])


def assess_chinese(word, sentence):
    """Score how well `word` fits `sentence` via a two-step fill-mask pass."""
    print("Assessing Chinese")
    number_of_chars = len(word)
    # The two-pass masking below hardcodes two-character words.
    assert number_of_chars == 2
    allowed_words = get_allowed_words()

    if sentence.lower().find(word.lower()) == -1:
        print('Sentence does not contain the word!')
        # Return an empty result rather than a bare None so callers that
        # unpack two values do not crash; the UI guards this case anyway.
        return None, 0.0

    text = sentence.replace(word.lower(), "[MASK]" * number_of_chars)

    # First pass: predict the first masked character, then condition the
    # second pass on each first-character candidate.
    top_k_prediction = []
    candidates = mask_filler_chinese(text, top_k=TOP_K_WORDS)[0]
    for candidate in candidates:
        temp_text = text.replace("[MASK]", candidate['token_str'], 1)
        second_predictions = mask_filler_chinese(temp_text, top_k=5)
        for prediction in second_predictions:
            prediction['token_str'] = candidate['token_str'] + prediction['token_str']
            prediction['score'] = candidate['score'] * prediction['score']
        top_k_prediction.extend(second_predictions)

    top_k_prediction = sorted(top_k_prediction, key=lambda x: x['score'],
                              reverse=True)[:(TOP_K_WORDS * 5)]

    # Renormalise scores over the allowed vocabulary: probability mass
    # assigned to disallowed words is redistributed (capped at 0.5).
    norm_factor = 0
    for output in top_k_prediction:
        if output['token_str'] not in allowed_words:
            norm_factor += output['score']

    top_k_prediction_new = []
    for output in top_k_prediction:
        if output['token_str'] in allowed_words:
            output['score'] = output['score'] / (1 - min(0.5, norm_factor))
            top_k_prediction_new.append(output)

    print(f"NORM_FACTOR: {norm_factor}")

    # Get the target word's own prediction score, character by character.
    output1 = mask_filler_chinese(text, targets=word[0])[0][0]
    temp_text = text.replace("[MASK]", word[0], 1)
    output2 = mask_filler_chinese(temp_text, targets=word[1])[0]
    output2['token_str'] = output1['token_str'] + output2['token_str']
    output2['score'] = output1['score'] * output2['score']
    target_word_prediction = output2
    target_word_prediction['score'] = \
        target_word_prediction['score'] / (1 - min(0.5, norm_factor))
    score = target_word_prediction['score']

    # Append the original word if it's not already in the results.
    top_k_prediction_filtered = [output for output in top_k_prediction_new
                                 if output['token_str'] == word]
    if len(top_k_prediction_filtered) == 0:
        top_k_prediction_new.append(target_word_prediction)

    return top_k_prediction_new, score


def assess_sentence(word, sentence):
    return assess_chinese(word, sentence)


def get_annotated_sentence(sentence, errors):
    if len(errors) == 0:
        return sentence
    output = ["Input sentence: "]
    wrong_char_indices = [e[0].item() for e in errors]
    curr_ind = 0
    for i in range(len(wrong_char_indices)):
        output.append(sentence[curr_ind:wrong_char_indices[i]])
        # Highlight each flagged character with a pink background.
        output.append((sentence[wrong_char_indices[i]], "", "#F8C8DC"))
        curr_ind = wrong_char_indices[i] + 1
    output.append(sentence[curr_ind:])
    print(output)
    return output


def get_word_errors(word, sentence):
    # Replaced-token detection: a positive ELECTRA discriminator logit marks
    # a character the model considers out of place in the sentence.
    tokens = rtd_tokenizer_chinese(sentence, return_tensors='pt',
                                   return_offsets_mapping=True)
    scores = rtd_model_chinese(**rtd_tokenizer_chinese(sentence, return_tensors='pt'))[0][0]
    errors = []
    for i in range(len(scores)):
        if scores[i] > 0:
            errors.append(tokens['offset_mapping'][0][i])
    print(errors)
    return errors


def get_chinese_word():
    possible_words = get_wordlist_chinese()
    word = np.random.choice(possible_words)
    return word


def get_word():
    return get_chinese_word()


mask_filler_chinese = get_model_chinese()
rtd_tokenizer_chinese = get_rtd_tokenizer_chinese()
rtd_model_chinese = get_rtd_model_chinese()


def highlight_given_word(row):
    color = '#ACE5EE' if row.Words == target_word else 'white'
    return [f'background-color:{color}'] * len(row)


def get_top_5_results(top_k_prediction):
    predictions_df = pd.DataFrame(top_k_prediction)
    predictions_df = predictions_df.drop(columns=["token", "sequence"])
    predictions_df = predictions_df.rename(columns={"score": "Probability",
                                                    "token_str": "Words"})
    if (predictions_df[:5].Words == target_word).sum() == 0:
        # Target word not in the top 5; append its row so it is still shown.
        print("target word not in top 5")
        top_5_df = predictions_df[:5]
        target_word_df = predictions_df[predictions_df.Words == target_word]
        print(target_word_df)
        top_5_df = pd.concat([top_5_df, target_word_df])
    else:
        top_5_df = predictions_df[:5]
    # Copy before formatting to avoid pandas' SettingWithCopyWarning.
    top_5_df = top_5_df.copy()
    top_5_df['Probability'] = top_5_df['Probability'].apply(lambda x: f"{x:.2%}")
    return top_5_df


#### Streamlit Page
st.title("造句 Self-marking Demo")

if 'target_word' not in st.session_state:
    st.session_state['target_word'] = get_word()

target_word = st.session_state['target_word']
target_word_ind = get_wordlist_chinese().index(target_word)
target_word = st.selectbox("Choose a word:", get_wordlist_chinese(),
                           index=target_word_ind)

if st.button("Get random word"):
    st.session_state['target_word'] = get_word()
    st.experimental_rerun()

st.subheader("Form your sentence and input below!")
sentence = st.text_input('Enter your sentence here',
                         placeholder="Enter your sentence here!")

if st.button("Grade"):
    if sentence.find(target_word) == -1:
        st.error("Error: Sentence must include the target word!")
        # Halt here; grading a sentence without the target word would fail.
        st.stop()

    top_k_prediction, score = assess_sentence(target_word, sentence)
    with open('./result01.json', 'w') as outfile:
        outfile.write(str(top_k_prediction))

    errors = get_word_errors(target_word, sentence)
    annotated_sentence = get_annotated_sentence(sentence, errors)
    annotated_text(annotated_sentence)

    st.write(f"Probability score: {score:.1%}. "
             f"(Target: {WORD_PROBABILITY_THRESHOLD:.1%})")

    predictions_df = get_top_5_results(top_k_prediction)
    df_style = predictions_df.style.apply(highlight_given_word, axis=1)

    if score >= WORD_PROBABILITY_THRESHOLD:
        if len(errors) == 0:
            st.success("Yay good job! 🕺 Practice again with other words", icon="✅")
        else:
            st.warning("Potential word errors detected. Try again?")
    else:
        st.warning("Probability score too low. Maybe try again?")

    st.table(df_style)