|
import json
import os
import subprocess
import sys
import tempfile
from argparse import ArgumentParser
from concurrent.futures import ThreadPoolExecutor

from tqdm import tqdm
|
|
|
def wrapped_function(item, repl_command=("lake", "exe", "repl")):
    """Run one Lean snippet through the REPL and report whether it passed.

    The snippet is written to a private temporary ``.lean`` file and the REPL
    is asked to load that file with ``allTactics`` enabled.  The run counts as
    a pass only when the process exits successfully and its JSON reply has no
    ``"messages"`` key.

    Args:
        item: Mapping whose ``'cmd'`` entry is the Lean source text to check.
        repl_command: Argument vector used to start the REPL; the JSON request
            is fed to it on standard input.  Defaults to ``lake exe repl``.

    Returns:
        ``{'results': [entry], 'pass_rate': rate}``.  ``entry`` always has a
        ``'status'`` of ``'pass'`` or ``'nopass'``, plus the decoded
        ``'stdout'``/``'stderr'`` when the process ran to completion and an
        ``'error'`` string when it failed or printed something that is not
        JSON.  ``rate`` is ``100.0`` for a pass and ``0.0`` otherwise.
    """
    # A unique file per call: a fixed "test.lean" in the shared temp directory
    # gets overwritten when several calls run at the same time.
    fd, temp_file = tempfile.mkstemp(prefix="test_", suffix=".lean")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(item['cmd'])

        # json.dumps escapes the path correctly; the trailing newline mirrors
        # what `echo` used to append.
        request = json.dumps({"path": temp_file, "allTactics": True}) + "\n"

        try:
            # The request goes in on stdin, so no shell quoting is involved.
            result = subprocess.run(
                list(repl_command),
                input=request.encode('utf-8'),
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing executable, which the shell pipeline
            # used to surface as a non-zero exit status.
            entry = {'error': str(e), 'status': 'nopass'}
        else:
            stdout = result.stdout.decode('utf-8')
            stderr = result.stderr.decode('utf-8')
            entry = {'stdout': stdout, 'stderr': stderr, 'status': 'nopass'}
            try:
                reply = json.loads(stdout)
            except json.JSONDecodeError as e:
                # Unparseable output is a failed check, not a crash.
                entry['error'] = str(e)
            else:
                if isinstance(reply, dict) and "messages" not in reply:
                    entry['status'] = 'pass'
    finally:
        try:
            os.remove(temp_file)
        except OSError:
            # Cleanup is best-effort; the check result is what matters.
            pass

    passed = 1 if entry['status'] == 'pass' else 0
    total = 1
    pass_rate = passed / total * 100

    return {'results': [entry], 'pass_rate': pass_rate}
|
|
|
|
|
|
|
|
|
|
|
|
|
def single(command_list):
    """Check every command one after another and write ``results.json``.

    Each item's ``'cmd'`` text is written to ``test/test.lean`` and
    ``lake exe repl`` is asked to load that file with ``allTactics`` enabled.
    An item passes only when the process exits successfully and its JSON
    reply has no ``"messages"`` key.

    Args:
        command_list: Sequence of mappings, each with a ``'cmd'`` entry
            holding Lean source text.

    Side effects:
        Prints the pass rate (a percentage) and writes
        ``{'results': [...], 'pass_rate': ...}`` to ``results.json`` in the
        current directory.  An empty ``command_list`` yields a rate of 0.0.
    """
    lean_path = "test/test.lean"
    os.makedirs(os.path.dirname(lean_path), exist_ok=True)

    # The request is identical for every item because the file path is fixed.
    # The trailing newline mirrors what `echo` used to append.
    request = (json.dumps({"path": lean_path, "allTactics": True}) + "\n").encode('utf-8')

    results = []
    passed = 0

    for item in tqdm(command_list):
        with open(lean_path, "w", encoding='utf-8') as f:
            f.write(item['cmd'])

        try:
            # Feed the request on stdin instead of going through a shell.
            result = subprocess.run(
                ["lake", "exe", "repl"],
                input=request,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing `lake`, which the shell pipeline used
            # to report as a non-zero exit status.
            results.append({'error': str(e), 'status': 'nopass'})
            continue

        stdout = result.stdout.decode('utf-8')
        stderr = result.stderr.decode('utf-8')
        entry = {'stdout': stdout, 'stderr': stderr, 'status': 'nopass'}
        try:
            reply = json.loads(stdout)
        except json.JSONDecodeError as e:
            # One unparseable reply should not abort the whole batch.
            entry['error'] = str(e)
        else:
            if isinstance(reply, dict) and "messages" not in reply:
                entry['status'] = 'pass'
                passed += 1
        results.append(entry)

    total = len(results)
    # Guard the division so an empty batch reports 0.0 instead of crashing.
    pass_rate = passed / total * 100 if total else 0.0
    print(pass_rate)

    # ensure_ascii=False emits raw non-ASCII text, so pin the file encoding.
    with open('results.json', 'w', encoding='utf-8') as f:
        json.dump({'results': results, 'pass_rate': pass_rate}, f, indent=2, ensure_ascii=False)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def multi(command_list,
          temp_dir='/hpc2hdd/home/zyang398/lujianqiao/lean4/repl/tmp',
          max_workers=32):
    """Check all commands concurrently and write ``results.json``.

    Each item's ``'cmd'`` text is written to its own
    ``test_<index>.lean`` file under ``temp_dir`` and ``lake exe repl`` is
    asked to load it with ``allTactics`` enabled.  An item passes only when
    the process exits successfully and its JSON reply has no ``"messages"``
    key.  The per-item file is removed once its check finishes.

    Args:
        command_list: Sequence of mappings, each with a ``'cmd'`` entry
            holding Lean source text.
        temp_dir: Directory for the per-item ``.lean`` files; created when
            missing.  The default is the location the script used originally.
        max_workers: Upper bound on worker threads in the pool.

    Side effects:
        Prints the pass rate and writes
        ``{'results': [...], 'pass_rate': ...}`` to ``results.json`` in the
        current directory.  Results keep the order of ``command_list``.  An
        empty ``command_list`` yields a rate of 0.0.
    """
    os.makedirs(temp_dir, exist_ok=True)

    def execute_command(item):
        """Run one indexed command and return its result entry."""
        temp_file = os.path.join(temp_dir, f"test_{item['index']}.lean")
        try:
            with open(temp_file, "w", encoding='utf-8') as f:
                f.write(item['cmd'])

            # json.dumps escapes the path; the newline mirrors `echo`.
            request = json.dumps({"path": temp_file, "allTactics": True}) + "\n"

            try:
                result = subprocess.run(
                    ["lake", "exe", "repl"],
                    input=request.encode('utf-8'),
                    check=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                )
            except (subprocess.CalledProcessError, OSError) as e:
                # OSError covers a missing `lake`, which the shell pipeline
                # used to report as a non-zero exit status.
                return {'error': str(e), 'status': 'nopass'}

            stdout = result.stdout.decode('utf-8')
            stderr = result.stderr.decode('utf-8')
            entry = {'stdout': stdout, 'stderr': stderr, 'status': 'nopass'}
            try:
                reply = json.loads(stdout)
            except json.JSONDecodeError as e:
                # Keep one bad reply from taking down the whole pool.
                entry['error'] = str(e)
            else:
                if isinstance(reply, dict) and "messages" not in reply:
                    entry['status'] = 'pass'
            return entry
        finally:
            # Cleanup must live in `finally`: placed after the returns it
            # never ran and the temp files piled up.
            try:
                os.remove(temp_file)
            except OSError:
                pass

    results = []
    passed = 0
    total = len(command_list)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(execute_command, {'index': i, 'cmd': cmd['cmd']})
            for i, cmd in enumerate(command_list)
        ]
        # Iterating in submission order keeps results aligned with the input.
        for future in tqdm(futures, total=total, desc="Processing Commands"):
            result = future.result()
            results.append(result)
            if result['status'] == 'pass':
                passed += 1

    # Guard the division so an empty batch reports 0.0 instead of crashing.
    pass_rate = (passed / total) * 100 if total else 0.0
    print(f"Pass rate: {pass_rate}%")

    # ensure_ascii=False emits raw non-ASCII text, so pin the file encoding.
    with open('results.json', 'w', encoding='utf-8') as f:
        json.dump({'results': results, 'pass_rate': pass_rate}, f, indent=2, ensure_ascii=False)
|
|
|
import re |
|
def remove_simp_pattern_from_end(s):
    """Return *s* with a trailing ``@[simp ...]`` attribute stripped.

    The pattern is anchored with ``$``, so only a bracketed ``@[simp...]``
    that closes at the end of the string (or right before a final newline)
    is removed; the match starts at the leftmost ``@[simp`` that can reach
    that closing bracket.  Strings without such a suffix come back unchanged.
    """
    trailing_simp = re.compile(r'@\[simp\s*.*?\]$')
    return trailing_simp.sub('', s)
|
|
|
def main(args):
    """Load the generated records and check them with ``single``.

    Reads ``{args.input_path}/{i}.json`` for every ``i`` in
    ``range(args.cuda_num)``.  Each non-blank line is parsed as one JSON
    object, and its ``'cmd'`` entry is built by joining
    ``content.working_file`` and the first element of ``'total output'``
    with a newline.

    Lines that are not valid JSON, lack those fields, or have an empty
    statement are reported on stderr and skipped, so they do not count
    towards the pass rate.

    Args:
        args: Parsed command-line namespace; ``input_path`` and ``cuda_num``
            are the attributes read here.
    """
    command_list = []
    for i in range(args.cuda_num):
        shard_path = f"{args.input_path}/{i}.json"
        with open(shard_path, 'r', encoding='utf-8') as rf:
            for line_no, line in enumerate(rf, start=1):
                if not line.strip():
                    continue
                try:
                    json_item = json.loads(line)
                    working_env = json_item['content']['working_file']
                    statement = json_item['total output'][0]
                    # Explicit check: `assert` disappears under `python -O`.
                    if len(statement) == 0:
                        raise ValueError("empty statement")
                    json_item['cmd'] = '\n'.join([working_env, statement])
                except (ValueError, KeyError, IndexError, TypeError) as e:
                    # json.JSONDecodeError is a ValueError.  A record with no
                    # usable 'cmd' would only crash the checker later on.
                    print(f"Skipping {shard_path}:{line_no}: {e!r}", file=sys.stderr)
                    continue
                command_list.append(json_item)

    single(command_list)
|
|
|
if __name__ == '__main__':
    # Script entry point: declare the command-line options from a table,
    # parse them, and hand the namespace to main().
    arg_parser = ArgumentParser()
    for flag, options in (
        ('--data_path', {
            'type': str,
            'default': 'data/grade-school-math-master/grade_school_math/data/test.jsonl',
        }),
        ('--input_path', {'type': str, 'default': ''}),
        ('--cuda_num', {'type': int, 'default': 4}),
        ('--output_path', {'type': str, 'default': 'total.json'}),
        ('--generate_method', {
            'type': str,
            'choices': ['single', 'sft', 'comp', 'self_consistency', 'single_consistency'],
        }),
        ('--method', {'type': str, 'choices': ['main', 'test', 'get_data']}),
    ):
        arg_parser.add_argument(flag, **options)
    args = arg_parser.parse_args()
    main(args)
|
|
|
|
|
|