from genericpath import sameopenfile from os import remove from random import random import re import tqdm import json import pdb import io import os import statistics import random;random.seed(42) def filter_less_not_long_data(): training_data = [] with open("data/lean4_random/5k_first_filtered.json", "r") as f: training_data += json.load(f) with open("data/lean4_random/5k_second_filtered.json", "r") as f: training_data += json.load(f) with open("data/lean4_random/5k_third_filtered.json", "r") as f: training_data += json.load(f) # Filter and save filtered data with open("data/lean4_random/15k_filtered.json", "w") as f: handled_json_list = [item for item in training_data if len(item['model_response']) > 400 and len(item['statement_poof']) > 100] print("Filtered all length:", len(handled_json_list)) json.dump(handled_json_list, f, ensure_ascii=False, indent=2) with open("data/lean4_random/1k_test.json", "r") as f: random_test_data = json.load(f) handled_json_list = [item for item in random_test_data if len(item['model_response']) > 400 and len(item['statement_poof']) > 100] with open("data/lean4_random/1k_test_filtered.json", "w") as f: print("Filtered all length:", len(handled_json_list)) json.dump(handled_json_list, f, ensure_ascii=False, indent=2) with open("data/lean4_basic/1k_test_filtered.jsonl", "r") as f: random_test_data = json.load(f) handled_json_list = [item for item in random_test_data if len(item['model_response']) > 400 and len(item['statement_poof']) > 100] with open("data/lean4_basic/1k_test_filtered.jsonl", "w") as f: print("Filtered all length:", len(handled_json_list)) json.dump(handled_json_list, f, ensure_ascii=False, indent=2) def statistics_lean4_training_test(): import json with open("data/lean4_random/1k_test.json", "r") as f: test_data = json.load(f) test_data = [item['statement_poof'][:-2] for item in test_data] # Function to filter items based on existence in test data def handle(json_list): handled_json_list = [item for item in json_list if len(item['model_response']) > 400 and len(item['statement_poof']) > 100] statement_proof_lengths = [len(item['statement_poof']) for item in handled_json_list] model_output_lengths = [len(item['model_response']) for item in handled_json_list] # Calculate statistics statement_proof_stats = { 'mean': statistics.mean(statement_proof_lengths), 'median': statistics.median(statement_proof_lengths), 'min': min(statement_proof_lengths), 'max': max(statement_proof_lengths) } model_output_stats = { 'mean': statistics.mean(model_output_lengths), 'median': statistics.median(model_output_lengths), 'min': min(model_output_lengths), 'max': max(model_output_lengths) } # Print results print(f"total length is {len(handled_json_list)}") print(f"Statistics for 'statement_proof':\n {json.dumps(statement_proof_stats, indent=4)}") print(f"\nStatistics for 'model_output':\n {json.dumps(model_output_stats, indent=4)}") # Initialize minimum length to a large value and the hold strings min_sp_len = float('inf') min_sp_str = "" min_mo_len = float('inf') min_mo_str = "" # Extract lengths of values and find min len strings statement_proof_lengths = [] model_output_lengths = [] for item in handled_json_list: sp_len = len(item['statement_poof']) mo_len = len(item['model_response']) statement_proof_lengths.append(sp_len) model_output_lengths.append(mo_len) if sp_len < min_sp_len: min_sp_len = sp_len min_sp_str = item['statement_poof'] if mo_len < min_mo_len: min_mo_len = mo_len min_mo_str = item['model_response'] print(min_sp_str) print(min_mo_str) # Filter and save filtered data # training_data = [] # with open("data/lean4_random/5k_first_filtered.json", "r") as f: # training_data += json.load(f) # with open("data/lean4_random/5k_second_filtered.json", "r") as f: # training_data += json.load(f) # with open("data/lean4_random/5k_third_filtered.json", "r") as f: # training_data += json.load(f) # Filter and save filtered data # print(f"statisticals for training data") # handle(training_data) # with open("data/lean4_random/15k_filtered.json", "w") as f: # handled_json_list = [item for item in training_data if len(item['model_response']) > 400 and len(item['statement_poof']) > 100] # print("Filtered all length:", len(handled_json_list)) # json.dump(training_data, f, ensure_ascii=False, indent=2) # print("Filtered first file length:", len(filtered_second_5k)) # with open("data/lean4_random/1k_test.json", "r") as f: # random_test_data = json.load(f) # with open("data/lean4_random/1k_test_filtered.json", "w") as f: # print("Filtered all length:", len(training_data)) # json.dump(random_test_data, f, ensure_ascii=False, indent=2) # print(f"statisticals for random test data") # handle(random_test_data) PROMPT_DICT = { "wild": ( "Statement and proof in natural language:\n\n" "# Problem:\n{question}\n\n" "# Proof:\n{answer}\n\n" "Translate the statement and proof in natural language to lean4:" ), "lean4": ( "Statement and proof in natural language:\n\n" "{model_response}\n\n" "Translate the statement and proof in natural language to lean4:" ), "prompt_no_input": ( "Below is an instruction that describes a task. " "Write a response that appropriately completes the request.\n\n" "### Instruction:\n{instruction}\n\n### Response:" ), } with open("data/wild/wild_sample1k.jsonl", "r") as f: basic_test_data = json.load(f) basic_list = [] for item in basic_test_data: list_item = { "model_response": PROMPT_DICT['wild'].format(question = item['question'], answer = item['answer']), "statement_poof": PROMPT_DICT['wild'].format(question = item['question'], answer = item['answer']), } basic_list.append(list_item) # print(f"statisticals for basic test data") handle(basic_list) def filtered(): import json with open("data/lean4_basic/1k_test.jsonl", "r") as f: test_data = json.load(f) test_data = [item['statement_poof'] for item in test_data] # Function to filter items based on existence in test data def filter_items(data, test_data): filtered_data = [item for item in tqdm.tqdm(data) if item['statement_poof'][:-2] not in test_data] return filtered_data # Filter and save filtered data # with open("data/lean4_random/5k_first.json", "r") as f: # second_5k = json.load(f) # filtered_second_5k = filter_items(second_5k, test_data) # with open("data/lean4_random/5k_first_filtered.json", "w") as f: # json.dump(filtered_second_5k, f, ensure_ascii=False, indent=2) # print("Filtered first file length:", len(filtered_second_5k)) # with open("data/lean4_random/5k_second.json", "r") as f: # second_5k = json.load(f) # filtered_second_5k = filter_items(second_5k, test_data) # with open("data/lean4_random/5k_second_filtered.json", "w") as f: # json.dump(filtered_second_5k, f, ensure_ascii=False, indent=2) # print("Filtered second file length:", len(filtered_second_5k)) # with open("data/lean4_random/5k_third.json", "r") as f: # second_5k = json.load(f) # filtered_second_5k = filter_items(second_5k, test_data) # with open("data/lean4_random/5k_third_filtered.json", "w") as f: # json.dump(filtered_second_5k, f, ensure_ascii=False, indent=2) # print("Filtered third file length:", len(filtered_second_5k)) with open("data/lean4_random/1k_test.json", "r") as f: second_5k = json.load(f) filtered_second_5k = filter_items(second_5k, test_data) with open("data/lean4_random/1k_test_filtered.json", "w") as f: json.dump(filtered_second_5k, f, ensure_ascii=False, indent=2) print("Filtered first file length:", len(filtered_second_5k)) def insert_label_for_autoformalization(): input_lists = ["data/lean4_statement_translate/15k_state_problem_translation.json"] for input_file in input_lists: with open(input_file, "r") as f: test_data = json.load(f) for item in test_data: item['task']='statement_form' with open("data/lean4_statement_translate/15k_state_problem_translation_statement_form.json", "w") as f: json.dump(test_data, f, ensure_ascii=False, indent=2) input_lists = ["data/lean4_statement_translate/15k_state_problem_translation.json"] for input_file in input_lists: with open(input_file, "r") as f: test_data = json.load(f) for item in test_data: item['task']='statementproof_inform' with open("data/lean4_statement_translate/15k_state_problem_translation_statementproof_inform.json", "w") as f: json.dump(test_data, f, ensure_ascii=False, indent=2) input_lists = ["all_theorem.jsonl"] for input_file in input_lists: with open(input_file, "r") as f: test_data = json.load(f) for item in test_data: item['task']='solver' with open("data/all_theorem_solver.jsonl", "w") as f: json.dump(test_data, f, ensure_ascii=False, indent=2) def _make_r_io_base(f, mode: str): if not isinstance(f, io.IOBase): f = open(f, mode=mode) return f PROMPT_DICT = { "lean4": ( "Statement and proof in natural language:\n\n" "{statement_text}\n\n" "Translate the statement and proof in natural language to lean4:" ), "prompt_no_input": ( "Below is an instruction that describes a task. " "Write a response that appropriately completes the request.\n\n" "### Instruction:\n{instruction}\n\n### Response:" ), } def jload(f, mode="r"): """Load a .json file into a dictionary.""" f = _make_r_io_base(f, mode) jdict = json.load(f) f.close() return jdict def show_example(): item = 'data/lean4_random/5k_first.json' list_data_dict = [] try: list_data_dict += jload(item) except BaseException: with open(item, 'r') as f: lines = f.readlines() list_data_dict += [json.loads(line.strip()) for line in lines] # list_data_dict = random.sample(list_data_dict, len(list_data_dict)) prompt_lean4 = PROMPT_DICT["lean4"] list_data_dict = [{'instruction':prompt_lean4.format(statement_text = data['model_response']), 'input':'', 'output':data['statement_poof']} for data in list_data_dict] for item in list_data_dict: if "odd_iff" in item['output']: # if len(item['output']) > 150: print("input\n", item['instruction']) print('*'*10) print("out\n", item['output']) pdb.set_trace() def handle_lean4_compiler(): def filter_json(json_data): filtered_data = {} for key in json_data.keys(): if key in ['question', 'answer', 'total output', 'results']: filtered_data[key] = json_data[key] return filtered_data input_file = '../repl/pass_rate_results/lean4_15k_train_output.json' output_file = '../repl/pass_rate_results/lean4_15k_train_output_filter.json' with open(input_file, "r") as f: test_data = json.load(f) json_list = test_data['results'] for id in range(len(json_list)): json_list[id] = filter_json(json_list[id]) test_data['results'] = json_list with open(f"{output_file}", 'w') as f: json.dump(test_data, f,indent=2, ensure_ascii=False) def locate(location=14): from transformers import AutoTokenizer # Example string text = "Hello, how are you?" # Initialize the tokenizer tokenizer = AutoTokenizer.from_pretrained("../models/Mistral-7B-v0.1/") # Tokenize the string tokens = tokenizer("Hello, how are you?", padding=False, add_special_tokens=False).input_ids print(tokens) tokens = tokenizer(" Hello, how are you?", padding=False, add_special_tokens=False).input_ids print(tokens) # partial_tokens = tokenizer(text[:location].split()[-1], padding=False, add_special_tokens=False).input_ids # def find_subarray(lst, sub_lst): # for i in range(len(lst) - len(sub_lst) + 1): # if lst[i:i+len(sub_lst)] == sub_lst: # return i+len(sub_lst) # assert True # print(find_subarray(tokens, partial_tokens)) def handle_jsonfile(json_list): def get_label(result,output): if result['status'] == 'pass': return -1 else: return -2 assert True all_results = [] idx = 0 for item in json_list: dict_item = { "idx": idx, "question": item['question'], "input": item['question'], "ground_truth_cot": item['answer'], "ground_truth":item['answer'], "outputs": [ { "response": item['total output'][id].split("#align")[0], "label": get_label(item['results'][id], item['total output'][id].split("#align")[0]), } for id in range(len(item['results'])) ], } idx += 1 all_results.append(dict_item) return all_results def read_jsonl(path: str): try: with open(path) as fh: return [json.loads(line) for line in fh.readlines() if line] except: return json.load(open(path, 'r', encoding= 'utf-8')) def ensure_parent_dir_exists(file_path): # Extract the directory part of the file path parent_dir = os.path.dirname(file_path) # Check if the directory exists, and create it if it doesn't if not os.path.exists(parent_dir): os.makedirs(parent_dir) print(f"Created directory: {parent_dir}") else: print(f"Directory already exists: {parent_dir}") def save_jsonl(data, path): ensure_parent_dir_exists(path) with open(path, 'w') as f: json.dump(data, f, indent=2, ensure_ascii=False) def get_model_solutions_easy(data_dir): import pdb examples = [] for dd in data_dir.split(","): examples += read_jsonl(dd)['results'] examples = handle_jsonfile(examples) print(f"{len(examples)} examples, each with {len(examples[0]['outputs'])} solutions") pass1 = 0 pass5 = 0 for item in examples: pass1 += item['outputs'][0]['label'] == -1 for output in item['outputs']: if output['label'] == -1: pass5 += 1 break print(pass1/len(examples)) print(pass5/len(examples)) return examples def get_q_a(text): # Extract the content after "# Question:\n" question = re.search(r"# Question:\n(.*?)\n\n# Answer:", text, re.S) if question: question_content = question.group(1).strip() # print("Question Content:") # print(question_content) # Extract the content after "# Answer:\n" answer = re.search(r"# Answer:\n(.*?)(\n\n|$)", text, re.S) if answer: answer_content = answer.group(1).strip() # print("\nAnswer Content:") # print(answer_content) return question_content, answer_content def from_compiler_to_lean4(data_dir): # examples = [] # for dd in data_dir.split(","): # examples += read_jsonl(dd)['results'] # sample_examples = random.sample(examples, 1000) # # Save results to a JSON file # with open(f'sampled_compilation_results/sample_1k.json', 'w') as f: # json.dump(sample_examples, f, indent=2, ensure_ascii=False) examples = read_jsonl(data_dir) pass1 = 0 passk = 0 save_data_autoform = [] save_data_qa = [] # examples = handle_jsonfile(examples) for id in range(len(examples['results'])): result = examples['results'][id] pass1 += 1 if result['results'][0]['status'] == 'pass' else 0 flag = False for idx , item in enumerate(result['results']): if item['status'] == 'pass': q,a = get_q_a(result['question']) save_data_autoform.append( { "informal_question": q, "informal_answer": a, "formal_statement_proof": result['total output'][idx], "task": "autoform" } ) save_data_qa.append( { "informal_question": q, "informal_answer": a, "task": "qa" } ) if not flag: passk += 1 flag = True print(pass1/len(examples['results'])) print(passk/len(examples['results'])) print("len", len(save_data_autoform), len(examples['results'])) save_jsonl(data = save_data_autoform, path= "autoform_data/math_train/g-mistral-qa-gsm8kmath-autoform-forml4-d-mathtrain-t-autoform.jsonl") # save_jsonl(data = save_data_autoform, path= "autoform_data/math_train/g-mistral-qa-gsm8kmath-autoform-forml4-d-mathtrain-t-autoform.jsonl") def show_example_compilation_results(data_dir): # examples = [] # for dd in data_dir.split(","): # examples += read_jsonl(dd)['results'] # sample_examples = random.sample(examples, 1000) # # Save results to a JSON file # with open(f'sampled_compilation_results/sample_1k.json', 'w') as f: # json.dump(sample_examples, f, indent=2, ensure_ascii=False) item = read_jsonl(data_dir) # examples = handle_jsonfile(examples) for id in range(len(item['results'])): result = item['results'][id] for item in result['results']: if item['status'] == 'pass': print( result['total output'][id]) # print(result['cmd'][id].split('\n\n')[-1]) # if item['results'][id]['status'] == 'pass': # print(item['results'][id]['stdout']['tactics']) # print( item['total output'][id]) # pdb.set_trace() # print(item['results'][id]['stdout']['messages']) # print(item['results'][id]['stdout']['env']) # print(f"{len(examples)} examples, each with {len(examples[0]['outputs'])} solutions") # pass1 = 0 # pass5 = 0 # for item in examples: # pass1 += item['outputs'][0]['label'] == -1 # for output in item['outputs']: # if output['label'] == -1: # pass5 += 1 # break # print(pass1/len(examples)) # print(pass5/len(examples)) return examples def get_novel_premises(): training_data = json.load(open("data/lean4_random/15k_filtered.json", 'r', encoding='utf8')) random_data = json.load(open("data/lean4_random/1k_test_filtered.json", 'r', encoding='utf8')) all_data = training_data + random_data import pdb pdb.set_trace() def remove_align(): random_data = json.load(open("data/lean4_random/1k_test_filtered.json", 'r', encoding='utf8')) for item in random_data: input = item['model_response'] def handle_math(): input = 'data/test/gsm8k/train.jsonl' random_data = json.load(open(input, 'r', encoding='utf8')) new_data = [] for item in random_data: new_data.append( { 'informal_question': item['question'], 'informal_answer':item['answer'], 'task' : 'qa', } ) save_path = 'autoform_data/qa_gsm8k_train.json' print(f"len of {len(new_data)} is saved") with open(save_path, "w") as json_file: json.dump(new_data, json_file, indent=4 , ensure_ascii= False) if __name__ == '__main__': from_compiler_to_lean4("../repl/pass_rate_results/math_train/1/10pass10.jsonl") # handle_math() # remove_align() # get_novel_premises() # show_example_compilation_results(data_dir = '../repl/pass_rate_results/gpt_result/gpt4_wild.json') # locate() # handle_lean4_compiler() # show_example() # insert_label_for_autoformalization() # filtered() # statistics_lean4_training_test() # filter_less_not_long_data()