|
import os |
|
import subprocess |
|
from argparse import ArgumentParser |
|
import json |
|
from concurrent.futures import ThreadPoolExecutor |
|
from tqdm import tqdm |
|
import tempfile |
|
|
|
def wrapped_function(item):
    """Check one Lean snippet with the Lean REPL and report whether it passed.

    The text in ``item['cmd']`` is written to a uniquely named ``.lean`` file
    in the system temp directory, and ``lake exe repl`` (started in the
    current working directory) is asked to process that file with
    ``allTactics`` enabled. The snippet passes only when the REPL exits with
    status 0 and its JSON reply is an object without a ``"messages"`` key.

    Args:
        item: Mapping whose ``'cmd'`` entry holds the Lean source text.

    Returns:
        ``{'results': [entry], 'pass_rate': rate}``. ``entry`` is
        ``{'stdout', 'stderr', 'status'}`` when the REPL ran, or
        ``{'error', 'status': 'nopass'}`` when it could not be started or
        exited non-zero. ``rate`` is ``100.0`` for a pass, ``0.0`` otherwise.

    Raises:
        KeyError: If ``item`` has no ``'cmd'`` entry.
    """
    source = item['cmd']

    # One file per call: a shared fixed name would be overwritten whenever
    # several calls run at the same time.
    fd, lean_path = tempfile.mkstemp(prefix="test_", suffix=".lean")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(source)

        # json.dumps escapes quotes/backslashes in the path; feeding the
        # request on stdin avoids building a shell pipeline by hand.
        request = json.dumps({"path": lean_path, "allTactics": True})
        try:
            completed = subprocess.run(
                ["lake", "exe", "repl"],
                input=(request + "\n").encode("utf-8"),
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing `lake` executable, which the shell
            # pipeline used to surface as a non-zero exit status.
            entry = {'error': str(e), 'status': 'nopass'}
        else:
            stdout = completed.stdout.decode('utf-8')
            stderr = completed.stderr.decode('utf-8')
            try:
                reply = json.loads(stdout)
            except ValueError:
                # Empty or non-JSON output cannot be a verified pass.
                reply = None
            ok = isinstance(reply, dict) and "messages" not in reply
            entry = {
                'stdout': stdout,
                'stderr': stderr,
                'status': 'pass' if ok else 'nopass',
            }
    finally:
        try:
            os.remove(lean_path)
        except OSError:
            # Best-effort cleanup; a leftover temp file is not worth failing for.
            pass

    pass_rate = 100.0 if entry['status'] == 'pass' else 0.0
    return {'results': [entry], 'pass_rate': pass_rate}
|
|
|
|
|
|
|
|
|
|
|
|
|
def single(command_list):
    """Check Lean snippets one after another and write a report.

    For each item, ``item['cmd']`` is written to ``test/test.lean`` and
    ``lake exe repl`` (started in the current working directory) is asked to
    process that file with ``allTactics`` enabled. An item passes only when
    the REPL exits with status 0 and its JSON reply is an object without a
    ``"messages"`` key. The pass rate is printed, and the per-item entries
    plus the rate are written to ``results.json`` in the working directory.

    Args:
        command_list: Iterable of mappings, each with a ``'cmd'`` entry
            holding Lean source text.

    Returns:
        None. Output is the printed pass rate and ``results.json``.
    """
    lean_path = "test/test.lean"
    # The scratch directory is not guaranteed to exist on a fresh checkout.
    os.makedirs(os.path.dirname(lean_path), exist_ok=True)
    request = json.dumps({"path": lean_path, "allTactics": True})

    results = []
    passed = 0
    total = 0
    for item in tqdm(command_list):
        with open(lean_path, "w", encoding='utf-8') as f:
            f.write(item['cmd'])

        try:
            completed = subprocess.run(
                ["lake", "exe", "repl"],
                input=(request + "\n").encode("utf-8"),
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing `lake` executable.
            results.append({'error': str(e), 'status': 'nopass'})
        else:
            stdout = completed.stdout.decode('utf-8')
            stderr = completed.stderr.decode('utf-8')
            try:
                reply = json.loads(stdout)
            except ValueError:
                # Empty or non-JSON output cannot be a verified pass.
                reply = None
            ok = isinstance(reply, dict) and "messages" not in reply
            if ok:
                passed += 1
            results.append({
                'stdout': stdout,
                'stderr': stderr,
                'status': 'pass' if ok else 'nopass',
            })
        total += 1

    # An empty input yields 0% rather than a ZeroDivisionError.
    pass_rate = passed / total * 100 if total else 0.0
    print(pass_rate)

    # ensure_ascii=False emits raw Unicode, so the encoding must be explicit.
    with open('results.json', 'w', encoding='utf-8') as f:
        json.dump({'results': results, 'pass_rate': pass_rate}, f, indent=2, ensure_ascii=False)
|
|
|
|
|
|
|
|
|
def multi(command_list, temp_dir='/data/tmp', max_workers=32):
    """Check Lean snippets in parallel and write a report.

    Each item's ``'cmd'`` text is written to ``test_<index>.lean`` inside
    ``temp_dir``, and ``lake exe repl`` (started in the current working
    directory) is asked to process that file with ``allTactics`` enabled. An
    item passes only when the REPL exits with status 0 and its JSON reply is
    an object without a ``"messages"`` key. The pass rate is printed, and the
    per-item entries (in input order) plus the rate are written to
    ``results.json`` in the working directory.

    Args:
        command_list: Sequence of mappings, each with a ``'cmd'`` entry
            holding Lean source text.
        temp_dir: Directory for the per-item ``.lean`` files; created if
            missing.
        max_workers: Number of worker threads running the REPL concurrently.

    Returns:
        None. Output is the printed pass rate and ``results.json``.
    """
    os.makedirs(temp_dir, exist_ok=True)

    def execute_command(item):
        """Run the REPL on one indexed snippet and return its result entry."""
        temp_file = os.path.join(temp_dir, f"test_{item['index']}.lean")
        try:
            with open(temp_file, "w", encoding='utf-8') as f:
                f.write(item['cmd'])

            request = json.dumps({"path": temp_file, "allTactics": True})
            try:
                completed = subprocess.run(
                    ["lake", "exe", "repl"],
                    input=(request + "\n").encode("utf-8"),
                    check=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                )
            except (subprocess.CalledProcessError, OSError) as e:
                # OSError covers a missing `lake` executable.
                return {'error': str(e), 'status': 'nopass'}

            stdout = completed.stdout.decode('utf-8')
            stderr = completed.stderr.decode('utf-8')
            try:
                reply = json.loads(stdout)
            except ValueError:
                # Without this, one unparseable reply would propagate through
                # future.result() and abort the whole batch.
                reply = None
            ok = isinstance(reply, dict) and "messages" not in reply
            return {'stdout': stdout, 'stderr': stderr, 'status': 'pass' if ok else 'nopass'}
        finally:
            # Cleanup lives in `finally` because every path above returns.
            try:
                os.remove(temp_file)
            except OSError:
                pass

    results = []
    passed = 0
    total = len(command_list)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(execute_command, {'index': i, 'cmd': cmd['cmd']})
            for i, cmd in enumerate(command_list)
        ]
        # Futures are consumed in submission order, so results match input order.
        for future in tqdm(futures, total=total, desc="Processing Commands"):
            result = future.result()
            results.append(result)
            if result['status'] == 'pass':
                passed += 1

    # An empty input yields 0% rather than a ZeroDivisionError.
    pass_rate = (passed / total) * 100 if total else 0.0
    print(f"Pass rate: {pass_rate}%")

    # ensure_ascii=False emits raw Unicode, so the encoding must be explicit.
    with open('results.json', 'w', encoding='utf-8') as f:
        json.dump({'results': results, 'pass_rate': pass_rate}, f, indent=2, ensure_ascii=False)
|
|
|
import re |
|
def remove_simp_pattern_from_end(s):
    """Strip a trailing ``@[simp...]`` attribute from the end of ``s``.

    Only an attribute that closes at the very end of the string (or just
    before a final newline, per ``$``) is removed. The attribute body may not
    contain ``]``, so an earlier ``@[simp]`` followed by text that merely ends
    in ``]`` is left untouched instead of swallowing everything between them.
    ``simp`` is matched as a prefix, so ``@[simps]`` or ``@[simp, norm_cast]``
    at the end are removed as well.

    Args:
        s: Text to clean.

    Returns:
        ``s`` without the trailing attribute; unchanged when none is present.
    """
    pattern = r'@\[simp\s*[^\]\n]*\]$'
    return re.sub(pattern, '', s)
|
|
|
def main(
    input_file="/data/haiming/multilevel_isabelle-main/lean4/repl/self_autoformalization/data/mma_filepath/all_theorem.jsonl",
    limit=10,
):
    """Load theorem records, build Lean commands from them, and run ``multi``.

    Each record's ``'cmd'`` is its ``'working_file'`` text, a blank line, then
    ``'statement'`` and ``'proof'`` joined by a space. Records that lack one
    of those fields, hold non-string values, or have an empty statement and
    proof are reported and skipped, since they cannot produce a meaningful
    check.

    Args:
        input_file: Path to the input. It is first parsed as one JSON
            document; if that fails it is parsed as one JSON value per
            non-empty line. NOTE(review): the default path ends in ``.jsonl``
            but was previously read with ``json.load`` -- confirm which
            format the data really uses.
        limit: Number of leading records to process; ``None`` processes all.

    Returns:
        None. ``multi`` prints the pass rate and writes ``results.json``.
    """
    with open(input_file, 'r', encoding='utf-8') as f:
        raw = f.read()
    try:
        json_list = json.loads(raw)
    except ValueError:
        json_list = [json.loads(line) for line in raw.splitlines() if line.strip()]
    if isinstance(json_list, dict):
        # A lone record (e.g. a one-line JSON Lines file) is a list of one.
        json_list = [json_list]

    command_list = []
    for json_item in json_list[:limit]:
        try:
            working_env = json_item["working_file"]
            statement = json_item['statement'] + " " + json_item['proof']
            cmd = '\n\n'.join([working_env, statement])
        except (KeyError, TypeError) as e:
            print(f"Skipping malformed record: {e!r}")
            continue
        # The joined string always contains the separating space, so the
        # emptiness check has to look at the stripped text.
        if not statement.strip():
            print("Skipping record with empty statement and proof")
            continue
        json_item['cmd'] = cmd
        command_list.append(json_item)

    multi(command_list)
|
|
|
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|
|
|