"""Batch-check Lean snippets by sending them to ``lake exe repl``.

Each input item carries a ``'cmd'`` string holding Lean source. The source is
written to a ``.lean`` file, the file path is sent to the REPL as a JSON
request on stdin, and the reply is classified as ``'pass'`` when the process
succeeds and its JSON reply has no ``"messages"`` key.
"""
import json
import os
import re
import subprocess
import sys
import tempfile
from argparse import ArgumentParser
from concurrent.futures import ThreadPoolExecutor

try:
    from tqdm import tqdm
except ImportError:  # the progress bar is cosmetic; fall back to plain iteration
    def tqdm(iterable, **_kwargs):
        """Return *iterable* unchanged when tqdm is not installed."""
        return iterable


DEFAULT_INPUT_FILE = "/data/haiming/multilevel_isabelle-main/lean4/repl/self_autoformalization/data/mma_filepath/all_theorem.jsonl"

# Argument vector instead of a shell string: no quoting problems, no shell.
_REPL_COMMAND = ("lake", "exe", "repl")


def _pass_rate(passed, total):
    """Return ``passed / total`` as a percentage, or 0.0 when *total* is 0."""
    return passed / total * 100 if total else 0.0


def _check_lean_file(path):
    """Run the REPL on the Lean file at *path* and classify the outcome.

    Returns:
        ``{'stdout', 'stderr', 'status'}`` when the process exits with status
        0 and prints a JSON reply, where ``status`` is ``'pass'`` only if the
        reply is an object without a ``"messages"`` key. Otherwise a dict
        with ``'error'`` and ``'status': 'nopass'`` (non-zero exit, the
        executable could not be started, or the output was not JSON).
    """
    # json.dumps escapes quotes/backslashes in the path; the trailing newline
    # mirrors what ``echo`` appended in the shell pipeline this replaces.
    request = json.dumps({"path": path, "allTactics": True}) + "\n"
    try:
        completed = subprocess.run(
            _REPL_COMMAND,
            input=request.encode('utf-8'),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
        )
    except (subprocess.CalledProcessError, OSError) as exc:
        # OSError covers a missing ``lake`` binary, which the shell pipeline
        # used to surface as a non-zero exit status.
        return {'error': str(exc), 'status': 'nopass'}

    stdout = completed.stdout.decode('utf-8', errors='replace')
    stderr = completed.stderr.decode('utf-8', errors='replace')
    try:
        reply = json.loads(stdout)
    except ValueError as exc:
        return {
            'stdout': stdout,
            'stderr': stderr,
            'error': f"REPL output is not valid JSON: {exc}",
            'status': 'nopass',
        }

    clean = isinstance(reply, dict) and "messages" not in reply
    return {
        'stdout': stdout,
        'stderr': stderr,
        'status': 'pass' if clean else 'nopass',
    }


def _write_report(results, pass_rate, path='results.json'):
    """Dump *results* and *pass_rate* to *path* as indented UTF-8 JSON."""
    with open(path, 'w', encoding='utf-8') as f:
        json.dump({'results': results, 'pass_rate': pass_rate}, f,
                  indent=2, ensure_ascii=False)


def wrapped_function(item):
    """Check a single item and return its result together with a pass rate.

    Args:
        item: mapping with a ``'cmd'`` entry containing the Lean source.

    Returns:
        ``{'results': [result], 'pass_rate': rate}`` where ``rate`` is 100.0
        if the item passed and 0.0 otherwise.

    Raises:
        KeyError: if *item* has no ``'cmd'`` entry.
    """
    source = item['cmd']
    # A unique file per call, so concurrent callers cannot overwrite each
    # other's input; it is removed once the REPL has finished with it.
    fd, temp_file = tempfile.mkstemp(suffix=".lean")
    try:
        with os.fdopen(fd, "w", encoding='utf-8') as f:
            f.write(source)
        outcome = _check_lean_file(temp_file)
    finally:
        os.remove(temp_file)

    passed = 1 if outcome['status'] == 'pass' else 0
    return {'results': [outcome], 'pass_rate': _pass_rate(passed, 1)}


def single(command_list):
    """Check every item sequentially and write ``results.json``.

    Each item's ``'cmd'`` is written to ``test/test.lean`` (relative to the
    current directory) before the REPL is invoked. The pass rate is printed
    and stored in the report next to the per-item results.
    """
    lean_path = "test/test.lean"
    os.makedirs(os.path.dirname(lean_path), exist_ok=True)

    results = []
    for item in tqdm(command_list):
        with open(lean_path, "w", encoding='utf-8') as f:
            f.write(item['cmd'])
        results.append(_check_lean_file(lean_path))

    passed = sum(1 for outcome in results if outcome['status'] == 'pass')
    pass_rate = _pass_rate(passed, len(results))
    print(pass_rate)
    _write_report(results, pass_rate)


def multi(command_list, temp_dir='/data/tmp', max_workers=32):
    """Check all items concurrently and write ``results.json``.

    Args:
        command_list: sequence of mappings, each with a ``'cmd'`` entry.
        temp_dir: directory for the per-item ``test_<index>.lean`` files.
        max_workers: size of the thread pool.

    Results are collected in submission order. Each worker deletes its
    temporary file once the REPL call has returned.
    """
    def execute_command(item):
        temp_file = os.path.join(temp_dir, f"test_{item['index']}.lean")
        try:
            with open(temp_file, "w", encoding='utf-8') as f:
                f.write(item['cmd'])
            return _check_lean_file(temp_file)
        finally:
            # Runs on every exit path, including the early returns above.
            try:
                os.remove(temp_file)
            except FileNotFoundError:
                pass

    total = len(command_list)
    if total:
        os.makedirs(temp_dir, exist_ok=True)

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(execute_command, {'index': i, 'cmd': cmd['cmd']})
            for i, cmd in enumerate(command_list)
        ]
        for future in tqdm(futures, total=total, desc="Processing Commands"):
            results.append(future.result())

    passed = sum(1 for outcome in results if outcome['status'] == 'pass')
    pass_rate = _pass_rate(passed, total)
    print(f"Pass rate: {pass_rate}%")
    _write_report(results, pass_rate)


def remove_simp_pattern_from_end(s):
    """Strip a trailing ``@[simp ...]`` attribute from the end of *s*."""
    pattern = r'@\[simp\s*.*?\]$'
    return re.sub(pattern, '', s)


def _load_items(path):
    """Load the item list from *path*.

    The file is parsed as one JSON document first; if that fails it is parsed
    as JSON Lines (one JSON value per non-blank line).
    """
    with open(path, 'r', encoding='utf-8') as f:
        text = f.read()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return [json.loads(line) for line in text.splitlines() if line.strip()]


def main(input_file=DEFAULT_INPUT_FILE, limit=10):
    """Build REPL commands from the theorem file and check them with ``multi``.

    Args:
        input_file: path of the JSON (or JSON Lines) file with the items.
        limit: number of leading items to process; ``None`` processes all.

    Every item needs ``'working_file'``, ``'statement'`` and ``'proof'``
    strings. Items that lack them, or whose statement and proof are both
    blank, are reported on stderr and skipped.
    """
    command_list = []
    for position, json_item in enumerate(_load_items(input_file)[:limit]):
        try:
            working_env = json_item["working_file"]
            statement = json_item['statement'] + " " + json_item['proof']
            command = '\n\n'.join([working_env, statement])
        except (KeyError, TypeError) as exc:
            print(f"Skipping item {position}: {exc!r}", file=sys.stderr)
            continue
        if not statement.strip():
            print(f"Skipping item {position}: empty statement", file=sys.stderr)
            continue
        json_item['cmd'] = command
        command_list.append(json_item)

    multi(command_list)


if __name__ == '__main__':
    main()