# lean4-compile / update_lean4_kv.py
# Uploaded by rookiemango with huggingface_hub (commit 1d2a897, verified), 11 kB.
import os
import subprocess
from argparse import ArgumentParser
import json
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import glob
import tempfile
import random
def wrapped_function(item):
    """Check one Lean snippet with ``lake exe repl`` and summarise the run.

    The snippet is written to a private temporary ``.lean`` file, the REPL is
    asked to load that file (``{"path": ..., "allTactics": true}`` on stdin),
    and the temporary file is removed again before returning.

    Args:
        item: Mapping whose ``'cmd'`` entry is the Lean source text to check.

    Returns:
        dict: ``{'results': [...], 'pass_rate': ...}``. ``results`` holds one
        entry: ``stdout``/``stderr``/``status='pass'`` when the process exits
        with status 0 and prints valid JSON, otherwise
        ``error``/``status='nopass'``. ``pass_rate`` is ``100.0`` when the JSON
        reply has no ``"messages"`` key and ``0.0`` in every other case.
    """
    results = []
    passed = 0
    total = 1  # exactly one snippet is evaluated per call

    # A uniquely named file instead of a fixed "test.lean": the shared temp
    # directory would otherwise be overwritten by any concurrent caller.
    handle = tempfile.NamedTemporaryFile(
        "w", suffix=".lean", delete=False, encoding="utf-8"
    )
    temp_file = handle.name
    try:
        with handle:
            handle.write(item['cmd'])

        data = '{"path": "%s", "allTactics": true}' % temp_file
        command = 'echo \'%s\' | lake exe repl' % data
        try:
            result = subprocess.run(
                command,
                shell=True,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            stdout = result.stdout.decode('utf-8')
            stderr = result.stderr.decode('utf-8')
            json_stdout = json.loads(stdout)
        except subprocess.CalledProcessError as e:
            results.append({'error': str(e), 'status': 'nopass'})
        except json.JSONDecodeError as e:
            # The REPL exited cleanly but did not print JSON; report it as a
            # failed run instead of letting the exception escape.
            results.append({'error': str(e), 'status': 'nopass'})
        else:
            if "messages" not in json_stdout:
                passed += 1
            # NOTE(review): 'status' records a clean process exit, while
            # 'pass_rate' additionally requires a reply without "messages".
            # This mirrors the original flow - confirm that is intended.
            results.append({'stdout': stdout, 'stderr': stderr, 'status': 'pass'})
    finally:
        os.remove(temp_file)

    # The denominator is the number of snippets run (previously
    # ``passed + total``, which capped a successful run at 50%).
    pass_rate = passed / total * 100
    return {'results': results, 'pass_rate': pass_rate}
# Set the directory where your .lean files are located
# Get a list of all .lean files in the directory
# lean_files = [f for f in os.listdir(directory) if f.endswith(".lean")]
# lean_files = ["test/file.lean"]
def single(command_list, args):
    """Check Lean snippets one after another and write ``results.json``.

    Every snippet is written to ``test/test.lean`` (relative to the current
    directory) and loaded through ``lake exe repl``. The per-snippet outcomes
    and the overall pass rate are printed and dumped to ``results.json``.

    Args:
        command_list: Iterable of mappings; each ``'cmd'`` entry is the Lean
            source text to check.
        args: Unused; kept so existing callers keep working.

    Returns:
        None. The report is written to ``results.json`` in the current
        directory as ``{'results': [...], 'pass_rate': <percentage>}``.
    """
    # Bind the progress bar locally. The file has both ``from tqdm import
    # tqdm`` and, further down, ``import tqdm``; the later statement wins, so
    # the bare module-level name ``tqdm`` is the (non-callable) module.
    try:
        from tqdm import tqdm as progress
    except ImportError:
        # The bar is cosmetic; fall back to plain iteration.
        def progress(iterable, **_unused):
            return iterable

    lean_path = "test/test.lean"
    os.makedirs(os.path.dirname(lean_path), exist_ok=True)

    # The request never changes because every snippet reuses the same path.
    data = '{"path": "test/test.lean", "allTactics": true}'
    command = 'echo \'%s\' | lake exe repl' % data

    results = []
    passed = 0
    total = 0
    for item in progress(command_list):
        with open(lean_path, "w", encoding='utf-8') as f:
            f.write(item['cmd'])
        try:
            result = subprocess.run(
                command,
                shell=True,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            stdout = result.stdout.decode('utf-8')
            stderr = result.stderr.decode('utf-8')
            json_stdout = json.loads(stdout)
        except subprocess.CalledProcessError as e:
            results.append({'error': str(e), 'status': 'nopass'})
        except json.JSONDecodeError as e:
            # Clean exit but no JSON on stdout: record it, do not crash the run.
            results.append({'error': str(e), 'status': 'nopass'})
        else:
            if "messages" not in json_stdout:
                passed += 1
            # NOTE(review): 'status' reflects a clean process exit; only the
            # pass counter looks at "messages". Mirrors the original flow.
            results.append({
                'stdout': stdout,
                'stderr': stderr,
                'status': 'pass'
            })
        total += 1

    # An empty input yields 0.0 instead of dividing by zero.
    pass_rate = passed / total * 100 if total else 0.0
    print(pass_rate)

    # ensure_ascii=False emits raw Unicode, so pin the file encoding.
    with open('results.json', 'w', encoding='utf-8') as f:
        json.dump({'results': results, 'pass_rate': pass_rate}, f, indent=2, ensure_ascii=False)
def multi(command_list, output_path, k, temp_dir='/opt/jianqiao', max_workers=1):
    """Check several Lean candidates per item and report pass@1 / pass@k.

    Each candidate is written to ``<temp_dir>/<item index>_test_<candidate
    index>.lean`` and loaded through ``lake exe repl`` with a 600 second
    timeout. Candidate outcomes carry one of these ``status`` values:

    * ``'pass'`` - empty stderr and no message of severity ``'error'``;
    * ``'nopass'`` - an error message was reported (``string_pos`` holds the
      derived offset) or stderr was non-empty (``string_pos`` is 0);
    * ``'nopass_limit'`` - the REPL timed out;
    * ``'nopass_error'`` - non-zero exit status or non-JSON output.

    Args:
        command_list: Sequence of dicts. Each needs a ``'cmd'`` entry holding
            a list of Lean source strings; the keys ``'question'``,
            ``'answer'`` and ``'total output'`` are copied into the report
            when present.
        output_path: File name (sub-directories allowed) created beneath
            ``pass_rate_results/``.
        k: Number of leading candidates per item considered for pass@k.
        temp_dir: Directory receiving the generated ``.lean`` files. It is
            created when missing and the files are left in place.
        max_workers: Size of the thread pool that processes the items.

    Returns:
        None. Pass@1 and Pass@k are printed and the full report is written to
        ``pass_rate_results/<output_path>``.
    """
    # Bind the progress bar locally. The file has both ``from tqdm import
    # tqdm`` and, further down, ``import tqdm``; the later statement wins, so
    # the bare module-level name ``tqdm`` is the (non-callable) module.
    try:
        from tqdm import tqdm as progress
    except ImportError:
        # The bar is cosmetic; fall back to plain iteration.
        def progress(iterable, **_unused):
            return iterable

    kept_keys = ('question', 'answer', 'total output', 'results')
    os.makedirs(temp_dir, exist_ok=True)

    def error_string_pos(cmd, pos):
        """Turn a reported ``pos`` (``line``/``column``) into a string offset.

        Starting with ``column - 1``, the lengths (plus one for the newline)
        of the preceding lines are added while walking upwards; the walk ends
        after the first empty line and never visits line index 0.
        NOTE(review): presumably the offset is meant to be relative to the
        last blank-line-separated section of ``cmd`` - confirm with callers.
        """
        lines = cmd.split('\n')  # split once instead of once per visited line
        start_line = pos['line'] - 1
        current_column = pos['column'] - 1
        for line_n in range(start_line - 1, 0, -1):
            line_len = len(lines[line_n])
            current_column += line_len + 1
            if not line_len:
                break
        return current_column

    def classify(cmd, stdout, stderr):
        """Build the result entry for a run that exited with status 0."""
        if stderr:
            return {'stdout': stdout, 'stderr': stderr, 'status': 'nopass', 'string_pos': 0}
        messages = stdout['messages'] if "messages" in stdout else ()
        for me in messages:
            if me['severity'] == 'error':
                # Only the first error message determines the position.
                return {
                    'stdout': stdout,
                    'stderr': stderr,
                    'status': 'nopass',
                    'string_pos': error_string_pos(cmd, me['pos']),
                }
        return {'stdout': stdout, 'stderr': stderr, 'status': 'pass'}

    def execute_command(item, index):
        """Run every candidate of ``item`` and collect the outcomes."""
        result_dict = {key: item[key] for key in item if key in kept_keys}
        result_dict['results'] = []
        for i, cmd in enumerate(item['cmd']):
            # Item and candidate indices keep the file names unique.
            temp_file = os.path.join(temp_dir, f"{index}_test_{i}.lean")
            with open(temp_file, "w", encoding='utf-8') as f:
                f.write(cmd)
            data = '{"path": "%s", "allTactics": true}' % temp_file
            command = f'echo \'{data}\' | lake exe repl'
            try:
                result = subprocess.run(
                    command,
                    shell=True,
                    check=True,
                    timeout=600,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                )
                stdout = json.loads(result.stdout.decode('utf-8'))
                stderr = result.stderr.decode('utf-8')
            except subprocess.TimeoutExpired as e:
                result_item = {'error': str(e), 'status': 'nopass_limit'}
            except subprocess.CalledProcessError as e:
                result_item = {'error': str(e), 'status': 'nopass_error'}
            except json.JSONDecodeError as e:
                # Clean exit without JSON output; previously this escaped and
                # aborted the whole batch.
                result_item = {'error': str(e), 'status': 'nopass_error'}
            else:
                result_item = classify(cmd, stdout, stderr)
            result_dict['results'].append(result_item)
        return result_dict

    def calculate_pass(result_list, k):
        """Return the fraction of items passing within 1 and within k tries."""
        if not result_list:
            return 0, 0

        def passes_within(entry, limit):
            # max(..., 0) keeps a non-positive limit from slicing off the end.
            candidates = entry.get('results', [])[:max(limit, 0)]
            return any(c.get('status') == 'pass' for c in candidates)

        pass_1_count = sum(1 for entry in result_list if passes_within(entry, 1))
        pass_k_count = sum(1 for entry in result_list if passes_within(entry, k))
        return pass_1_count / len(result_list), pass_k_count / len(result_list)

    results = []
    total = len(command_list)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(execute_command, cmd, i) for i, cmd in enumerate(command_list)]
        # Collect in submission order so the report lines up with the input.
        for future in progress(futures, total=total, desc="Processing Commands"):
            results.append(future.result())

    pass_1, pass_k = calculate_pass(results, k)
    print("Pass@1:", pass_1)
    print(f"Pass@{k}:", pass_k)

    output_file = f"pass_rate_results/{output_path}"
    # Create the directory if it doesn't exist
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    # ensure_ascii=False emits raw Unicode, so pin the file encoding.
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump({'results': results, 'pass_1': pass_1, f"pass_{k}": pass_k}, f, indent=2, ensure_ascii=False)
import re
def remove_simp_pattern_from_end(s):
    """Drop an ``@[simp ...]`` attribute that closes at the end of ``s``.

    The match starts at the first ``@[simp`` for which a ``]`` at the end of
    the string (or right before a final newline) can be reached without
    crossing another line break after the optional whitespace; everything
    from there to that ``]`` is removed. Strings without such a suffix are
    returned unchanged.
    """
    trailing_simp = re.compile(r'@\[simp\s*.*?\]$')
    return trailing_simp.sub('', s)
def update_dict(lean_kv):
    """Cut every key at its first ``#align`` marker and persist the result.

    Keys without the marker are kept whole. When two keys collapse to the
    same prefix, the value seen last wins. The rewritten mapping is dumped to
    ``up_lean4_kv.json`` in the current directory and also returned.
    """
    trimmed = {
        key.partition("#align")[0]: value
        for key, value in lean_kv.items()
    }

    # Write the combined data to a new JSON file
    with open('up_lean4_kv.json', 'w') as sink:
        json.dump(trimmed, sink, indent=4)

    return trimmed
def find_key(lean_kv):
    """Debugging aid: break into ``pdb`` at each key mentioning ``theorem odd_sub``.

    Nothing is returned or modified; when no key contains the text the
    function simply finishes.
    """
    update_kv = {}
    needle = "theorem odd_sub"
    for key, value in lean_kv.items():
        if needle not in key:
            continue
        # Interactive stop so the matching key and value can be inspected.
        import pdb
        pdb.set_trace()
import os
import json
import tqdm
def main(args):
    """Re-key ``data/lean4_kv.json`` by the ``formal`` statements of the FormL4 data.

    All items of the three FormL4 JSON files under ``../formal-align/data``
    are loaded. For each item, the first not-yet-consumed key of
    ``data/lean4_kv.json`` that contains the item's ``'formal'`` text is
    looked up; its value is stored under the ``formal`` text itself and the
    key is consumed so it cannot match a later item. The resulting mapping is
    written to ``data/updated_lean4_kv.json``.

    Args:
        args: Parsed command-line arguments. Not consulted here; all paths
            are fixed relative to the current directory.

    Returns:
        None.
    """
    # Bind the progress bar locally so this function does not depend on which
    # of the file's two top-level tqdm imports (``from tqdm import tqdm`` and
    # ``import tqdm``) ends up owning the name ``tqdm``.
    try:
        from tqdm import tqdm as progress
    except ImportError:
        # The bar is cosmetic; fall back to plain iteration.
        def progress(iterable, **_unused):
            return iterable

    # Define the directory and file names
    file_dir = '../formal-align/data'
    file_names = ['FormL4_basic_test.json', 'FormL4_random_test.json', 'FormL4_train.json']

    # Gather the items of every input file.
    all_items = []
    for file_name in file_names:
        file_path = os.path.join(file_dir, file_name)
        with open(file_path, 'r', encoding='utf-8') as file:
            data = json.load(file)
        if isinstance(data, list):
            all_items.extend(data)  # the file holds a list of items
        else:
            all_items.append(data)  # the file holds a single item

    # Load the original key-value pairs; read as UTF-8 like the other inputs
    # rather than with the platform's locale encoding.
    filename = 'data/lean4_kv.json'
    with open(filename, 'r', encoding='utf-8') as file:
        original_kv = json.load(file)

    new_kv = {}
    for item in progress(all_items):
        formal = item['formal']
        # First remaining key containing the statement. The generator is
        # dropped before the dict is modified, so no per-item copy of the
        # whole mapping is needed.
        match = next((key for key in original_kv if formal in key), None)
        if match is not None:
            new_kv[formal] = original_kv.pop(match)

    # Save the updated key-value pairs to a new file
    output_filename = 'data/updated_lean4_kv.json'
    with open(output_filename, 'w', encoding='utf-8') as file:
        json.dump(new_kv, file, indent=4)
    print(f"Updated key-value pairs saved to: {output_filename}")
if __name__ == '__main__':
    # Command-line options, declared in the order they appear in --help.
    # main() as written does not read any of them.
    option_specs = (
        ('--data_path', {
            'type': str,
            'default': 'data/grade-school-math-master/grade_school_math/data/test.jsonl',
        }),
        ('--input_path', {'type': str, 'default': ''}),
        ('--cuda_num', {'type': int, 'default': 8}),
        ('--k', {'type': int, 'default': 1}),
        ('--output_path', {'type': str, 'default': 'total.json'}),
        ('--generate_method', {
            'type': str,
            'choices': ['single', 'sft', 'comp', 'self_consistency', 'single_consistency'],
        }),
        ('--method', {'type': str, 'choices': ['main', 'test', 'get_data']}),
    )

    cli = ArgumentParser()
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    main(cli.parse_args())