lean4-compile / pass_rate_test.py
rookiemango's picture
Upload folder using huggingface_hub
dddc1ae verified
import os
import subprocess
from argparse import ArgumentParser
import json
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import tempfile
def wrapped_function(item):
results = []
passed = 0
total = 0
temp_dir = tempfile.gettempdir()
temp_file = os.path.join(temp_dir, f"test.lean")
with open(temp_file, "w") as f:
f.write(item['cmd'])
# Rest of the function code...
# Process the item using the temporary file
# ...
# Clean up the temporary file
data = '{"path": "%s", "allTactics": true}' %(temp_file)
command = 'echo \'%s\' | lake exe repl' % data
try:
result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout = result.stdout.decode('utf-8')
stderr = result.stderr.decode('utf-8')
# stdout = result.stdout.decode('utf-8')
json_stdout = json.loads(stdout)
if "messages" not in json_stdout.keys():
passed += 1
# results.append({'item': item['content'], 'stdout': stdout, 'stderr': stderr, 'status': 'pass'})
results.append({ 'stdout': stdout, 'stderr': stderr, 'status': 'pass'})
except subprocess.CalledProcessError as e:
# results.append({'item': item['content'], 'error': str(e), 'status': 'nopass'})
results.append({ 'error': str(e), 'status': 'nopass'})
total += 1
pass_rate = passed / (passed + total) * 100
return {'results': results, 'pass_rate': pass_rate}
# Set the directory where your .lean files are located
# Get a list of all .lean files in the directory
# lean_files = [f for f in os.listdir(directory) if f.endswith(".lean")]
# lean_files = ["test/file.lean"]
def single(command_list):
results = []
passed = 0
total = 0
for item in tqdm(command_list):
with open("test/test.lean", "w", encoding = 'utf-8') as f:
f.write(item['cmd'])
data = '{"path": "test/test.lean", "allTactics": true}'
# data = '{"cmd": "%s", "allTactics": true}' % item['cmd']
command = 'echo \'%s\' | lake exe repl' % data
import pdb
pdb.set_trace()
try:
# process = subprocess.Popen(['lake', 'exe', 'repl'], stdin=subprocess.PIPE, stdout=subprocess.PIPE,
# stderr=subprocess.PIPE)
# stdout, stderr = process.communicate(input=data.encode(encoding='utf-8'))
# stdout = stdout.decode('utf-8')
result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout = result.stdout.decode('utf-8')
json_stdout = json.loads(stdout)
if "messages" not in json_stdout.keys():
passed += 1
stderr = result.stderr.decode('utf-8')
results.append({
# 'item': item['content'],
'stdout': stdout,
'stderr': stderr,
'status': 'pass'
})
except subprocess.CalledProcessError as e:
results.append({
# 'item': item['content'],
'error': str(e),
'status': 'nopass'
})
total += 1
# Calculate pass rate
pass_rate = passed / total * 100
print(pass_rate)
# Save results to a JSON file
with open('results.json', 'w') as f:
json.dump({'results': results, 'pass_rate': pass_rate}, f, indent=2, ensure_ascii=False)
def multi(command_list):
results = []
passed = 0
total = 0
def execute_command(item):
temp_dir = '/data/tmp'
temp_file = os.path.join(temp_dir, f"test_{item['index']}.lean") # Ensure unique filenames
with open(temp_file, "w") as f:
f.write(item['cmd'])
data = '{"path": "%s", "allTactics": true}' % temp_file
command = f'echo \'{data}\' | lake exe repl'
try:
result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout = result.stdout.decode('utf-8')
stderr = result.stderr.decode('utf-8')
if "messages" not in json.loads(stdout):
return {'stdout': stdout, 'stderr': stderr, 'status': 'pass'}
else:
return {'stdout': stdout, 'stderr': stderr, 'status': 'nopass'}
except subprocess.CalledProcessError as e:
return {'error': str(e), 'status': 'nopass'}
os.remove(temp_file)
total = len(command_list)
with ThreadPoolExecutor(max_workers=32) as executor:
futures = [executor.submit(execute_command, {'index': i, 'cmd': cmd['cmd']}) for i, cmd in enumerate(command_list)]
for future in tqdm(futures, total=total, desc="Processing Commands"):
result = future.result()
results.append(result)
if result['status'] == 'pass':
passed += 1
pass_rate = (passed / total) * 100
print(f"Pass rate: {pass_rate}%")
with open('results.json', 'w') as f:
json.dump({'results': results, 'pass_rate': pass_rate}, f, indent=2, ensure_ascii=False)
import re
def remove_simp_pattern_from_end(s):
pattern = r'@\[simp\s*.*?\]$'
return re.sub(pattern, '', s)
def main():
command_list = []
input_file = "/data/haiming/multilevel_isabelle-main/lean4/repl/self_autoformalization/data/mma_filepath/all_theorem.jsonl"
json_list = json.load(open(input_file, 'r', encoding='utf-8'))
for json_item in json_list[:10]:
try:
working_env = json_item["working_file"]
statement = json_item['statement'] + " " + json_item['proof']
json_item['cmd'] = '\n\n'.join([working_env, statement])
assert len(statement) > 0
# json_item['cmd'] = '\n'.join([working_env, json_item['total output'][0]])
except:
import pdb
pdb.set_trace()
command_list.append(json_item)
command_list = command_list
results = []
passed = 0
total = 0
multi( command_list)
if __name__ == '__main__':
main()