Spaces:
Sleeping
Sleeping
import os | |
import time | |
from datetime import datetime | |
import logging | |
from pathlib import Path | |
import requests | |
import json | |
import numpy as np | |
import pandas as pd | |
import spacy | |
from sentence_transformers import CrossEncoder | |
import litellm | |
# from litellm import completion | |
from tqdm import tqdm | |
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, AutoConfig, pipeline | |
# from accelerate import PartialState | |
# from accelerate.inference import prepare_pippy | |
import torch | |
# import cohere | |
# from openai import OpenAI | |
# # import google | |
import google.generativeai as genai | |
import src.backend.util as util | |
import src.envs as envs | |
# | |
# # import pandas as pd | |
# import scipy | |
from scipy.spatial.distance import jensenshannon | |
import numpy as np | |
import spacy_transformers | |
import subprocess | |
# Run the command to download the spaCy model | |
# subprocess.run(["python", "-m", "spacy", "download", "en_core_web_trf"], check=True) | |
subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"], check=True) | |
# subprocess.run(["pip", "install", "spacy-transformers"], check=True) | |
subprocess.run(["pip", "install", "curated-transformers==0.1.1"], check=True) | |
# Load spacy model for word tokenization | |
# nlp = spacy.load("en_core_web_sm") | |
try: | |
nlp1 = spacy.load("en_core_web_sm") | |
except OSError: | |
print("无法加载模型,继续执行其他处理。") | |
# litellm.set_verbose=False | |
litellm.set_verbose=True | |
# Set up basic configuration for logging | |
logging.basicConfig(level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s') | |
# os.environ["HUGGINGFACE_API_KEY"] = envs.TOKEN | |
def load_evaluation_model(model_path): | |
"""Load the evaluation model from the given path | |
Args: | |
model_path (str): Path to the evaluation model | |
Returns: | |
CrossEncoder: The evaluation model | |
""" | |
# model = CrossEncoder(model_path) | |
model = "" | |
return model | |
class ModelLoadingException(Exception): | |
"""Exception raised for errors in loading a model. | |
Attributes: | |
model_id (str): The model identifier. | |
revision (str): The model revision. | |
""" | |
def __init__(self, model_id, revision, messages="Error initializing model"): | |
self.model_id = model_id | |
self.revision = revision | |
super().__init__(f"{messages} id={model_id} revision={revision}") | |
class SummaryGenerator: | |
"""A class to generate summaries using a causal language model. | |
Attributes: | |
model (str): huggingface/{model_id} | |
api_base (str): https://api-inference.huggingface.co/models/{model_id} | |
summaries_df (DataFrame): DataFrame to store generated summaries. | |
revision (str): Model revision. | |
avg_length (float): Average length of summaries. | |
answer_rate (float): Rate of non-empty summaries. | |
""" | |
def __init__(self, model_id, revision): | |
""" | |
Initializes the SummaryGenerator with a model. | |
Args: | |
model_id (str): Identifier for the model. | |
revision (str): Revision of the model. | |
""" | |
self.model_id = model_id | |
self.model = f"huggingface/{model_id}" | |
self.api_base = f"https://api-inference.huggingface.co/models/{model_id}" | |
self.summaries_df = pd.DataFrame() | |
self.revision = revision | |
self.avg_length = None | |
self.answer_rate = None | |
self.exceptions = None | |
self.local_model = None | |
def generate_summaries(self, dataset, df_prompt, save_path=None): | |
"""Generate summaries for a given DataFrame of source docs. | |
修改这里拉取模型生成结果 | |
Args: | |
df (DataFrame): DataFrame containing source docs. | |
Returns: | |
summaries_df (DataFrame): Generated summaries by the model. | |
""" | |
exceptions = [] | |
if (save_path is not None) and os.path.exists(save_path): | |
'''已存在文件,可以读取已经存在的测试文本''' | |
self.summaries_df = pd.read_csv(save_path) | |
# print(self.summaries_df['Experiment']) | |
print(f'Loaded generated summaries from {save_path}') | |
else: | |
'''测试文件不存在,则需要调用指定的模型来进行测试''' | |
# prompt = {} | |
# for index, row in tqdm(df_prompt.iterrows(), total=df_prompt.shape[0]): | |
# prompt['E' + row['Item']] = row['Prompt'] | |
xls = pd.ExcelFile(dataset) | |
sheet_names = xls.sheet_names | |
# sheet_names = df.sheetnames | |
print(f"Total: {len(sheet_names)}") | |
print(sheet_names) | |
Experiment_ID, Questions_ID, Item_ID, Condition, User_prompt, Response, Factor_2, Stimuli_1 = [], [], [], [], [] ,[], [], [] | |
for i, sheet_name in enumerate(sheet_names, start=1): | |
# 读取每个工作表 | |
# if i > 2 and i ==1: | |
# continue | |
print(i, sheet_name) | |
df_sheet = pd.read_excel(xls, sheet_name=sheet_name) | |
# 假设第一列是'Prompt0',但这里我们使用列名来避免硬编码 | |
if 'Prompt0' in df_sheet.columns: | |
prompt_column = df_sheet['Prompt0'] | |
else: | |
# 如果'Prompt0'列不存在,则跳过该工作表或进行其他处理 | |
continue | |
if i == 3 : | |
word1_list = df_sheet['Stimuli-2'] | |
word2_list = df_sheet['Stimuli-3'] | |
V2_column = [] | |
for jj in range(len(word1_list)): | |
V2_column.append(word1_list[jj] + '_' + word2_list[jj]) | |
# print(V2_column) | |
elif i == 9: | |
V2_column = df_sheet['V2'] #SL, LS | |
elif i == 4 or i == 6 : | |
V2_column = df_sheet['Stimuli-2'] #Stimuli-2 | |
else: | |
V2_column = [""] * len(prompt_column) | |
q_column = df_sheet["ID"] | |
Item_column = df_sheet["Item"] | |
Condition_column = df_sheet["Condition"] | |
Stimuli_1_column = df_sheet["Stimuli-1"] | |
if 'Stimuli-2' in df_sheet.columns: | |
Stimuli_2_column = df_sheet["Stimuli-2"] | |
# 遍历Prompt0列的值 | |
for j, prompt_value in enumerate(tqdm(prompt_column, desc=f"Processing {sheet_name}"), start=0): | |
ID = 'E' + str(i) | |
# q_ID = ID + '_' + str(j) | |
# print(ID, q_ID, prompt_value) | |
system_prompt = envs.SYSTEM_PROMPT | |
_user_prompt = prompt_value | |
for ii in range(2): | |
# user_prompt = f"{envs.USER_PROMPT}\nPassage:\n{_source}" | |
while True: | |
try: | |
'''调用''' | |
print(ID,'-',ii) | |
_response = self.generate_summary(system_prompt, _user_prompt) | |
# print(f"Finish index {index}") | |
break | |
except Exception as e: | |
if 'Rate limit reached' in str(e): | |
wait_time = 3660 | |
current_time = datetime.now().strftime('%H:%M:%S') | |
print(f"Rate limit hit at {current_time}. Waiting for 1 hour before retrying...") | |
time.sleep(wait_time) | |
elif 'is currently loading' in str(e): | |
wait_time = 200 | |
print(f"Model is loading, wait for {wait_time}") | |
time.sleep(wait_time) | |
elif '429 Resource has been exhausted' in str(e): # for gemini models | |
wait_time = 60 | |
print(f"Quota has reached, wait for {wait_time}") | |
time.sleep(wait_time) | |
else: | |
print(f"Error at index {i}: {e}") | |
_response = "" | |
exceptions.append(i) | |
break | |
if i == 5: | |
print(_response) | |
def extract_responses(text, trigger_words=None): | |
if trigger_words is None: | |
# 如果没有提供特定的触发词列表,则使用默认值 | |
trigger_words = ["sure", "okay", "yes"] | |
try: | |
sentences = text.split('\n') | |
sentences = [sentence.strip() for sentence in sentences if sentence.strip()] | |
sentences = [sentence.split(':', 1)[-1].strip() if ':' in sentence else sentence for | |
sentence in sentences] | |
if any(sentences[0].lower().startswith(word) for word in trigger_words): | |
_response1 = sentences[1].strip() if len(sentences) > 1 else None | |
_response2 = sentences[2].strip() if len(sentences) > 2 else None | |
else: | |
_response1 = sentences[0].strip() if len(sentences) > 0 else None | |
_response2 = sentences[1].strip() if len(sentences) > 1 else None | |
except Exception as e: | |
print(f"Error occurred: {e}") | |
_response1, _response2 = None, None | |
return _response1, _response2 | |
_response1, _response2 = extract_responses(_response) | |
# if _response == None: | |
# _response1, _response2 = "", "" | |
# else: | |
# try: | |
# import re | |
# _response1,_response2 = re.split(r'\n\s*\n', _response.strip()) | |
# except: | |
# _response1 = _response.split('\n\n') | |
# if len(_response) == 2: | |
# _response1, _response2 = _response[0], _response[1] | |
# else: | |
# _response1, _response2 = _response[0], "" | |
Experiment_ID.append(ID) | |
Questions_ID.append(q_column[j]) | |
User_prompt.append(_user_prompt) | |
Response.append(_response2) | |
Factor_2.append(V2_column[j]) | |
Stimuli_1.append(Stimuli_2_column[j]) | |
Item_ID.append(Item_column[j]) | |
Condition.append(Condition_column[j]) | |
# the first sentence in the response is saved as E51 | |
Experiment_ID.append(ID + '1') | |
Questions_ID.append(str(q_column[j]) + '1') | |
User_prompt.append(_user_prompt) | |
Response.append(_response1) | |
Factor_2.append(V2_column[j]) | |
Stimuli_1.append(Stimuli_1_column[j]) | |
Item_ID.append(Item_column[j]) | |
Condition.append(Condition_column[j]) | |
else: | |
Experiment_ID.append(ID) | |
Questions_ID.append(q_column[j]) | |
User_prompt.append(_user_prompt) | |
Response.append(_response) | |
if i == 6: | |
Factor_2.append(Condition_column[j]) | |
Stimuli_1.append(V2_column[j]) | |
else: | |
Factor_2.append(V2_column[j]) | |
Stimuli_1.append(Stimuli_1_column[j]) | |
Item_ID.append(Item_column[j]) | |
Condition.append(Condition_column[j]) | |
print(_response) | |
# exit() | |
# Sleep to prevent hitting rate limits too frequently | |
time.sleep(1) | |
self.summaries_df = pd.DataFrame(list(zip(Experiment_ID, Questions_ID, Item_ID, Condition, User_prompt, Response, Factor_2, Stimuli_1)), | |
columns=["Experiment", "Question_ID", "Item", "Condition", "User_prompt", "Response","Factor 2","Stimuli 1"]) | |
if save_path is not None: | |
print(f'Save summaries to {save_path}') | |
fpath = Path(save_path) | |
fpath.parent.mkdir(parents=True, exist_ok=True) | |
self.summaries_df.to_csv(fpath) | |
self.exceptions = exceptions | |
# self._compute_avg_length() | |
# self._compute_answer_rate() | |
return self.summaries_df | |
def generate_summary(self, system_prompt: str, user_prompt: str): | |
# Using Together AI API | |
using_together_api = False | |
together_ai_api_models = ['mixtral', 'dbrx', 'wizardlm'] | |
for together_ai_api_model in together_ai_api_models: | |
if together_ai_api_model in self.model_id.lower(): | |
using_together_api = True | |
break | |
# print('适用哪一种LLM',together_ai_api_model , using_together_api) | |
# print(self.model_id.lower()) #meta-llama/llama-2-7b-chat-hf | |
# print('local',self.local_model) $None | |
# exit() | |
# if 'mixtral' in self.model_id.lower() or 'dbrx' in self.model_id.lower() or 'wizardlm' in self.model_id.lower(): # For mixtral and dbrx models, use Together AI API | |
if using_together_api: | |
# suffix = "completions" if ('mixtral' in self.model_id.lower() or 'base' in self.model_id.lower()) else "chat/completions" | |
suffix = "chat/completions" | |
url = f"https://api.together.xyz/v1/{suffix}" | |
payload = { | |
"model": self.model_id, | |
# "max_tokens": 4096, | |
'max_new_tokens': 50, | |
# "temperature": 0.0, | |
# 'repetition_penalty': 1.1 if 'mixtral' in self.model_id.lower() else 1 | |
} | |
# if 'mixtral' in self.model_id.lower(): | |
# # payload['prompt'] = user_prompt | |
# # payload['prompt'] = "Write a summary of the following passage:\nPassage:\n" + user_prompt.split('Passage:\n')[-1] + '\n\nSummary:' | |
# payload['prompt'] = 'You must stick to the passage provided. Provide a concise summary of the following passage, covering the core pieces of information described:\nPassage:\n' + user_prompt.split('Passage:\n')[-1] + '\n\nSummary:' | |
# print(payload) | |
# else: | |
# payload['messages'] = [{"role": "system", "content": system_prompt}, | |
# {"role": "user", "content": user_prompt}] | |
payload['messages'] = [{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": user_prompt}] | |
headers = { | |
"accept": "application/json", | |
"content-type": "application/json", | |
"Authorization": f"Bearer {os.environ['TOGETHER_API_KEY']}" | |
} | |
response = requests.post(url, json=payload, headers=headers) | |
try: | |
result = json.loads(response.text) | |
# print(result) | |
result = result["choices"][0] | |
if 'message' in result: | |
result = result["message"]["content"].strip() | |
else: | |
result = result["text"] | |
result_candidates = [result_cancdidate for result_cancdidate in result.split('\n\n') if len(result_cancdidate) > 0] | |
result = result_candidates[0] | |
print(result) | |
except: | |
print(response) | |
result = '' | |
print(result) | |
return result | |
# Using OpenAI API | |
elif 'gpt' in self.model_id.lower(): | |
response = litellm.completion( | |
model=self.model_id.replace('openai/',''), | |
messages=[{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": user_prompt}], | |
# temperature=0.0, | |
max_tokens=50, | |
) | |
result = response['choices'][0]['message']['content'] | |
# print() | |
print(result) | |
return result | |
# Using Google AI API for Gemini models | |
elif 'gemini' in self.model_id.lower(): | |
genai.configure(api_key=os.getenv('GOOGLE_AI_API_KEY')) | |
generation_config = { | |
"temperature": 0, | |
"top_p": 0.95, # cannot change | |
"top_k": 0, | |
"max_output_tokens": 50, | |
# "response_mime_type": "application/json", | |
} | |
safety_settings = [ | |
{ | |
"category": "HARM_CATEGORY_HARASSMENT", | |
"threshold": "BLOCK_NONE" | |
}, | |
{ | |
"category": "HARM_CATEGORY_HATE_SPEECH", | |
"threshold": "BLOCK_NONE" | |
}, | |
{ | |
"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", | |
"threshold": "BLOCK_NONE" | |
}, | |
{ | |
"category": "HARM_CATEGORY_DANGEROUS_CONTENT", | |
"threshold": "BLOCK_NONE" | |
}, | |
] | |
model = genai.GenerativeModel(model_name="gemini-1.5-pro-latest" if "gemini-1.5-pro" in self.model_id.lower() else self.model_id.lower().split('google/')[-1], | |
generation_config=generation_config, | |
system_instruction=system_prompt, | |
safety_settings=safety_settings) | |
convo = model.start_chat(history=[]) | |
convo.send_message(user_prompt) | |
# print(convo.last) | |
result = convo.last.text | |
print(result) | |
return result | |
# Using HF API or download checkpoints | |
elif self.local_model is None: | |
# print(self.model_id) | |
# print(self.api_base) | |
# mistralai/Mistral-7B-Instruct-v0.1 | |
# https://api-inference.huggingface.co/models/mistralai/Mistral-7B-Instruct-v0.1 | |
try: # try use HuggingFace API | |
# response = litellm.completion( | |
# model="huggingface/"+'command-r-plus' if 'command' in self.model_id else self.model_id, | |
# messages=[{"role": "system", "content": system_prompt}, | |
# {"role": "user", "content": user_prompt}], | |
# temperature=0.0, | |
# max_tokens=1024, | |
# api_base= "https://api-inference.huggingface.co/models/" + self.model_id, | |
# ) | |
# self.model_id = 'command-r-plus' if 'command' in self.model_id else self.model_id | |
# response = litellm.completion( | |
# model="huggingface/" + self.model_id, | |
# # mistralai/Mistral-7B-Instruct-v0.1", | |
# messages=[{"role": "system", "content": system_prompt}, | |
# {"role": "user", "content": user_prompt}], | |
# #temperature=0.0, | |
# max_tokens=1024, | |
# api_base="https://api-inference.huggingface.co/models/" + self.model_id) | |
# print("模型返回结果",response) | |
# print("模型返回结果结束") | |
# # exit() | |
# result = response['choices'][0]['message']['content'] | |
# print(result) | |
from huggingface_hub import InferenceClient | |
print("token_for_request:",envs.TOKEN) | |
client = InferenceClient(self.model_id,api_key=envs.TOKEN,headers={"X-use-cache": "false"}) | |
messages = [{"role": "system", "content": system_prompt},{"role": "user", "content": user_prompt}] | |
# outputs = client.chat_completion(messages, max_tokens=50) | |
result = None | |
while result is None: | |
outputs = client.chat_completion(messages, max_tokens=50) | |
result = outputs['choices'][0]['message']['content'] | |
if result is None: | |
time.sleep(1) # Optional: Add a small delay before retrying | |
return result | |
# exit() | |
except: # fail to call api. run it locally. | |
self.tokenizer = AutoTokenizer.from_pretrained(self.model_id, trust_remote_code=True) | |
print("Tokenizer loaded") | |
self.local_model = AutoModelForCausalLM.from_pretrained(self.model_id, trust_remote_code=True, device_map="auto", torch_dtype="auto", cache_dir='/home/paperspace/cache') | |
print("Local model loaded") | |
# exit() | |
# Using local model | |
if self.local_model: # cannot call API. using local model | |
messages=[ | |
{"role": "system", "content": system_prompt}, # gemma-1.1 does not accept system role | |
{"role": "user", "content": user_prompt} | |
] | |
try: # some models support pipeline | |
pipe = pipeline( | |
"text-generation", | |
model=self.local_model, | |
tokenizer=self.tokenizer, | |
) | |
generation_args = { | |
"max_new_tokens": 50, | |
"return_full_text": False, | |
#"temperature": 0.0, | |
"do_sample": False, | |
} | |
output = pipe(messages, **generation_args) | |
result = output[0]['generated_text'] | |
print(result) | |
except: | |
prompt = self.tokenizer.apply_chat_template(messages,add_generation_prompt=True, tokenize=False) | |
print(prompt) | |
input_ids = self.tokenizer(prompt, return_tensors="pt").to('cuda') | |
with torch.no_grad(): | |
outputs = self.local_model.generate(**input_ids, max_new_tokens=50, do_sample=True, pad_token_id=self.tokenizer.eos_token_id) | |
result = self.tokenizer.decode(outputs[0], skip_special_tokens=True) | |
result = result.replace(prompt[0], '') | |
print(result) | |
return result | |
def _compute_avg_length(self): | |
""" | |
Compute the average length of non-empty summaries using SpaCy. | |
""" | |
total_word_count = 0 | |
total_count = 0 | |
for summary in self.summaries_df['summary']: | |
if util.is_summary_valid(summary): | |
doc = nlp1(summary) | |
words = [token.text for token in doc if token.is_alpha] | |
total_word_count += len(words) | |
total_count += 1 | |
self.avg_length = 0 if total_count == 0 else total_word_count / total_count | |
def _compute_answer_rate(self): | |
""" | |
Compute the rate of non-empty summaries. | |
""" | |
valid_count = sum(1 for summary in self.summaries_df['summary'] | |
if util.is_summary_valid(summary)) | |
total_count = len(self.summaries_df) | |
self.answer_rate = 0 if total_count == 0 else valid_count / total_count | |
class EvaluationModel: | |
"""A class to evaluate generated summaries. | |
Attributes: | |
model (CrossEncoder): The evaluation model. | |
scores (list): List of evaluation scores. | |
accuracy (float): Accuracy of the summaries. | |
hallucination_rate (float): Rate of hallucination in summaries. | |
""" | |
def __init__(self, model_path): | |
""" | |
Initializes the EvaluationModel with a CrossEncoder model. | |
Args: | |
model_path (str): Path to the CrossEncoder model. | |
""" | |
self.model = load_evaluation_model(model_path) | |
self.scores = [] | |
self.factual_consistency_rate = None | |
self.hallucination_rate = None | |
self.humanlike_score = None | |
def code_results(self, summaries_df): | |
'''code results from LLM's response''' | |
output = [] | |
'''database for Exp4''' | |
item4 = pd.read_csv(envs.ITEM_4_DATA) | |
wordpair2code = {} | |
for j in range(len(item4['Coding'])): | |
wordpair2code[item4['Pair'][j]] = item4['Coding'][j] | |
'''verb for Exp5''' | |
item5 = pd.read_csv(envs.ITEM_5_DATA) | |
# item corresponding to verb, same item id corresponding to verb pair | |
item2verb2 = {} | |
item2verb1 = {} | |
Stimuli1, Stimuli2 = {}, {} | |
for j in range(len(item5['Item'])): | |
item2verb1[item5['Item'][j]] = item5['Verb1'][j] | |
item2verb2[item5['Item'][j]] = item5['Verb2'][j] | |
Stimuli1[item5['ID'][j]] = item5['Stimuli-1'][j] | |
Stimuli2[item5['ID'][j]] = item5['Stimuli-2'][j] | |
male_keyword = ["he", "his", "himself"] | |
female_keyword = ["she", "her", "herself"] | |
print(len(summaries_df["Experiment"])) | |
for i in range(len(summaries_df["Experiment"])): | |
# vote_1_1, vote_1_2, vote_1_3 = 0, 0, 0 | |
# print() | |
if pd.isna(summaries_df["Response"][i]): | |
output.append("Other") | |
continue | |
rs = summaries_df["Response"][i].strip().lower() | |
sentences = rs.split('\n') | |
sentences = [sentence.split(':', 1)[-1].strip() if ':' in sentence else sentence | |
for sentence in sentences] | |
rs = [sentence.strip() for sentence in sentences if sentence.strip()] | |
'''Exp1''' | |
if summaries_df["Experiment"][i] == "E1": | |
print("E1", rs) | |
rs = rs.replace('"','') | |
if rs == "round": | |
# vote_1_1 += 1 | |
output.append("Round") | |
elif rs == "spiky": | |
output.append("Spiky") | |
else: | |
output.append("Other") | |
'''Exp2''' | |
elif summaries_df["Experiment"][i] == "E2": | |
# rs = summaries_df["Response"][i].strip() | |
rs = rs.split(' ') | |
print("E2", rs) | |
male, female = 0, 0 | |
for word in rs: | |
if word in female_keyword and male == 0: | |
female = 1 | |
output.append("Female") | |
break | |
if word in male_keyword and female == 0: | |
male = 1 | |
output.append("Male") | |
break | |
if male == 0 and female == 0 : | |
output.append("Other") | |
'''Exp3''' | |
elif summaries_df["Experiment"][i] == "E3": | |
# rs = summaries_df["Response"][i].strip() | |
print("E3", rs) | |
if pd.isna(summaries_df["Factor 2"][i]): | |
output.append("Other") | |
else: | |
if summaries_df["Factor 2"][i].strip() == "LS": | |
if "2" in rs: | |
output.append("Long") | |
elif "3" in rs: | |
output.append("Short") | |
else: | |
output.append("Other") | |
if summaries_df["Factor 2"][i].strip() == "SL": | |
if "2" in rs: | |
output.append("Short") | |
elif "3" in rs: | |
output.append("Long") | |
else: | |
output.append("Other") | |
'''Exp4''' | |
elif summaries_df["Experiment"][i] == "E4": | |
# rs = summaries_df["Response"][i].strip() | |
target = summaries_df["Factor 2"][i].strip().lower() | |
pair = target + "_" + rs | |
print("E4:", pair) | |
if pair in wordpair2code.keys(): | |
output.append(wordpair2code[pair]) | |
else: | |
output.append("Other") | |
'''Exp5''' | |
elif summaries_df["Experiment"][i] == "E5" or summaries_df["Experiment"][i] == "E51": | |
# sentence = summaries_df["Response"][i].strip() | |
item_id = summaries_df["Item"][i] | |
question_id = summaries_df["Question_ID"][i] | |
sti1, sti2 = "", "" | |
if summaries_df["Experiment"][i] == "E51": | |
sti1 = Stimuli1[question_id[0:-1]].lower().replace("...", "") | |
sti2 = Stimuli2[question_id[0:-1]].lower().replace("...", "") | |
verb = item2verb1[item_id].lower() | |
sentence = sti1 + " " + rs.replace(sti1, "") | |
print("E5", verb, sentence) | |
if summaries_df["Experiment"][i] == "E5": | |
sti1 = Stimuli1[question_id].lower().replace("...", "") | |
# print(sti1) | |
sti2 = Stimuli2[question_id].lower().replace("...", "") | |
verb = item2verb2[item_id].lower() | |
sentence = sti2.replace("...","") + " " + rs.replace(sti2, "") | |
print("E5", verb, sentence) | |
doc = nlp1(sentence.replace(" "," ")) | |
# print(doc) | |
# print() | |
verb_token = None | |
for token in doc: | |
# print(token.lemma_) | |
if token.lemma_ == verb: | |
verb_token = token | |
break | |
# exit() | |
if verb_token is None: | |
output.append("Other") | |
print("E5 The target verb is missing from the sentence.") | |
else: | |
pobj, dative = None, None | |
# print(verb_token.children) | |
# exit() | |
for child in verb_token.children: | |
print(child) | |
if (child.dep_ == 'dative' and child.pos_ == "ADP") or (child.text == "to" and child.dep_ == 'prep' and child.pos_ == "ADP"): | |
pobj = child.text | |
if child.dep_ == 'dative': | |
dative = child.text | |
print("E5", pobj, dative) | |
# exit() | |
if pobj: | |
output.append("PO") | |
elif dative: | |
output.append("DO") | |
else: | |
print("Other", sentence, pobj, dative) | |
# exit() | |
output.append("Other") | |
'''Exp6''' | |
elif summaries_df["Experiment"][i] == "E6": | |
sentence = summaries_df["Stimuli 1"][i].strip().lower() | |
print("E6", sentence) | |
doc = nlp1(sentence) | |
subject = "None" | |
obj = "None" | |
# 遍历依存关系,寻找主语和宾语 | |
for token in doc: | |
if token.dep_ == "nsubj": | |
subject = token.text | |
elif token.dep_ == "dobj": | |
obj = token.text | |
print("E6", subject, obj) | |
if subject in rs and obj in rs: | |
print(rs, subject, obj, "Other") | |
output.append("Other") | |
elif subject in rs: | |
print(rs, subject, obj, "VP") | |
output.append("VP") | |
elif obj in rs: | |
print(rs, subject, obj, "NP") | |
output.append("NP") | |
else: | |
print(rs, subject, obj, "Other") | |
output.append("Other") | |
'''Exp7''' | |
elif summaries_df["Experiment"][i] == "E7": | |
# rs = summaries_df["Response"][i].strip().lower() | |
print("E7",rs) | |
if rs == "no": | |
output.append("0") | |
elif rs == "yes": | |
output.append("1") | |
else: | |
output.append("Other") | |
'''Exp8''' | |
elif summaries_df["Experiment"][i] == "E8": | |
# rs = summaries_df["Response"][i].strip() | |
if "something is wrong with the question" in rs: | |
output.append("1") | |
else: | |
output.append("0") | |
'''Exp9''' | |
elif summaries_df["Experiment"][i] == "E9": | |
male, female = 0, 0 | |
# rs = summaries_df["Response"][i].strip() | |
if "because" in rs: | |
rs = rs.replace("because because","because").split("because")[1] | |
else: | |
rs = rs | |
condition = summaries_df["Factor 2"][i].strip() | |
rs = rs.split(" ") | |
for w in rs: | |
if w in male_keyword and female != 1: | |
male = 1 | |
break | |
if w in female_keyword and male != 1: | |
female = 1 | |
break | |
print("E9", "condition", condition, "male", male, "female", female) | |
if male == 0 and female == 0: | |
output.append('Other') | |
else: | |
if male == 1 and female==0: | |
if condition == "MF": | |
output.append("Subject") | |
elif condition == "FM": | |
output.append("Object") | |
else: | |
output.append("Other") | |
elif female == 1 and male ==0: | |
if condition == "MF": | |
output.append("Object") | |
elif condition == "FM": | |
output.append("Subject") | |
else: | |
output.append("Other") | |
'''Exp10''' | |
elif summaries_df["Experiment"][i] == "E10": | |
# rs = summaries_df["Response"][i].strip() | |
if rs == "yes": | |
output.append("1") | |
else: | |
output.append("0") | |
else: | |
print("can;t find the Exp:", summaries_df["Experiment"][i]) | |
output.append("NA") | |
# print(output) | |
# exit() | |
'''human''' | |
self.data = pd.DataFrame(list(zip(summaries_df["Experiment"], summaries_df["Question_ID"], summaries_df["Item"], summaries_df["Response"], summaries_df["Factor 2"], summaries_df["Stimuli 1"], summaries_df["Coding"], output)), | |
columns=["Experiment", "Question_ID", "Item", "Response", "Factor 2", "Simulate 1","Original_Coding","Coding"]) | |
# '''LLM''' | |
# self.data = pd.DataFrame(list(zip(summaries_df["Experiment"], summaries_df["Question_ID"], summaries_df["Item"], summaries_df["Response"], summaries_df["Factor 2"], summaries_df["Stimuli 1"], output)), | |
# columns=["Experiment", "Question_ID", "Item", "Response", "Factor 2", "Simulate 1","Coding"]) | |
print(self.data.head()) | |
return self.data | |
def code_results_llm(self, summaries_df): | |
'''code results from LLM's response''' | |
output = [] | |
'''database for Exp4''' | |
item4 = pd.read_csv(envs.ITEM_4_DATA) | |
wordpair2code = {} | |
for j in range(len(item4['Coding'])): | |
wordpair2code[item4['Pair'][j]] = item4['Coding'][j] | |
'''verb for Exp5''' | |
item5 = pd.read_csv(envs.ITEM_5_DATA) | |
# item corresponding to verb, same item id corresponding to verb pair | |
item2verb2 = {} | |
item2verb1 = {} | |
Stimuli1, Stimuli2 = {}, {} | |
for j in range(len(item5['Item'])): | |
item2verb1[item5['Item'][j]] = item5['Verb1'][j] | |
item2verb2[item5['Item'][j]] = item5['Verb2'][j] | |
Stimuli1[item5['ID'][j]] = item5['Stimuli-1'][j] | |
Stimuli2[item5['ID'][j]] = item5['Stimuli-2'][j] | |
male_keyword = ["he", "his", "himself"] | |
female_keyword = ["she", "her", "herself"] | |
print(len(summaries_df["Experiment"])) | |
for i in range(len(summaries_df["Experiment"])): | |
# vote_1_1, vote_1_2, vote_1_3 = 0, 0, 0 | |
# print() | |
if pd.isna(summaries_df["Response"][i]): | |
output.append("Other") | |
continue | |
rs = summaries_df["Response"][i].strip().lower() | |
'''Exp1''' | |
if summaries_df["Experiment"][i] == "E1": | |
print("E1", rs) | |
rs = rs.replace('"','') | |
if rs == "round": | |
# vote_1_1 += 1 | |
output.append("Round") | |
elif rs == "spiky": | |
output.append("Spiky") | |
else: | |
output.append("Other") | |
'''Exp2''' | |
elif summaries_df["Experiment"][i] == "E2": | |
# rs = summaries_df["Response"][i].strip() | |
rs = rs.split(' ') | |
print("E2", rs) | |
male, female = 0, 0 | |
for word in rs: | |
if word in female_keyword and male == 0: | |
female = 1 | |
output.append("Female") | |
break | |
if word in male_keyword and female == 0: | |
male = 1 | |
output.append("Male") | |
break | |
if male == 0 and female == 0 : | |
output.append("Other") | |
'''Exp3''' | |
elif summaries_df["Experiment"][i] == "E3": | |
# rs = summaries_df["Response"][i].strip() | |
print("E3", rs) | |
rs = rs.replace('"', '') | |
pair = summaries_df["Factor 2"][i] | |
word1, word2 = pair.split('_') | |
if rs == word1: | |
if len(word1) > len(word2): | |
output.append("Long") | |
else: | |
output.append("Short") | |
elif rs == word2: | |
if len(word1) > len(word2): | |
output.append("Short") | |
else: | |
output.append("Long") | |
else: | |
output.append("Other") | |
'''Exp4''' | |
elif summaries_df["Experiment"][i] == "E4": | |
try: | |
meaning_word = rs.split(";")[4].replace(" ", '') | |
except IndexError: | |
output.append("Other") | |
continue | |
except Exception as e: | |
print(f"Unexpected error: {e}") | |
output.append("Other") | |
continue | |
target = summaries_df["Factor 2"][i].strip().lower() | |
pair = target + "_" + meaning_word | |
print("E4:", pair) | |
if pair in wordpair2code.keys(): | |
output.append(wordpair2code[pair]) | |
else: | |
output.append("Other") | |
'''Exp5''' | |
elif summaries_df["Experiment"][i] == "E5" or summaries_df["Experiment"][i] == "E51": | |
# sentence = summaries_df["Response"][i].strip() | |
item_id = summaries_df["Item"][i] | |
question_id = summaries_df["Question_ID"][i] | |
sti1, sti2 = "", "" | |
if summaries_df["Experiment"][i] == "E51": | |
sti1 = Stimuli1[question_id[0:-1]].lower().replace("...", "") | |
sti2 = Stimuli2[question_id[0:-1]].lower().replace("...", "") | |
verb = item2verb1[item_id].lower() | |
sentence = sti1 + " " + rs.replace(sti1, "") | |
print("E5", verb, sentence) | |
if summaries_df["Experiment"][i] == "E5": | |
sti1 = Stimuli1[question_id].lower().replace("...", "") | |
# print(sti1) | |
sti2 = Stimuli2[question_id].lower().replace("...", "") | |
verb = item2verb2[item_id].lower() | |
sentence = sti2.replace("...","") + " " + rs.replace(sti2, "") | |
print("E5", verb, sentence) | |
doc = nlp1(sentence.replace(" "," ")) | |
# print(doc) | |
# print() | |
verb_token = None | |
for token in doc: | |
# print(token.lemma_) | |
if token.lemma_ == verb: | |
verb_token = token | |
break | |
# exit() | |
if verb_token is None: | |
output.append("Other") | |
print("E5 The target verb is missing from the sentence.") | |
else: | |
pobj, dative = None, None | |
# print(verb_token.children) | |
# exit() | |
for child in verb_token.children: | |
print(child) | |
if (child.dep_ == 'dative' and child.pos_ == "ADP") or (child.text == "to" and child.dep_ == 'prep' and child.pos_ == "ADP"): | |
pobj = child.text | |
if child.dep_ == 'dative': | |
dative = child.text | |
print("E5", pobj, dative) | |
# exit() | |
if pobj: | |
output.append("PO") | |
elif dative: | |
output.append("DO") | |
else: | |
print("Other", sentence, pobj, dative) | |
# exit() | |
output.append("Other") | |
'''Exp6''' | |
elif summaries_df["Experiment"][i] == "E6": | |
sentence = summaries_df["Stimuli 1"][i].strip().lower() | |
print("E6", sentence) | |
doc = nlp1(sentence) | |
subject = "None" | |
obj = "None" | |
# 遍历依存关系,寻找主语和宾语 | |
for token in doc: | |
if token.dep_ == "nsubj": | |
subject = token.text | |
elif token.dep_ == "dobj": | |
obj = token.text | |
print("E6", subject, obj) | |
if subject in rs and obj in rs: | |
print(rs, subject, obj, "Other") | |
output.append("Other") | |
elif subject in rs: | |
print(rs, subject, obj, "VP") | |
output.append("VP") | |
elif obj in rs: | |
print(rs, subject, obj, "NP") | |
output.append("NP") | |
else: | |
print(rs, subject, obj, "Other") | |
output.append("Other") | |
'''Exp7''' | |
elif summaries_df["Experiment"][i] == "E7": | |
# rs = summaries_df["Response"][i].strip().lower() | |
rs = rs.replace(".", "").replace(",", "") | |
print("E7",rs) | |
if rs == "no": | |
output.append("0") | |
elif rs == "yes": | |
output.append("1") | |
else: | |
output.append("Other") | |
'''Exp8''' | |
elif summaries_df["Experiment"][i] == "E8": | |
# rs = summaries_df["Response"][i].strip() | |
print("E8",rs) | |
if "something is wrong with the question" in rs: | |
output.append("1") | |
else: | |
output.append("0") | |
'''Exp9''' | |
elif summaries_df["Experiment"][i] == "E9": | |
male, female = 0, 0 | |
# rs = summaries_df["Response"][i].strip() | |
if "because" in rs: | |
rs = rs.replace("because because","because").split("because")[1] | |
else: | |
rs = rs | |
condition = summaries_df["Factor 2"][i].strip() | |
rs = rs.split(" ") | |
for w in rs: | |
if w in male_keyword and female != 1: | |
male = 1 | |
break | |
if w in female_keyword and male != 1: | |
female = 1 | |
break | |
print("E9", "condition", condition, "male", male, "female", female) | |
if male == 0 and female == 0: | |
output.append('Other') | |
else: | |
if male == 1 and female==0: | |
if condition == "MF": | |
output.append("Subject") | |
elif condition == "FM": | |
output.append("Object") | |
else: | |
output.append("Other") | |
elif female == 1 and male ==0: | |
if condition == "MF": | |
output.append("Object") | |
elif condition == "FM": | |
output.append("Subject") | |
else: | |
output.append("Other") | |
'''Exp10''' | |
elif summaries_df["Experiment"][i] == "E10": | |
# rs = summaries_df["Response"][i].strip() | |
rs = rs.replace(".", "") | |
if rs == "yes": | |
output.append("1") | |
else: | |
output.append("0") | |
else: | |
print("can;t find the Exp:", summaries_df["Experiment"][i]) | |
output.append("NA") | |
# print(output) | |
# exit() | |
'''human''' | |
# self.data = pd.DataFrame(list(zip(summaries_df["Experiment"], summaries_df["Question_ID"], summaries_df["Item"], summaries_df["Response"], summaries_df["Factor 2"], summaries_df["Stimuli 1"], summaries_df["Coding"], output)), | |
# columns=["Experiment", "Question_ID", "Item", "Response", "Factor 2", "Simulate 1","Original_Coding","Coding"]) | |
'''LLM''' | |
self.data = pd.DataFrame(list(zip(summaries_df["Experiment"], summaries_df["Question_ID"], summaries_df["Item"], summaries_df["Response"], summaries_df["Factor 2"], summaries_df["Stimuli 1"], output)), | |
columns=["Experiment", "Question_ID", "Item", "Response", "Factor 2", "Simulate 1","Coding"]) | |
print(self.data.head()) | |
return self.data | |
def calculate_js_divergence(self, file_path_1, file_path_2): | |
""" | |
Calculate the Jensen-Shannon divergence for response distributions between two datasets. | |
- Extracts E5 and E51 pairs, creates new data based on comparison, | |
removes the original E5 and E51, and then calculates the JS divergence between the datasets. | |
Parameters: | |
file_path_1 (str): Path to the first dataset file (Excel format). | |
file_path_2 (str): Path to the second dataset file (CSV format). | |
Returns: | |
float: The average JS divergence across all common Question_IDs. | |
""" | |
# Load the datasets | |
human_df = pd.read_csv(file_path_1, encoding='ISO-8859-1') | |
llm_df = pd.read_csv(file_path_2) | |
def create_e5_entries(df): | |
new_entries = [] | |
for i in range(len(df) - 1): | |
if 'E51' in df.iloc[i]['Experiment']: | |
priming_id = df.iloc[i][0]-1 | |
priming_row_id = df[df.iloc[:, 0] == priming_id].index[0] | |
new_question_id = df.iloc[priming_row_id]['Question_ID'] | |
label = 1 if df.iloc[i]['Coding'] == df.iloc[priming_row_id]['Coding'] else 0 | |
new_entries.append({ | |
'Question_ID': new_question_id, | |
'Response': f'{df.iloc[i]["Coding"]}-{df.iloc[priming_row_id]["Coding"]}', | |
'Coding': label | |
}) | |
return pd.DataFrame(new_entries) | |
# Create new E5 entries for both datasets | |
human_e5 = create_e5_entries(human_df) | |
llm_e5 = create_e5_entries(llm_df) | |
# Remove E5 and E51 entries from both datasets | |
human_df = human_df[~human_df['Question_ID'].str.contains('E5')] | |
llm_df = llm_df[~llm_df['Question_ID'].str.contains('E5')] | |
# Append new E5 entries to the cleaned dataframes | |
human_df = pd.concat([human_df, human_e5], ignore_index=True) | |
llm_df = pd.concat([llm_df, llm_e5], ignore_index=True) | |
### Calculate Average JS Divergence ### | |
# Extract the relevant columns for JS divergence calculation | |
human_responses = human_df[['Question_ID', 'Coding']] | |
llm_responses = llm_df[['Question_ID', 'Coding']] | |
# Get unique Question_IDs present in both datasets | |
common_question_ids = set(human_responses['Question_ID']).intersection(set(llm_responses['Question_ID'])) | |
# Initialize a list to store JS divergence for each Question_ID | |
js_divergence_list = [] | |
js_divergence ={} | |
# Calculate JS divergence for each common Question_ID | |
for q_id in common_question_ids: | |
# Get response distributions for the current Question_ID in both datasets | |
human_dist = human_responses[human_responses['Question_ID'] == q_id]['Coding'].value_counts(normalize=True) | |
llm_dist = llm_responses[llm_responses['Question_ID'] == q_id]['Coding'].value_counts(normalize=True) | |
# Reindex the distributions to have the same index, filling missing values with 0 | |
all_responses = set(human_dist.index).union(set(llm_dist.index)) | |
human_dist = human_dist.reindex(all_responses, fill_value=0) | |
llm_dist = llm_dist.reindex(all_responses, fill_value=0) | |
# Calculate JS divergence and add to the list | |
js_div = jensenshannon(human_dist, llm_dist, base=2) | |
experiment_id = q_id.split('_')[1] | |
if experiment_id not in js_divergence: | |
js_divergence[experiment_id] = [] | |
js_divergence[experiment_id].append(js_div) | |
js_divergence_list.append(js_div) | |
#js_divergence[q_id] = js_div | |
# Calculate the average JS divergence | |
# JS per experiment | |
avg_js_divergence_per_experiment = {exp: 1- np.nanmean(divs) for exp, divs in js_divergence.items()} | |
print(avg_js_divergence_per_experiment) | |
# JS overall | |
avg_js_divergence = 1 - np.nanmean(js_divergence_list) | |
print("avg_js_divergence:", avg_js_divergence) | |
return avg_js_divergence | |
def evaluate_humanlike(self, summaries_df: object, human_data_path: object, result_save_path: object) -> object: | |
''' | |
evaluate humanlike score | |
1. code the result | |
2. comput the similaritirs between human and model | |
process model responses''' | |
'''coding human data''' | |
# self.huamn_df = pd.read_csv(human_data_path) | |
# self.data = self.code_results(self.huamn_df) | |
#save_path = human_data_path.replace('.csv','_coding.csv') | |
#human_save_path = "./src/datasets/coding_human.xlsx" | |
# if save_path is not None: | |
# print(f'Save human coding results to {save_path}') | |
# fpath = Path(save_path) | |
# fpath.parent.mkdir(parents=True, exist_ok=True) | |
# self.data.to_csv(fpath) | |
'''coding llm data''' | |
save_path = result_save_path.replace('.csv','_coding.csv') | |
self.llm_df = self.code_results_llm(summaries_df) | |
if save_path is not None: | |
print(f'Save LLM coding results to {save_path}') | |
fpath = Path(save_path) | |
fpath.parent.mkdir(parents=True, exist_ok=True) | |
self.llm_df.to_csv(fpath) | |
# file_path_1 = '/Users/simon/Downloads/coding_human.xlsx' | |
# file_path_2 = '/Users/simon/Downloads/Meta-Llama-3.1-70B-Instruct_coding.csv' | |
avg_js_divergence = self.calculate_js_divergence(human_data_path, save_path) | |
return avg_js_divergence | |
def evaluate_hallucination(self, summaries_df): | |
""" | |
Evaluate the hallucination rate in summaries. Updates the 'scores' attribute | |
of the instance with the computed scores. | |
Args: | |
summaries_df (DataFrame): DataFrame containing source docs and summaries. | |
Returns: | |
list: List of hallucination scores. Also updates the 'scores' attribute of the instance. | |
""" | |
hem_scores = [] | |
sources = [] | |
summaries = [] | |
source_summary_pairs = util.create_pairs(summaries_df) | |
'''评价模型结果''' | |
for doc, summary in tqdm(source_summary_pairs, desc="Evaluating Humanlikeness"): | |
if util.is_summary_valid(summary): | |
try: | |
summary = summary.replace('<bos>','').replace('<eos>','') | |
score = self.model.predict([doc, summary])# [0] | |
if not isinstance(score, float): | |
try: | |
score = score.item() | |
except: | |
logging.warning(f"Score type mismatch: Expected float, got {type(score)}.") | |
continue | |
hem_scores.append(score) | |
sources.append(doc) | |
summaries.append(summary) | |
except Exception as e: | |
logging.error(f"Error while running HEM: {e}") | |
raise | |
self.scores = hem_scores | |
eval_results = {'source': sources, 'summary': summaries, 'HEM scores': hem_scores} | |
return hem_scores, eval_results | |
# for doc, summary in tqdm(source_summary_pairs, desc="Evaluating hallucinations"): | |
# if util.is_summary_valid(summary): | |
# try: | |
# # summary_pieces = summary.split('\n') | |
# # summary = summary_pieces[0] if len(summary_pieces[0].strip()) > 0 else summary_pieces[1] | |
# summary = summary.replace('<bos>','').replace('<eos>','') | |
# # print([doc, summary]) | |
# # print(self.model.predict([doc, summary])) | |
# score = self.model.predict([doc, summary])# [0] | |
# if not isinstance(score, float): | |
# try: | |
# score = score.item() | |
# except: | |
# logging.warning(f"Score type mismatch: Expected float, got {type(score)}.") | |
# continue | |
# hem_scores.append(score) | |
# sources.append(doc) | |
# summaries.append(summary) | |
# except Exception as e: | |
# logging.error(f"Error while running HEM: {e}") | |
# raise | |
# self.scores = hem_scores | |
# eval_results = {'source': sources, 'summary': summaries, 'HEM scores': hem_scores} | |
# return hem_scores, eval_results | |
def compute_factual_consistency_rate(self, threshold=0.5): | |
""" | |
Compute the factual consistency rate of the evaluated summaries based on | |
the previously calculated scores. This method relies on the 'scores' | |
attribute being populated, typically via the 'evaluate_hallucination' method. | |
Returns: | |
float: Factual Consistency Rate. Also updates the 'factual_consistency_rate' | |
and 'hallucination_rate' attributes of the instance. | |
Raises: | |
ValueError: If scores have not been calculated prior to calling this method. | |
""" | |
if not self.scores: | |
error_msg = "Scores not calculated. Call evaluate_hallucination() first." | |
logging.error(error_msg) | |
raise ValueError(error_msg) | |
# Use threshold of 0.5 to compute factual_consistency_rate | |
num_above_threshold = sum(score >= threshold for score in self.scores) | |
num_total = len(self.scores) | |
if not num_total: | |
raise ValueError("No scores available to compute factual consistency rate.") | |
self.factual_consistency_rate = (num_above_threshold / num_total) * 100 | |
self.hallucination_rate = 100 - self.factual_consistency_rate | |
return self.factual_consistency_rate | |